Look, Azure capacity hiccups can really derail your day. You don’t get any warning—no “Heads up, we’re almost out of your preferred VM SKU”—you just try to create a VM and boom: error.
After one of those “oh-crap” moments hit some of my customers, I built a simple monitor that alerts you before you slam into that capacity wall—so you’re never blindsided again.
Thought I’d share it here—maybe save you from the same headache.
What this thing does
This solution isn't fancy, but it works. Here's what it'll do for you:
- Checks if your favorite VM types are actually available in your regions
- Shows exactly WHY something's unavailable if there's a problem
- Suggests similar VM types you could use instead (lifesaver!)
- Logs everything to Azure Log Analytics so you can track trends
- Works right from your terminal - no fancy setup needed
How it's put together
It's pretty simple really - just two main Python scripts:
- The Monitoring Script: Checks VM availability using Azure's API
- Log Analytics Setup: Stores your data for later analysis (optional, but super useful)
The flow is straightforward: the monitoring script queries Azure's Resource SKUs API, prints the results in your terminal, and can optionally ship each check to Log Analytics through a Data Collection Endpoint and Data Collection Rule that the setup script creates.
Before you start
You'll need a few things:
1. Azure CLI installed and working on your machine
# If you haven't logged in yet
az login
2. Azure permissions if you're doing the Log Analytics part:
# Get your username first
USER_PRINCIPAL=$(az ad signed-in-user show --query userPrincipalName -o tsv)
echo "Looks like you're logged in as: $USER_PRINCIPAL"
# Create a resource group - you can change the name if you want
az group create --name vm-sku-monitor-rg --location eastus2
# Give yourself the right permissions
az role assignment create \
--assignee "$USER_PRINCIPAL" \
--role "Monitoring Metrics Publisher" \
--scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"
# Double-check it worked
az role assignment list \
--assignee "$USER_PRINCIPAL" \
--role "Monitoring Metrics Publisher" \
--scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"
Azure role assignments can take a while to propagate. If you hit weird 403 errors later, grab a coffee and try again in 10-15 minutes.
3. Python environment setup:
# Set up a virtual environment - don't skip this step!
# I learned this the hard way when I borked my system Python...
python3 -m venv venv
# Activate it
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install what we need
pip install azure-identity azure-mgmt-compute azure-mgmt-subscription azure-monitor-ingestion rich
Let's build this thing
1. The VM Capacity Checking Script
The star of the show is the monitoring script itself. This script does all the heavy lifting - checking VM availability, showing you what's happening, and logging the data for later.
I'll call it monitor_vm_sku_capacity.py:
#!/usr/bin/env python
"""
Azure VM SKU Capacity Monitor
This script checks the availability of specific VM SKUs in Azure regions
and provides information about capacity constraints and alternative options.
"""
import argparse
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Dict, List, Any, Optional, Tuple
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.subscription import SubscriptionClient
from azure.core.exceptions import HttpResponseError
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('vm_sku_capacity_monitor')
try:
from rich.console import Console
from rich.table import Table
from rich import box
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description='Azure VM SKU Capacity Monitor')
parser.add_argument('--region', type=str, default='eastus2',
help='Azure region to check (default: eastus2)')
parser.add_argument('--sku', type=str, default='Standard_D16ds_v5',
help='VM SKU to check (default: Standard_D16ds_v5)')
parser.add_argument('--log-analytics', action='store_true',
help='Enable logging to Azure Log Analytics')
parser.add_argument('--endpoint', type=str,
help='Azure Monitor Data Collection Endpoint URI')
parser.add_argument('--rule-id', type=str,
help='Azure Monitor Data Collection Rule ID')
parser.add_argument('--stream-name', type=str, default='Custom-VMSKUCapacity_CL',
help='Azure Monitor Log Analytics stream name')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--config', type=str,
help='Path to configuration file')
parser.add_argument('--subscription-id', type=str,
help='Azure Subscription ID')
return parser.parse_args()
def load_configuration(args):
"""Load configuration from file or command line arguments."""
config = {
'region': args.region,
'target_sku': args.sku,
'check_zones': True,
'subscription_id': args.subscription_id,
'log_analytics': {
'enabled': args.log_analytics,
'endpoint': args.endpoint,
'rule_id': args.rule_id,
'stream_name': args.stream_name
},
'check_interval_minutes': 15
}
if args.config:
try:
with open(args.config, 'r') as f:
file_config = json.load(f)
logger.info(f"Configuration loaded from {args.config}")
# Update config with file values
config['region'] = file_config.get('region', config['region'])
config['target_sku'] = file_config.get('target_sku', config['target_sku'])
config['check_zones'] = file_config.get('check_zones', config['check_zones'])
config['check_interval_minutes'] = file_config.get('check_interval_minutes', config['check_interval_minutes'])
config['subscription_id'] = file_config.get('subscription_id', config['subscription_id'])
# Update Log Analytics config
if 'log_analytics' in file_config:
config['log_analytics']['enabled'] = file_config['log_analytics'].get('enabled', config['log_analytics']['enabled'])
config['log_analytics']['endpoint'] = file_config['log_analytics'].get('endpoint', config['log_analytics']['endpoint'])
config['log_analytics']['rule_id'] = file_config['log_analytics'].get('rule_id', config['log_analytics']['rule_id'])
config['log_analytics']['stream_name'] = file_config['log_analytics'].get('stream_name', config['log_analytics']['stream_name'])
except Exception as e:
logger.error(f"Error loading configuration from {args.config}: {str(e)}")
logger.info("Using default configuration")
# Command line arguments override config file
if args.region:
config['region'] = args.region
if args.sku:
config['target_sku'] = args.sku
if args.log_analytics:
config['log_analytics']['enabled'] = True
if args.endpoint:
config['log_analytics']['endpoint'] = args.endpoint
if args.rule_id:
config['log_analytics']['rule_id'] = args.rule_id
if args.stream_name:
config['log_analytics']['stream_name'] = args.stream_name
if args.subscription_id:
config['subscription_id'] = args.subscription_id
# Auto-detect subscription ID if not provided
if not config.get('subscription_id'):
config['subscription_id'] = get_subscription_id(config)
return config
def get_subscription_id(config):
"""Automatically detect the subscription ID using multiple methods."""
subscription_id = None
# Method 1: Try to extract from rule_id in config
if config.get('log_analytics', {}).get('rule_id'):
rule_id = config['log_analytics']['rule_id']
match = re.search(r'/subscriptions/([^/]+)/', rule_id)
if match:
subscription_id = match.group(1)
logger.info(f"Extracted subscription ID from rule_id: {subscription_id}")
return subscription_id
# Method 2: Try to get from Azure CLI
try:
result = subprocess.run(
"az account show --query id -o tsv",
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
subscription_id = result.stdout.strip()
if subscription_id:
logger.info(f"Retrieved subscription ID from Azure CLI: {subscription_id}")
return subscription_id
except Exception as e:
logger.debug(f"Could not get subscription ID from Azure CLI: {str(e)}")
# Method 3: Try to get from DefaultAzureCredential
try:
credential = DefaultAzureCredential()
subscription_client = SubscriptionClient(credential)
subscriptions = list(subscription_client.subscriptions.list())
if subscriptions:
subscription_id = subscriptions[0].subscription_id
logger.info(f"Retrieved subscription ID from Azure SDK: {subscription_id}")
return subscription_id
except Exception as e:
logger.debug(f"Could not get subscription ID from Azure SDK: {str(e)}")
if not subscription_id:
logger.warning("Could not automatically detect subscription ID. Please provide it manually.")
return subscription_id
def check_sku_availability(compute_client, region, target_sku, check_zones=True):
"""Check if a specific VM SKU is available in the given region."""
# Get all SKUs
skus = list(compute_client.resource_skus.list())
# Find the target SKU in the specified region
target_sku_info = None
for sku in skus:
if sku.name.lower() == target_sku.lower() and any(loc.lower() == region.lower() for loc in sku.locations):
target_sku_info = sku
break
if not target_sku_info:
logger.warning(f"SKU {target_sku} not found in region {region}")
return False, "NotFound", [], {}, []
# Check availability
is_available = True
restriction_reason = None
restrictions = []
for restriction in target_sku_info.restrictions:
if any(value.lower() == region.lower() for value in restriction.restriction_info.locations):
is_available = False
restriction_reason = restriction.reason_code
restrictions.append({
'type': restriction.type,
'reason': restriction.reason_code,
'values': restriction.restriction_info.locations
})
# Get zone availability
zones = []
if check_zones and hasattr(target_sku_info, 'location_info'):
for location_info in target_sku_info.location_info:
if location_info.location.lower() == region.lower() and hasattr(location_info, 'zones'):
zones = location_info.zones
# Get SKU specifications
specifications = {}
if hasattr(target_sku_info, 'capabilities'):
for capability in target_sku_info.capabilities:
specifications[capability.name] = capability.value
# Find alternative SKUs
alternative_skus = []
if not is_available:
for sku in skus:
# Skip if not a VM SKU or same as target
if sku.resource_type != 'virtualMachines' or sku.name == target_sku:
continue
# Check if available in the region
if not any(loc.lower() == region.lower() for loc in sku.locations):
continue
# Check if restricted in the region
is_restricted = False
for restriction in sku.restrictions:
if any(value.lower() == region.lower() for value in restriction.restriction_info.locations):
is_restricted = True
break
if is_restricted:
continue
# Get specifications
alt_specs = {}
if hasattr(sku, 'capabilities'):
for capability in sku.capabilities:
alt_specs[capability.name] = capability.value
# Calculate similarity score
similarity = calculate_similarity(specifications, alt_specs)
if similarity >= 80: # Only include if at least 80% similar
alternative_skus.append({
'name': sku.name,
'vcpus': alt_specs.get('vCPUs', 'Unknown'),
'memory': alt_specs.get('MemoryGB', 'Unknown'),
'family': sku.family,
'similarity': similarity
})
# Sort by similarity (highest first)
alternative_skus.sort(key=lambda x: x['similarity'], reverse=True)
# Limit to top 5
alternative_skus = alternative_skus[:5]
logger.info(f"Availability check result: {is_available}, Reason: {restriction_reason}")
return is_available, restriction_reason, zones, specifications, alternative_skus
def calculate_similarity(specs1, specs2):
"""Calculate similarity percentage between two SKU specifications."""
# Key specifications to compare
key_specs = ['vCPUs', 'MemoryGB', 'MaxDataDiskCount', 'PremiumIO', 'AcceleratedNetworkingEnabled']
# Count matches
matches = 0
total = 0
for key in key_specs:
if key in specs1 and key in specs2:
total += 1
if specs1[key] == specs2[key]:
matches += 1
# Calculate percentage
if total == 0:
return 0
return int((matches / total) * 100)
def display_results_rich(region, target_sku, is_available, restriction_reason, zones, specifications, alternative_skus, subscription_name, subscription_id):
"""Display results using rich formatting."""
console = Console()
# Create header
console.print(f"[bold white on blue]{'AZURE VM SKU CAPACITY MONITOR - ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'):^150}[/]")
# Create summary table
console.print()
console.print(f" [bold]Status[/] {'AVAILABLE' if is_available else 'NOT AVAILABLE'}")
console.print(f" [bold]SKU[/] {target_sku}")
console.print(f" [bold]Region[/] {region}")
console.print(f" [bold]Subscription[/] {subscription_name} ({subscription_id})")
if not is_available:
console.print(f" [bold]Details[/] SKU {target_sku} is not available in region {region}")
# Display zones
console.print()
console.print("[bold]Available[/]")
console.print(" [bold]Zones[/]")
if zones:
zone_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
zone_table.add_column("Zone")
for zone in zones:
zone_table.add_row(zone)
console.print(zone_table)
else:
console.print(" None")
# Display restrictions
if not is_available:
console.print()
console.print("[bold]Restrictions[/]".center(50))
restrictions_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
restrictions_table.add_column("Type", style="dim")
restrictions_table.add_column("Reason", style="dim")
restrictions_table.add_column("Affected Values", style="dim")
restrictions_table.add_row("Zone", restriction_reason, region)
console.print(restrictions_table)
# Display specifications
console.print()
console.print("[bold]VM SKU Specifications[/]".center(50))
specs_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
specs_table.add_column("Property", style="dim")
specs_table.add_column("Value", style="dim")
for key, value in specifications.items():
specs_table.add_row(key, str(value))
console.print(specs_table)
# Display alternative SKUs
if alternative_skus:
console.print()
console.print("[bold]Alternative SKUs[/]".center(50))
alt_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
alt_table.add_column("SKU Name", style="dim")
alt_table.add_column("vCPUs", style="dim")
alt_table.add_column("Memory (GB)", style="dim")
alt_table.add_column("Family", style="dim")
alt_table.add_column("Similarity", style="dim")
for sku in alternative_skus:
alt_table.add_row(
sku['name'],
str(sku['vcpus']),
str(sku['memory']),
sku['family'],
f"{sku['similarity']}%"
)
console.print(alt_table)
def display_results_text(region, target_sku, is_available, restriction_reason, zones, specifications, alternative_skus, subscription_name, subscription_id):
"""Display results using plain text formatting."""
print("\n" + "=" * 80)
print(f"AZURE VM SKU CAPACITY MONITOR - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
print(f"\nStatus: {'AVAILABLE' if is_available else 'NOT AVAILABLE'}")
print(f"SKU: {target_sku}")
print(f"Region: {region}")
print(f"Subscription: {subscription_name} ({subscription_id})")
if not is_available:
print(f"Details: SKU {target_sku} is not available in region {region}")
print("\nAvailable Zones:")
if zones:
for zone in zones:
print(f" - {zone}")
else:
print(" None")
if not is_available:
print("\nRestrictions:")
print(f" Type: Zone")
print(f" Reason: {restriction_reason}")
print(f" Affected Values: {region}")
print("\nVM SKU Specifications:")
for key, value in specifications.items():
print(f" {key}: {value}")
if alternative_skus:
print("\nAlternative SKUs:")
for sku in alternative_skus:
print(f" - {sku['name']} (vCPUs: {sku['vcpus']}, Memory: {sku['memory']} GB, Family: {sku['family']}, Similarity: {sku['similarity']}%)")
def log_to_azure_monitor(data, log_analytics_config):
"""Log data to Azure Monitor."""
try:
# Import Azure Monitor Ingestion client
from azure.monitor.ingestion import LogsIngestionClient
# Initialize the logs ingestion client
credential = DefaultAzureCredential()
logs_client = LogsIngestionClient(endpoint=log_analytics_config['endpoint'], credential=credential)
# Prepare the log entry
log_entry = {
"TimeGenerated": datetime.datetime.utcnow().isoformat(),
"sku_name": data['sku'],
"region": data['region'],
"subscription_name": data['subscription_name'],
"subscription_id": data['subscription_id'],
"is_available": data['is_available'],
"restriction_reason": data['restriction_reason'] or "",
"zones": ",".join(data['zones']),
"vcpus": data['specifications'].get('vCPUs', ""),
"memory_gb": data['specifications'].get('MemoryGB', ""),
"alternative_skus": ",".join([sku['name'] for sku in data['alternative_skus']])
}
# Upload the log entry
logs_client.upload(
rule_id=log_analytics_config['rule_id'],
stream_name=log_analytics_config['stream_name'],
logs=[log_entry]
)
logger.info("Successfully logged to Azure Monitor")
return True
except ImportError:
logger.error("Azure Monitor Ingestion client not installed. Install with: pip install azure-monitor-ingestion")
return False
except HttpResponseError as e:
logger.error(f"Error logging to Azure Monitor: {str(e)}")
return False
except Exception as e:
logger.error(f"Error logging to Azure Monitor: {str(e)}")
return False
def main():
"""Main function."""
# Parse arguments
args = parse_arguments()
# Set debug logging if requested
if args.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('azure').setLevel(logging.DEBUG)
# Load configuration
config = load_configuration(args)
# Log start
logger.info(f"Starting VM SKU capacity monitoring for {config['target_sku']} in {config['region']}")
try:
# Initialize Azure clients
credential = DefaultAzureCredential()
compute_client = ComputeManagementClient(credential, subscription_id=config['subscription_id'])
subscription_client = SubscriptionClient(credential)
# Get subscription details
subscriptions = list(subscription_client.subscriptions.list())
subscription_name = subscriptions[0].display_name if subscriptions else "Unknown"
subscription_id = subscriptions[0].subscription_id if subscriptions else config['subscription_id']
# Check SKU availability
is_available, restriction_reason, zones, specifications, alternative_skus = check_sku_availability(
compute_client,
config['region'],
config['target_sku'],
config['check_zones']
)
# Display results
if not is_available:
logger.warning(f"SKU {config['target_sku']} is not available in region {config['region']}")
# Prepare result data
result_data = {
'sku': config['target_sku'],
'region': config['region'],
'subscription_name': subscription_name,
'subscription_id': subscription_id,
'is_available': is_available,
'restriction_reason': restriction_reason,
'zones': zones,
'specifications': specifications,
'alternative_skus': alternative_skus
}
# Display results
if RICH_AVAILABLE:
display_results_rich(
config['region'],
config['target_sku'],
is_available,
restriction_reason,
zones,
specifications,
alternative_skus,
subscription_name,
subscription_id
)
else:
display_results_text(
config['region'],
config['target_sku'],
is_available,
restriction_reason,
zones,
specifications,
alternative_skus,
subscription_name,
subscription_id
)
# Log to Azure Monitor if enabled
if config['log_analytics']['enabled']:
if not config['log_analytics']['endpoint'] or not config['log_analytics']['rule_id']:
logger.error("Log Analytics endpoint and rule ID are required for Azure Monitor logging")
else:
try:
log_to_azure_monitor(result_data, config['log_analytics'])
except Exception as e:
logger.error(f"Failed to log to Azure Monitor: {str(e)}")
except Exception as e:
logger.error(f"Error monitoring VM SKU capacity: {str(e)}")
if args.debug:
import traceback
traceback.print_exc()
logger.info("VM SKU capacity monitoring completed")
if __name__ == "__main__":
main()
2. Log Analytics Setup Script
Now for the script that sets up all the Log Analytics stuff. This part is optional, but really helpful if you want to track capacity trends over time (setup_log_analytics.py):
#!/usr/bin/env python
"""
Azure VM SKU Capacity Monitor - Log Analytics Setup
This script automates the creation of:
• Resource Group
• Log Analytics Workspace (and waits for it to become active)
• Data Collection Endpoint
• Data Collection Rule
• Custom table in the workspace
It then emits a `config.json` for `monitor_vm_sku_capacity.py`.
"""
import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("log_analytics_setup")
def run_command(cmd: str) -> str:
"""Run a shell command, returning stdout or raising on failure."""
try:
result = subprocess.run(
cmd,
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
logger.error(f"Command failed: {cmd}")
logger.error(e.stderr.strip())
raise
def parse_arguments():
p = argparse.ArgumentParser(
description="Setup Log Analytics for VM SKU Capacity Monitor"
)
p.add_argument(
"--resource-group",
"-g",
default="vm-sku-monitor-rg",
help="Resource group name",
)
p.add_argument(
"--location", "-l", default="eastus2", help="Azure region (e.g. eastus2)"
)
p.add_argument(
"--workspace",
"-w",
default="vmskumonitor-workspace",
help="Log Analytics workspace name",
)
p.add_argument(
"--dce",
default="vmskumonitor-dce",
help="Data Collection Endpoint name",
)
p.add_argument(
"--dcr",
default="vmskumonitor-dcr",
help="Data Collection Rule name",
)
p.add_argument(
"--table",
"-t",
default="VMSKUCapacity",
help="Base name for custom table (suffix _CL added)",
)
p.add_argument(
"--config",
"-c",
default="config.json",
help="Output configuration file path",
)
return p.parse_args()
def ensure_rg(rg: str, loc: str):
logger.info(f"Ensuring resource group {rg} exists in {loc}")
try:
run_command(f"az group show -n {rg}")
logger.info(f"Resource group {rg} already exists.")
    except subprocess.CalledProcessError:
run_command(f"az group create -n {rg} -l {loc}")
logger.info(f"Resource group {rg} created.")
def ensure_workspace(rg: str, ws: str, loc: str):
logger.info(f"Ensuring Log Analytics workspace {ws}")
try:
run_command(
f"az monitor log-analytics workspace show "
f"-g {rg} -n {ws}"
)
logger.info(f"Workspace {ws} already exists.")
    except subprocess.CalledProcessError:
run_command(
f"az monitor log-analytics workspace create "
f"-g {rg} -n {ws} -l {loc}"
)
logger.info(f"Workspace {ws} created.")
wait_for_workspace(rg, ws)
def wait_for_workspace(rg: str, ws: str, timeout: int = 300, interval: int = 10):
logger.info(f"Waiting up to {timeout}s for workspace {ws} to become active…")
elapsed = 0
while elapsed < timeout:
state = run_command(
f"az monitor log-analytics workspace show "
f"-g {rg} -n {ws} --query provisioningState -o tsv"
).strip().lower()
if state == "succeeded":
logger.info("Workspace is active.")
return
logger.info(f"Current state: {state!r}; retrying in {interval}s…")
time.sleep(interval)
elapsed += interval
logger.warning(f"Workspace did not become active within {timeout}s; continuing.")
def ensure_dce(rg: str, dce: str, loc: str) -> str:
logger.info(f"Ensuring Data Collection Endpoint {dce}")
try:
run_command(f"az monitor data-collection endpoint show -g {rg} -n {dce}")
logger.info(f"DCE {dce} already exists.")
    except subprocess.CalledProcessError:
run_command(
f"az monitor data-collection endpoint create "
f"-g {rg} -n {dce} -l {loc} --public-network-access Enabled"
)
logger.info(f"DCE {dce} created.")
out = run_command(f"az monitor data-collection endpoint show -g {rg} -n {dce} -o json")
return json.loads(out)["logsIngestion"]["endpoint"]
def deploy_custom_table(rg: str, ws: str, table: str):
# Wait once more in case ingestion APIs lag behind provisioningState
logger.info("Re-checking workspace readiness before custom table deployment…")
wait_for_workspace(rg, ws, timeout=180, interval=15)
arm = {
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"workspaceName": {"type": "string"},
"tableName": {"type": "string"},
},
"resources": [
{
"type": "Microsoft.OperationalInsights/workspaces/tables",
"apiVersion": "2021-12-01-preview",
"name": "[concat(parameters('workspaceName'), '/', parameters('tableName'), '_CL')]",
"properties": {
"schema": {
"name": "[concat(parameters('tableName'), '_CL')]",
"columns": [
{"name": "TimeGenerated", "type": "datetime"},
{"name": "sku_name", "type": "string"},
{"name": "region", "type": "string"},
{"name": "subscription_name", "type": "string"},
{"name": "subscription_id", "type": "string"},
{"name": "is_available", "type": "boolean"},
{"name": "restriction_reason", "type": "string"},
{"name": "zones", "type": "string"},
{"name": "vcpus", "type": "string"},
{"name": "memory_gb", "type": "string"},
{"name": "alternative_skus", "type": "string"},
],
}
},
}
],
}
fn = f"custom-table-{int(time.time())}.json"
with open(fn, "w") as f:
json.dump(arm, f, indent=2)
logger.info(f"Deploying custom table {table}_CL via ARM template")
run_command(
f"az deployment group create -g {rg} "
f"--template-file {fn} "
f"--parameters workspaceName={ws} tableName={table}"
)
os.remove(fn)
logger.info("Custom table created.")
def deploy_dcr(rg: str, dcr: str, loc: str, dce_name: str, ws: str, table: str) -> str:
"""
Create or verify a Data Collection Rule that sends Custom-<table>_CL
to the workspace. Returns the ImmutableId.
"""
logger.info(f"Ensuring Data Collection Rule {dcr}")
try:
run_command(f"az monitor data-collection rule show -g {rg} -n {dcr}")
logger.info(f"DCR {dcr} already exists.")
    except subprocess.CalledProcessError:
# gather resource IDs
ws_id = run_command(f"az monitor log-analytics workspace show -g {rg} -n {ws} -o json")
ws_id = json.loads(ws_id)["id"]
        dce_id = run_command(f"az monitor data-collection endpoint show -g {rg} -n {dce_name} -o json")
dce_id = json.loads(dce_id)["id"]
# build ARM
arm = {
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"dcrName": {"type": "string"},
"location": {"type": "string"},
"dceId": {"type": "string"},
"workspaceId": {"type": "string"},
"streamName": {"type": "string"},
},
"resources": [
{
"type": "Microsoft.Insights/dataCollectionRules",
"apiVersion": "2021-09-01-preview",
"name": "[parameters('dcrName')]",
"location": "[parameters('location')]",
"properties": {
"dataCollectionEndpointId": "[parameters('dceId')]",
"streamDeclarations": {
"[parameters('streamName')]": {
"columns": [
{"name": "TimeGenerated", "type": "datetime"},
{"name": "sku_name", "type": "string"},
{"name": "region", "type": "string"},
{"name": "subscription_name", "type": "string"},
{"name": "subscription_id", "type": "string"},
{"name": "is_available", "type": "boolean"},
{"name": "restriction_reason", "type": "string"},
{"name": "zones", "type": "string"},
{"name": "vcpus", "type": "string"},
{"name": "memory_gb", "type": "string"},
{"name": "alternative_skus", "type": "string"},
]
}
},
"destinations": {
"logAnalytics": [
{
"workspaceResourceId": "[parameters('workspaceId')]",
"name": "la-destination",
}
]
},
"dataFlows": [
{
"streams": ["[parameters('streamName')]"],
"destinations": ["la-destination"],
}
],
},
}
],
}
fn = f"dcr-{int(time.time())}.json"
with open(fn, "w") as f:
json.dump(arm, f, indent=2)
run_command(
f"az deployment group create -g {rg} "
f"--template-file {fn} "
f"--parameters "
f"dcrName={dcr} location={loc} "
f"dceId={dce_id} workspaceId={ws_id} "
f"streamName=Custom-{table}_CL"
)
os.remove(fn)
logger.info(f"DCR {dcr} created.")
# return immutableId
out = run_command(f"az monitor data-collection rule show -g {rg} -n {dcr} -o json")
return json.loads(out)["immutableId"]
def write_config(path: str, dce_uri: str, dcr_id: str, table: str, loc: str):
cfg = {
"region": loc,
"target_sku": "Standard_D16ds_v5",
"check_zones": True,
"log_analytics": {
"enabled": True,
"endpoint": dce_uri,
"rule_id": dcr_id,
"stream_name": f"Custom-{table}_CL",
},
"check_interval_minutes": 15,
}
with open(path, "w") as f:
json.dump(cfg, f, indent=2)
logger.info(f"Wrote configuration to {path}")
def main():
args = parse_arguments()
logger.info("Starting Log Analytics setup…")
ensure_rg(args.resource_group, args.location)
ensure_workspace(args.resource_group, args.workspace, args.location)
# Create the Data Collection Endpoint
dce_uri = ensure_dce(args.resource_group, args.dce, args.location)
# IMPORTANT: Create the custom table BEFORE the DCR
# This fixes the "InvalidOutputTable" error
deploy_custom_table(args.resource_group, args.workspace, args.table)
# Now create the Data Collection Rule that references the custom table
dcr_id = deploy_dcr(
        args.resource_group, args.dcr, args.location, args.dce, args.workspace, args.table
)
write_config(args.config, dce_uri, dcr_id, args.table, args.location)
logger.info("Log Analytics setup completed successfully!")
if __name__ == "__main__":
main()
Setting default region and VM SKU
You've got a few options to set your preferred region and VM SKU:
1. Edit script defaults: Open monitor_vm_sku_capacity.py and look for:
parser.add_argument('--region', type=str, default='eastus2', # Change this!
help='Azure region to check (default: eastus2)')
parser.add_argument('--sku', type=str, default='Standard_D16ds_v5', # And this!
help='VM SKU to check (default: Standard_D16ds_v5)')
2. Specify on command line:
python monitor_vm_sku_capacity.py --region westus2 --sku Standard_D8ds_v5
3. Edit the config file: the setup script creates a config.json with these values, which you can tweak afterwards:
{
"region": "eastus2",
"target_sku": "Standard_D16ds_v5",
"check_zones": true,
...
}
Finding Available Regions and SKUs
If you're wondering which regions and SKUs to monitor, here's how to get that info:
Using Azure CLI
# List all regions
az account list-locations --query "[].name" -o tsv
# List all VM SKUs in a region
az vm list-skus --location eastus2 --resource-type virtualMachines --query "[].name" -o tsv
# Get detailed info about a specific SKU
az vm list-skus --location eastus2 --size Standard_D16ds_v5 -o table
Using Azure Portal
Just go to the VM creation page in the portal and click "See all sizes" - you'll get a nice visual list of all available options. I sometimes just take a screenshot of this for reference.
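If you'd rather do the same lookup from Python, here's a minimal sketch using the same SDK calls the monitor relies on (the subscription ID is a placeholder):
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
subscription_id = "<your-subscription-id>"  # placeholder - substitute your own
region = "eastus2"
compute = ComputeManagementClient(DefaultAzureCredential(), subscription_id)
for sku in compute.resource_skus.list():
    # Only VM SKUs that are offered in the chosen region
    if sku.resource_type != "virtualMachines":
        continue
    if region.lower() not in [loc.lower() for loc in sku.locations]:
        continue
    # Skip SKUs that are restricted for this subscription in that region
    restricted = any(
        region.lower() in [l.lower() for l in (r.restriction_info.locations or [])]
        for r in sku.restrictions
    )
    if not restricted:
        print(sku.name)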
Using this tool
So here's how you use this thing. I tried to make it as simple as possible:
1. Set up Log Analytics first (optional but recommended):
python setup_log_analytics.py
This builds all the Log Analytics stuff and spits out a config file you can use in the next step. The default options should work fine for most people, but you can customize if needed.
2. Run the monitoring script:
python monitor_vm_sku_capacity.py --config config.json
If you don't want to mess with Log Analytics, you can just run it directly:
python monitor_vm_sku_capacity.py --region eastus2 --sku Standard_D16ds_v5
The output will look something like this (way prettier if you have the rich package installed):
================================================================================
AZURE VM SKU CAPACITY MONITOR - 2024-05-20 14:32:45
================================================================================
Status: AVAILABLE
SKU: Standard_D16ds_v5
Region: eastus2
Subscription: My Azure Subscription (12345678-1234-1234-1234-123456789012)
Available Zones:
- 1
- 2
- 3
VM SKU Specifications:
vCPUs: 16
MemoryGB: 64
MaxDataDiskCount: 32
PremiumIO: True
AcceleratedNetworkingEnabled: True
Or if the VM is unavailable:
================================================================================
AZURE VM SKU CAPACITY MONITOR - 2024-05-20 14:32:45
================================================================================
Status: NOT AVAILABLE
SKU: Standard_D16ds_v5
Region: eastus2
Subscription: My Azure Subscription (12345678-1234-1234-1234-123456789012)
Details: SKU Standard_D16ds_v5 is not available in region eastus2
Available Zones:
None
Restrictions:
Type: Zone
Reason: NotAvailableForSubscription
Affected Values: eastus2
VM SKU Specifications:
vCPUs: 16
MemoryGB: 64
MaxDataDiskCount: 32
PremiumIO: True
AcceleratedNetworkingEnabled: True
Alternative SKUs:
- Standard_D16as_v5 (vCPUs: 16, Memory: 64 GB, Family: standardDasv5Family, Similarity: 100%)
- Standard_D16s_v5 (vCPUs: 16, Memory: 64 GB, Family: standardDsv5Family, Similarity: 100%)
- Standard_D16s_v4 (vCPUs: 16, Memory: 64 GB, Family: standardDsv4Family, Similarity: 100%)
- Standard_F16s_v2 (vCPUs: 16, Memory: 32 GB, Family: standardFSv2Family, Similarity: 80%)
- Standard_E16s_v5 (vCPUs: 16, Memory: 128 GB, Family: standardEsv5Family, Similarity: 80%)
Setting up scheduled checks
I don't like missing things, so I set mine up to run every hour using cron:
# Open crontab editor
crontab -e
# Add this line to run it every hour
0 * * * * cd /path/to/scripts && source venv/bin/activate && python monitor_vm_sku_capacity.py --config config.json >> vm_sku_monitor.log 2>&1
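If cron isn't your thing, a tiny wrapper script can honor the check_interval_minutes value from config.json instead. This is just a sketch; it shells out to the monitoring script in a loop:
#!/usr/bin/env python
"""Re-run the VM SKU monitor every check_interval_minutes (simple sketch)."""
import json
import subprocess
import time
# Assumes config.json sits next to this file and the venv is already activated
with open("config.json") as f:
    interval_minutes = json.load(f).get("check_interval_minutes", 15)
while True:
    subprocess.run(["python", "monitor_vm_sku_capacity.py", "--config", "config.json"])
    time.sleep(interval_minutes * 60)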
Checking your data in Log Analytics
If you set up Log Analytics, you can run all sorts of cool queries:
// Basic query - see everything
VMSKUCapacity_CL
| order by TimeGenerated desc
// Find when capacity changed
VMSKUCapacity_CL
| where sku_name == "Standard_D16ds_v5" and region == "eastus2"
| project TimeGenerated, is_available
| order by TimeGenerated desc
// Simple dashboard
VMSKUCapacity_CL
| summarize arg_max(TimeGenerated, is_available) by sku_name, region
| extend Status = iff(is_available == true, "Available", "Not Available")
| project sku_name, region, Status, LastChecked = TimeGenerated
You can set up alerts too. That way Azure tells YOU when capacity changes, instead of you finding out during a failed deployment!
Troubleshooting
Some common problems I've run into:
- "Could not automatically detect subscription ID":
- Make sure you're logged in with az login
- Or just provide it explicitly with --subscription-id
- Log Analytics permission errors:
- Make sure you ran the permission commands from the prerequisites section
- Azure's permissions can be weirdly slow - wait 10-15 minutes and try again
- Python environment issues (a quick sanity-check sketch follows this list):
- Always use a virtual environment! I learned this one the hard way when I messed up my system Python
- Make sure all the packages are installed with pip install azure-identity azure-mgmt-compute azure-mgmt-subscription azure-monitor-ingestion rich
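If you're still stuck, this quick sanity check (a sketch) confirms that the SDK packages import and that DefaultAzureCredential can actually obtain a token for Azure Resource Manager:
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient  # noqa: F401 - import check only
# If this raises, fix your login/permissions before debugging the monitor itself
token = DefaultAzureCredential().get_token("https://management.azure.com/.default")
print("Got an ARM token; expires at (epoch):", token.expires_on)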
Next Steps
- Create a dashboard to visualize VM SKU availability over time
- Set up alerts to notify you when specific SKUs become available
- Integrate with your CI/CD pipeline to automatically select available SKUs (see the sketch after this list)
- For a serverless, fully managed option, create an Azure Function version of the monitoring script
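Here's roughly what that CI/CD idea could look like. It's a sketch that reuses check_sku_availability from monitor_vm_sku_capacity.py; the preference-ordered SKU list and the subscription ID are just examples:
from typing import Optional
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from monitor_vm_sku_capacity import check_sku_availability
PREFERRED_SKUS = ["Standard_D16ds_v5", "Standard_D16s_v5", "Standard_D16as_v5"]  # example preference order
def pick_available_sku(subscription_id: str, region: str) -> Optional[str]:
    """Return the first preferred SKU that is unrestricted in the region, or None."""
    compute = ComputeManagementClient(DefaultAzureCredential(), subscription_id)
    for sku in PREFERRED_SKUS:
        is_available, _, _, _, _ = check_sku_availability(compute, region, sku)
        if is_available:
            return sku
    return None  # nothing in the list is available; let the pipeline step fail loudly
if __name__ == "__main__":
    print(pick_available_sku("<your-subscription-id>", "eastus2") or "no preferred SKU available")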
Advanced: Bulk-Deploy Feasibility Check
Want to know up front whether you can spin up N VMs of SKU X in region Y? This version combines two checks:
- Hardware-level: Resource SKUs API (is the SKU unrestricted?)
- Subscription-level: Usage API (enough free vCPU cores for N instances?)
Prerequisites already covered above:
az login
USER_PRINCIPAL=$(az ad signed-in-user show --query userPrincipalName -o tsv)
az group create --name vm-sku-monitor-rg --location eastus2
az role assignment create \
--assignee "$USER_PRINCIPAL" \
--role "Monitoring Metrics Publisher" \
--scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"
python3 -m venv venv && source venv/bin/activate
pip install azure-identity azure-mgmt-compute azure-mgmt-subscription rich
File: monitor_vm_sku_capacity_bulk.py
#!/usr/bin/env python
"""
Azure VM SKU Capacity & Quota Monitor (with Zone support)
Checks:
1) Whether your target SKU is available in a region or zone
2) Whether your subscription has enough free vCPU quota to deploy N VMs
Optionally logs results into Azure Log Analytics.
"""
import argparse
import datetime
import json
import logging
import subprocess
from typing import List, Tuple, Dict, Any
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.subscription import SubscriptionClient
# Rich for prettier tables
try:
from rich.console import Console
from rich.table import Table
from rich import box
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("vm_sku_capacity_monitor")
def parse_arguments():
p = argparse.ArgumentParser(
description="Azure VM SKU Capacity & Quota Monitor (with zone support)"
)
p.add_argument("--region", type=str, default="eastus2",
help="Azure region to check")
p.add_argument("--sku", type=str, default="Standard_D16ds_v5",
help="VM SKU to check")
p.add_argument("--zone", type=str, default=None,
help="(Optional) Availability zone to check (e.g. '1')")
p.add_argument("--count", type=int, default=1,
help="Number of VMs you plan to deploy")
p.add_argument("--log-analytics", action="store_true",
help="Enable logging to Azure Log Analytics")
p.add_argument("--endpoint", type=str,
help="Data Collection Endpoint URI")
p.add_argument("--rule-id", type=str,
help="Data Collection Rule ID")
p.add_argument("--stream-name", type=str, default="Custom-VMSKUCapacity_CL",
help="Log Analytics stream name")
p.add_argument("--debug", action="store_true",
help="Enable debug logging")
p.add_argument("--config", type=str,
help="Path to JSON config file")
p.add_argument("--subscription-id", type=str,
help="Azure Subscription ID")
return p.parse_args()
def load_configuration(args) -> Dict[str, Any]:
cfg = {
"region": args.region,
"zone": args.zone,
"target_sku": args.sku,
"desired_count": args.count,
"subscription_id": args.subscription_id,
"log_analytics": {
"enabled": args.log_analytics,
"endpoint": args.endpoint,
"rule_id": args.rule_id,
"stream_name": args.stream_name
}
}
if args.config:
try:
with open(args.config) as f:
j = json.load(f)
# merge known keys
for k in ("region","zone","target_sku","desired_count","subscription_id"):
if k in j: cfg[k] = j[k]
cfg["log_analytics"].update(j.get("log_analytics", {}))
logger.info(f"Loaded configuration from {args.config}")
except Exception as e:
logger.error(f"Failed loading config {args.config}: {e}")
# CLI args override file
if args.region: cfg["region"] = args.region
if args.zone: cfg["zone"] = args.zone
if args.sku: cfg["target_sku"] = args.sku
if args.count: cfg["desired_count"] = args.count
if args.subscription_id:
cfg["subscription_id"] = args.subscription_id
return cfg
def get_subscription_id(explicit: str) -> str:
if explicit:
return explicit
# Try Azure CLI
try:
out = subprocess.run(
"az account show --query id -o tsv",
shell=True, check=True,
stdout=subprocess.PIPE, text=True
).stdout.strip()
if out:
return out
    except Exception:
pass
# Fallback: Azure SDK
cred = DefaultAzureCredential()
subs = list(SubscriptionClient(cred).subscriptions.list())
return subs[0].subscription_id if subs else None
def check_sku_availability(
compute: ComputeManagementClient,
region: str, sku: str, zone: str = None
) -> Tuple[bool, str, List[str], Dict[str, Any]]:
"""
Returns:
is_available (bool),
reason (str or None),
supported_zones (list of str),
capabilities (dict of name→value)
"""
skus = list(compute.resource_skus.list())
entry = next(
(s for s in skus
if s.name.lower() == sku.lower()
and region.lower() in [loc.lower() for loc in s.locations]),
None
)
if not entry:
return False, "NotFound", [], {}
# Find all zones where this SKU is sold in that region
supported_zones = []
for loc_info in entry.location_info or []:
if loc_info.location.lower() == region.lower():
supported_zones = loc_info.zones or []
break
# Determine restrictions
if zone:
# 1) If SKU doesn’t support the requested zone
if zone not in supported_zones:
return False, "UnsupportedZone", supported_zones, {}
# 2) Check zone-level restrictionInfo.zones
restricted = [
r for r in entry.restrictions
if r.restriction_info.zones and zone in r.restriction_info.zones
]
else:
# Region-level check
restricted = [
r for r in entry.restrictions
if region.lower() in [l.lower() for l in r.restriction_info.locations]
]
is_avail = len(restricted) == 0
reason = restricted[0].reason_code if restricted else None
# Pull out SKU capabilities (vCPUs, MemoryGB, etc.)
caps = {c.name: c.value for c in entry.capabilities or []}
return is_avail, reason, supported_zones, caps
def check_quota(
compute: ComputeManagementClient,
region: str, vcpus_needed: int, count: int
) -> Tuple[int,int,bool]:
usage = list(compute.usage.list(location=region))
core = next((u for u in usage if u.name.value.lower()=="cores"), None)
free = (core.limit - core.current_value) if core else 0
required = vcpus_needed * count
return free, required, free >= required
def display(rdata: Dict[str, Any]):
if RICH_AVAILABLE:
c = Console()
c.print(f"\n[bold underline]SKU Capacity & Quota (Zone) Check "
f"({datetime.datetime.now():%Y-%m-%d %H:%M:%S})[/]\n")
# Availability table
t1 = Table(box=box.SIMPLE)
t1.add_column("SKU"); t1.add_column("Region"); t1.add_column("Zone")
t1.add_column("Available"); t1.add_column("Reason")
t1.add_row(
rdata["target_sku"], rdata["region"],
rdata["zone"] or "-",
"✅" if rdata["is_available"] else "❌",
rdata["reason"] or "-"
)
c.print(t1)
# Supported zones
t0 = Table(box=box.SIMPLE)
t0.add_column("Supported Zones")
t0.add_row(", ".join(rdata["supported_zones"]) or "None")
c.print(t0)
# Quota table
t2 = Table(box=box.SIMPLE)
t2.add_column("Desired VMs", justify="right")
t2.add_column("vCPUs/VM", justify="right")
t2.add_column("Free Cores", justify="right")
t2.add_column("Needs Cores",justify="right")
t2.add_column("Quota OK?", justify="center")
t2.add_row(
str(rdata["desired_count"]),
str(rdata["vcpus"]),
str(rdata["free_cores"]),
str(rdata["required_cores"]),
"✅" if rdata["quota_ok"] else "❌"
)
c.print(t2)
else:
print(f"\nSKU {rdata['target_sku']} in {rdata['region']} "
f"zone {rdata['zone'] or '-'}: "
f"Available={rdata['is_available']} (Reason={rdata['reason']})")
print("Supported zones:", ", ".join(rdata["supported_zones"]) or "None")
print(f"Quota: need {rdata['required_cores']} cores, "
f"have {rdata['free_cores']} → OK={rdata['quota_ok']}")
def main():
args = parse_arguments()
if args.debug:
logger.setLevel(logging.DEBUG)
cfg = load_configuration(args)
cfg["subscription_id"] = get_subscription_id(cfg.get("subscription_id"))
logger.info(f"Checking SKU {cfg['target_sku']} x{cfg['desired_count']} "
f"in {cfg['region']} zone {cfg['zone']}")
cred = DefaultAzureCredential()
compute = ComputeManagementClient(cred, cfg["subscription_id"])
# 1) SKU + zone availability
is_avail, reason, zones, caps = check_sku_availability(
compute, cfg["region"], cfg["target_sku"], cfg["zone"]
)
vcpus = int(caps.get("vCPUs", 0))
# 2) Subscription quota check
free, required, ok = check_quota(
compute, cfg["region"], vcpus, cfg["desired_count"]
)
result = {
"target_sku": cfg["target_sku"],
"region": cfg["region"],
"zone": cfg["zone"],
"supported_zones": zones,
"desired_count": cfg["desired_count"],
"is_available": is_avail,
"reason": reason,
"vcpus": vcpus,
"free_cores": free,
"required_cores": required,
"quota_ok": ok
}
display(result)
# (Optional) send to Log Analytics…
# [omitted for brevity]
if __name__ == "__main__":
main()
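The Log Analytics upload is omitted from the script body above. One way to wire it up, as a sketch, is to reuse log_to_azure_monitor from the first script and map the bulk result onto the keys it expects (subscription_name is left blank here because the bulk script doesn't look it up):
from monitor_vm_sku_capacity import log_to_azure_monitor
def send_bulk_result(result, cfg):
    """Map the bulk-check result dict onto log_to_azure_monitor's expected shape."""
    payload = {
        "sku": result["target_sku"],
        "region": result["region"],
        "subscription_name": "",  # not collected by the bulk script
        "subscription_id": cfg.get("subscription_id", ""),
        "is_available": result["is_available"],
        "restriction_reason": result["reason"],
        "zones": result["supported_zones"],
        "specifications": {"vCPUs": str(result["vcpus"])},
        "alternative_skus": [],  # the bulk script doesn't compute alternatives
    }
    log_to_azure_monitor(payload, cfg["log_analytics"])
You'd call send_bulk_result(result, cfg) at the end of main() when cfg["log_analytics"]["enabled"] is set.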
Run the bulk-deploy checker (region-level check)
python monitor_vm_sku_capacity_bulk.py \
--region centralus \
--sku Standard_B2s_v2 \
--count 10
(Optionally add --log-analytics --endpoint <DCE-URI> --rule-id <DCR-ID> to send results to Log Analytics; note that the upload itself is omitted from the script body, so wire it up first, for example with the send_bulk_result sketch above.)
Example output
SKU Capacity & Quota (Zone) Check (2025-06-20 12:49:58)
SKU Region Zone Available Reason
─────────────────────────────────────────────────────────
Standard_B2s_v2 centralus - ✅ -
Supported Zones
─────────────────
1, 3, 2
Desired VMs vCPUs/VM Free Cores Needs Cores Quota OK?
───────────────────────────────────────────────────────────────
10 2 100 20 ✅
Run the bulk-deploy checker (zone-level check)
python monitor_vm_sku_capacity_bulk.py \
--region centralus \
--zone 2 \
--sku Standard_B2s_v2 \
--count 10
Example output
SKU Capacity & Quota (Zone) Check (2025-06-20 12:42:22)
SKU Region Zone Available Reason
─────────────────────────────────────────────────────────
Standard_B2s_v2 centralus 2 ✅ -
Supported Zones
─────────────────
1, 3, 2
Desired VMs vCPUs/VM Free Cores Needs Cores Quota OK?
───────────────────────────────────────────────────────────────
10 2 100 20 ✅
Final Thoughts
This monitor has turned out to be genuinely useful for day-to-day Azure infrastructure work. With the history sitting in Log Analytics, you can bring real availability trends to planning sessions instead of guesses about scaling.
And because capacity constraints show up as a heads-up rather than as a failed deployment, you spend a lot less time firefighting unplanned downtime.
Happy monitoring!