Blog Post

Startups at Microsoft

A practical guide to Azure VM SKU capacity monitoring

rmmartins
May 22, 2025

Look, Azure capacity hiccups can really derail your day. You don’t get any warning—no “Heads up, we’re almost out of your preferred VM SKU”—you just try to create a VM and boom: error.

After one of those “oh-crap” moments hit some of my customers, I built a simple monitor that alerts you before you slam into that capacity wall—so you’re never blindsided again.

Thought I’d share it here—maybe save you from the same headache.

What this thing does

This solution isn't fancy, but it works. Here's what it'll do for you:

  1. Checks if your favorite VM types are actually available in your regions
  2. Shows exactly WHY something's unavailable if there's a problem
  3. Suggests similar VM types you could use instead (lifesaver!)
  4. Logs everything to Azure Log Analytics so you can track trends
  5. Works right from your terminal - no fancy setup needed
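
To make items 1 and 2 concrete, here is a minimal sketch of the restriction check on plain dictionaries shaped like entries from Azure's Resource SKUs listing. The sample entries are made up for illustration, and `find_restrictions` is a hypothetical helper, not part of the scripts in this post.

```python
def find_restrictions(sku: dict, region: str) -> list:
    """Return the restrictions on `sku` that apply to `region`."""
    region = region.lower()
    hits = []
    for restriction in sku.get("restrictions", []):
        info = restriction.get("restrictionInfo") or {}
        locations = [loc.lower() for loc in info.get("locations") or []]
        if region in locations:
            hits.append({
                "type": restriction.get("type"),
                "reason": restriction.get("reasonCode"),
                "zones": info.get("zones") or [],
            })
    return hits


# Made-up sample entries for illustration (not real API output).
SAMPLE_SKUS = [
    {
        "name": "Standard_D16ds_v5",
        "locations": ["eastus2"],
        "restrictions": [
            {
                "type": "Location",
                "reasonCode": "NotAvailableForSubscription",
                "restrictionInfo": {"locations": ["eastus2"]},
            }
        ],
    },
    {"name": "Standard_D16s_v5", "locations": ["eastus2"], "restrictions": []},
]

for sku in SAMPLE_SKUS:
    found = find_restrictions(sku, "EastUS2")
    status = "restricted" if found else "available"
    reasons = ", ".join(r["reason"] for r in found) or "-"
    print(f"{sku['name']}: {status} ({reasons})")
```

A SKU counts as available in a region when no restriction lists that region. The reason code is what tells you why it is blocked.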

How it's put together

It's pretty simple really - just two main Python scripts:

  1. The Monitoring Script: Checks VM availability using Azure's API
  2. Log Analytics Setup: Stores your data for later analysis (optional, but super useful)

Here's a quick diagram:

Before you start

You'll need a few things:

1. Azure CLI installed and working on your machine

# If you haven't logged in yet
az login

2. Azure permissions if you're doing the Log Analytics part:

# Get your username first
USER_PRINCIPAL=$(az ad signed-in-user show --query userPrincipalName -o tsv)
echo "Looks like you're logged in as: $USER_PRINCIPAL"

# Create a resource group - you can change the name if you want
az group create --name vm-sku-monitor-rg --location eastus2

# Give yourself the right permissions
az role assignment create \
  --assignee "$USER_PRINCIPAL" \
  --role "Monitoring Metrics Publisher" \
  --scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"

# Double-check it worked
az role assignment list \
  --assignee "$USER_PRINCIPAL" \
  --role "Monitoring Metrics Publisher" \
  --scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"

Role assignments can take a while to propagate in Azure. If you get weird 403 errors later, grab a coffee and try again in 10-15 minutes.
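
If you'd rather not babysit that wait, a small retry helper can do it for you. This is only a sketch: `retry_on_forbidden` is a hypothetical helper, the attempt count and delay are arbitrary, and it relies on the exception carrying a `status_code` attribute (as azure-core's `HttpResponseError` does).

```python
import time


def retry_on_forbidden(action, attempts=5, delay_seconds=120, sleep=time.sleep):
    """Call `action()` and retry while it fails with HTTP 403."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as exc:
            # Only exceptions exposing status_code == 403 are retried;
            # anything else is re-raised immediately.
            is_forbidden = getattr(exc, "status_code", None) == 403
            if not is_forbidden or attempt == attempts:
                raise
            print(f"Got 403 (attempt {attempt}/{attempts}); waiting {delay_seconds}s")
            sleep(delay_seconds)
```

To be useful, it has to wrap the call that actually raises, such as the `logs_client.upload(...)` call in the monitoring script. The script's `log_to_azure_monitor` function catches exceptions itself and returns False, so wrapping that function would never trigger a retry.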

3. Python environment setup:

# Set up a virtual environment - don't skip this step!
# I learned this the hard way when I borked my system Python...
python3 -m venv venv

# Activate it
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install what we need
pip install azure-identity azure-mgmt-compute azure-mgmt-subscription azure-monitor-ingestion rich
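
To confirm the install landed in the active virtual environment, a quick import check like the one below can help. It is a sketch, and `missing_modules` is a hypothetical helper. The module names are the import paths the scripts in this post use.

```python
import importlib.util

# Import paths used by the scripts in this post.
REQUIRED_MODULES = [
    "azure.identity",
    "azure.mgmt.compute",
    "azure.mgmt.subscription",
    "azure.monitor.ingestion",
    "rich",
]


def missing_modules(names):
    """Return the module names from `names` that cannot be found."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # find_spec raises for dotted names when a parent package is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```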

Let's build this thing

1. The VM Capacity Checking Script

The star of the show is the monitoring script itself. This script does all the heavy lifting - checking VM availability, showing you what's happening, and logging the data for later. 

I'll call it monitor_vm_sku_capacity.py:

#!/usr/bin/env python
"""
Azure VM SKU Capacity Monitor

This script checks the availability of specific VM SKUs in Azure regions
and provides information about capacity constraints and alternative options.
"""

import argparse
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Dict, List, Any, Optional, Tuple

from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.subscription import SubscriptionClient
from azure.core.exceptions import HttpResponseError

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger('vm_sku_capacity_monitor')

try:
    from rich.console import Console
    from rich.table import Table
    from rich import box
    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False

def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Azure VM SKU Capacity Monitor')
    parser.add_argument('--region', type=str, default=None,
                        help='Azure region to check (default: eastus2)')
    parser.add_argument('--sku', type=str, default=None,
                        help='VM SKU to check (default: Standard_D16ds_v5)')
    parser.add_argument('--log-analytics', action='store_true',
                        help='Enable logging to Azure Log Analytics')
    parser.add_argument('--endpoint', type=str,
                        help='Azure Monitor Data Collection Endpoint URI')
    parser.add_argument('--rule-id', type=str,
                        help='Azure Monitor Data Collection Rule ID')
    parser.add_argument('--stream-name', type=str, default=None,
                        help='Azure Monitor Log Analytics stream name (default: Custom-VMSKUCapacity_CL)')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug logging')
    parser.add_argument('--config', type=str,
                        help='Path to configuration file')
    parser.add_argument('--subscription-id', type=str,
                        help='Azure Subscription ID')

    return parser.parse_args()

def load_configuration(args):
    """Load configuration from file or command line arguments."""
    # Built-in defaults are applied here rather than in argparse, so a value
    # from the config file is only overridden by a flag that was actually passed.
    config = {
        'region': args.region or 'eastus2',
        'target_sku': args.sku or 'Standard_D16ds_v5',
        'check_zones': True,
        'subscription_id': args.subscription_id,
        'log_analytics': {
            'enabled': args.log_analytics,
            'endpoint': args.endpoint,
            'rule_id': args.rule_id,
            'stream_name': args.stream_name or 'Custom-VMSKUCapacity_CL'
        },
        'check_interval_minutes': 15
    }
    
    if args.config:
        try:
            with open(args.config, 'r') as f:
                file_config = json.load(f)
                logger.info(f"Configuration loaded from {args.config}")
                
                # Update config with file values
                config['region'] = file_config.get('region', config['region'])
                config['target_sku'] = file_config.get('target_sku', config['target_sku'])
                config['check_zones'] = file_config.get('check_zones', config['check_zones'])
                config['check_interval_minutes'] = file_config.get('check_interval_minutes', config['check_interval_minutes'])
                config['subscription_id'] = file_config.get('subscription_id', config['subscription_id'])
                
                # Update Log Analytics config
                if 'log_analytics' in file_config:
                    config['log_analytics']['enabled'] = file_config['log_analytics'].get('enabled', config['log_analytics']['enabled'])
                    config['log_analytics']['endpoint'] = file_config['log_analytics'].get('endpoint', config['log_analytics']['endpoint'])
                    config['log_analytics']['rule_id'] = file_config['log_analytics'].get('rule_id', config['log_analytics']['rule_id'])
                    config['log_analytics']['stream_name'] = file_config['log_analytics'].get('stream_name', config['log_analytics']['stream_name'])
        except Exception as e:
            logger.error(f"Error loading configuration from {args.config}: {str(e)}")
            logger.info("Using default configuration")
    
    # Command line arguments override config file
    if args.region:
        config['region'] = args.region
    if args.sku:
        config['target_sku'] = args.sku
    if args.log_analytics:
        config['log_analytics']['enabled'] = True
    if args.endpoint:
        config['log_analytics']['endpoint'] = args.endpoint
    if args.rule_id:
        config['log_analytics']['rule_id'] = args.rule_id
    if args.stream_name:
        config['log_analytics']['stream_name'] = args.stream_name
    if args.subscription_id:
        config['subscription_id'] = args.subscription_id
    
    # Auto-detect subscription ID if not provided
    if not config.get('subscription_id'):
        config['subscription_id'] = get_subscription_id(config)
        
    return config

def get_subscription_id(config):
    """Automatically detect the subscription ID using multiple methods."""
    subscription_id = None
    
    # Method 1: Try to extract from rule_id in config
    if config.get('log_analytics', {}).get('rule_id'):
        rule_id = config['log_analytics']['rule_id']
        match = re.search(r'/subscriptions/([^/]+)/', rule_id)
        if match:
            subscription_id = match.group(1)
            logger.info(f"Extracted subscription ID from rule_id: {subscription_id}")
            return subscription_id
    
    # Method 2: Try to get from Azure CLI
    try:
        result = subprocess.run(
            "az account show --query id -o tsv",
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        subscription_id = result.stdout.strip()
        if subscription_id:
            logger.info(f"Retrieved subscription ID from Azure CLI: {subscription_id}")
            return subscription_id
    except Exception as e:
        logger.debug(f"Could not get subscription ID from Azure CLI: {str(e)}")
    
    # Method 3: Try to get from DefaultAzureCredential
    try:
        credential = DefaultAzureCredential()
        subscription_client = SubscriptionClient(credential)
        subscriptions = list(subscription_client.subscriptions.list())
        if subscriptions:
            subscription_id = subscriptions[0].subscription_id
            logger.info(f"Retrieved subscription ID from Azure SDK: {subscription_id}")
            return subscription_id
    except Exception as e:
        logger.debug(f"Could not get subscription ID from Azure SDK: {str(e)}")
    
    if not subscription_id:
        logger.warning("Could not automatically detect subscription ID. Please provide it manually.")
    
    return subscription_id

def check_sku_availability(compute_client, region, target_sku, check_zones=True):
    """Check if a specific VM SKU is available in the given region."""
    # Get the SKUs for this region only; listing every region is much slower
    skus = list(compute_client.resource_skus.list(filter=f"location eq '{region}'"))
    
    # Find the target SKU in the specified region
    target_sku_info = None
    for sku in skus:
        if sku.name.lower() == target_sku.lower() and any(loc.lower() == region.lower() for loc in sku.locations):
            target_sku_info = sku
            break
    
    if not target_sku_info:
        logger.warning(f"SKU {target_sku} not found in region {region}")
        return False, "NotFound", [], {}, []
    
    # Check availability
    is_available = True
    restriction_reason = None
    restrictions = []
    
    for restriction in target_sku_info.restrictions or []:
        # Guard against a missing restriction_info or an empty locations list
        restricted_locations = getattr(restriction.restriction_info, 'locations', None) or []
        if any(value.lower() == region.lower() for value in restricted_locations):
            is_available = False
            restriction_reason = restriction.reason_code
            restrictions.append({
                'type': restriction.type,
                'reason': restriction.reason_code,
                'values': restricted_locations
            })
    
    # Get zone availability
    zones = []
    if check_zones and hasattr(target_sku_info, 'location_info'):
        for location_info in target_sku_info.location_info or []:
            if location_info.location.lower() == region.lower() and hasattr(location_info, 'zones'):
                # zones can be None; normalize to a list so later joins don't fail
                zones = location_info.zones or []
    
    # Get SKU specifications
    specifications = {}
    if hasattr(target_sku_info, 'capabilities'):
        for capability in target_sku_info.capabilities:
            specifications[capability.name] = capability.value
    
    # Find alternative SKUs
    alternative_skus = []
    if not is_available:
        for sku in skus:
            # Skip if not a VM SKU or same as target (case-insensitive, like the lookup above)
            if sku.resource_type != 'virtualMachines' or sku.name.lower() == target_sku.lower():
                continue
            
            # Check if available in the region
            if not any(loc.lower() == region.lower() for loc in sku.locations):
                continue
            
            # Check if restricted in the region
            is_restricted = False
            for restriction in sku.restrictions:
                if any(value.lower() == region.lower() for value in restriction.restriction_info.locations):
                    is_restricted = True
                    break
            
            if is_restricted:
                continue
            
            # Get specifications
            alt_specs = {}
            if hasattr(sku, 'capabilities'):
                for capability in sku.capabilities:
                    alt_specs[capability.name] = capability.value
            
            # Calculate similarity score
            similarity = calculate_similarity(specifications, alt_specs)
            
            if similarity >= 80:  # Only include if at least 80% similar
                alternative_skus.append({
                    'name': sku.name,
                    'vcpus': alt_specs.get('vCPUs', 'Unknown'),
                    'memory': alt_specs.get('MemoryGB', 'Unknown'),
                    'family': sku.family,
                    'similarity': similarity
                })
        
        # Sort by similarity (highest first)
        alternative_skus.sort(key=lambda x: x['similarity'], reverse=True)
        
        # Limit to top 5
        alternative_skus = alternative_skus[:5]
    
    logger.info(f"Availability check result: {is_available}, Reason: {restriction_reason}")
    
    return is_available, restriction_reason, zones, specifications, alternative_skus

def calculate_similarity(specs1, specs2):
    """Calculate similarity percentage between two SKU specifications."""
    # Key specifications to compare
    key_specs = ['vCPUs', 'MemoryGB', 'MaxDataDiskCount', 'PremiumIO', 'AcceleratedNetworkingEnabled']
    
    # Count matches
    matches = 0
    total = 0
    
    for key in key_specs:
        if key in specs1 and key in specs2:
            total += 1
            if specs1[key] == specs2[key]:
                matches += 1
    
    # Calculate percentage
    if total == 0:
        return 0
    
    return int((matches / total) * 100)

def display_results_rich(region, target_sku, is_available, restriction_reason, zones, specifications, alternative_skus, subscription_name, subscription_id):
    """Display results using rich formatting."""
    console = Console()
    
    # Create header
    console.print(f"[bold white on blue]{'AZURE VM SKU CAPACITY MONITOR - ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'):^150}[/]")
    
    # Create summary table
    console.print()
    console.print(f"  [bold]Status[/]         {'AVAILABLE' if is_available else 'NOT AVAILABLE'}")
    console.print(f"  [bold]SKU[/]            {target_sku}")
    console.print(f"  [bold]Region[/]         {region}")
    console.print(f"  [bold]Subscription[/]   {subscription_name} ({subscription_id})")
    if not is_available:
        console.print(f"  [bold]Details[/]        SKU {target_sku} is not available in region {region}")
    
    # Display zones
    console.print()
    console.print("[bold]Available[/]")
    console.print("  [bold]Zones[/]")
    
    if zones:
        zone_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
        zone_table.add_column("Zone")
        for zone in zones:
            zone_table.add_row(zone)
        console.print(zone_table)
    else:
        console.print("  None")
    
    # Display restrictions
    if not is_available:
        console.print()
        console.print("[bold]Restrictions[/]".center(50))
        
        restrictions_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
        restrictions_table.add_column("Type", style="dim")
        restrictions_table.add_column("Reason", style="dim")
        restrictions_table.add_column("Affected Values", style="dim")
        
        restrictions_table.add_row("Zone", restriction_reason, region)
        console.print(restrictions_table)
    
    # Display specifications
    console.print()
    console.print("[bold]VM SKU Specifications[/]".center(50))
    
    specs_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
    specs_table.add_column("Property", style="dim")
    specs_table.add_column("Value", style="dim")
    
    for key, value in specifications.items():
        specs_table.add_row(key, str(value))
    
    console.print(specs_table)
    
    # Display alternative SKUs
    if alternative_skus:
        console.print()
        console.print("[bold]Alternative SKUs[/]".center(50))
        
        alt_table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
        alt_table.add_column("SKU Name", style="dim")
        alt_table.add_column("vCPUs", style="dim")
        alt_table.add_column("Memory (GB)", style="dim")
        alt_table.add_column("Family", style="dim")
        alt_table.add_column("Similarity", style="dim")
        
        for sku in alternative_skus:
            alt_table.add_row(
                sku['name'],
                str(sku['vcpus']),
                str(sku['memory']),
                sku['family'],
                f"{sku['similarity']}%"
            )
        
        console.print(alt_table)

def display_results_text(region, target_sku, is_available, restriction_reason, zones, specifications, alternative_skus, subscription_name, subscription_id):
    """Display results using plain text formatting."""
    print("\n" + "=" * 80)
    print(f"AZURE VM SKU CAPACITY MONITOR - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 80)
    
    print(f"\nStatus:       {'AVAILABLE' if is_available else 'NOT AVAILABLE'}")
    print(f"SKU:          {target_sku}")
    print(f"Region:       {region}")
    print(f"Subscription: {subscription_name} ({subscription_id})")
    if not is_available:
        print(f"Details:      SKU {target_sku} is not available in region {region}")
    
    print("\nAvailable Zones:")
    if zones:
        for zone in zones:
            print(f"  - {zone}")
    else:
        print("  None")
    
    if not is_available:
        print("\nRestrictions:")
        print(f"  Type:           Zone")
        print(f"  Reason:         {restriction_reason}")
        print(f"  Affected Values: {region}")
    
    print("\nVM SKU Specifications:")
    for key, value in specifications.items():
        print(f"  {key}: {value}")
    
    if alternative_skus:
        print("\nAlternative SKUs:")
        for sku in alternative_skus:
            print(f"  - {sku['name']} (vCPUs: {sku['vcpus']}, Memory: {sku['memory']} GB, Family: {sku['family']}, Similarity: {sku['similarity']}%)")

def log_to_azure_monitor(data, log_analytics_config):
    """Log data to Azure Monitor."""
    try:
        # Import Azure Monitor Ingestion client
        from azure.monitor.ingestion import LogsIngestionClient
        
        # Initialize the logs ingestion client
        credential = DefaultAzureCredential()
        logs_client = LogsIngestionClient(endpoint=log_analytics_config['endpoint'], credential=credential)
        
        # Prepare the log entry
        log_entry = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated and naive)
            "TimeGenerated": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "sku_name": data['sku'],
            "region": data['region'],
            "subscription_name": data['subscription_name'],
            "subscription_id": data['subscription_id'],
            "is_available": data['is_available'],
            "restriction_reason": data['restriction_reason'] or "",
            "zones": ",".join(data['zones']),
            "vcpus": data['specifications'].get('vCPUs', ""),
            "memory_gb": data['specifications'].get('MemoryGB', ""),
            "alternative_skus": ",".join([sku['name'] for sku in data['alternative_skus']])
        }
        
        # Upload the log entry
        logs_client.upload(
            rule_id=log_analytics_config['rule_id'],
            stream_name=log_analytics_config['stream_name'],
            logs=[log_entry]
        )
        
        logger.info("Successfully logged to Azure Monitor")
        return True
    except ImportError:
        logger.error("Azure Monitor Ingestion client not installed. Install with: pip install azure-monitor-ingestion")
        return False
    except HttpResponseError as e:
        logger.error(f"Error logging to Azure Monitor: {str(e)}")
        return False
    except Exception as e:
        logger.error(f"Error logging to Azure Monitor: {str(e)}")
        return False

def main():
    """Main function."""
    # Parse arguments
    args = parse_arguments()
    
    # Set debug logging if requested
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('azure').setLevel(logging.DEBUG)
    
    # Load configuration
    config = load_configuration(args)
    
    # Log start
    logger.info(f"Starting VM SKU capacity monitoring for {config['target_sku']} in {config['region']}")
    
    try:
        # Initialize Azure clients
        credential = DefaultAzureCredential()
        compute_client = ComputeManagementClient(credential, subscription_id=config['subscription_id'])
        subscription_client = SubscriptionClient(credential)
        
        # Get details for the configured subscription; the first subscription
        # in the list is not necessarily the one being checked
        subscription_id = config['subscription_id']
        subscription_name = "Unknown"
        for sub in subscription_client.subscriptions.list():
            if sub.subscription_id == subscription_id:
                subscription_name = sub.display_name
                break
        
        # Check SKU availability
        is_available, restriction_reason, zones, specifications, alternative_skus = check_sku_availability(
            compute_client,
            config['region'],
            config['target_sku'],
            config['check_zones']
        )
        
        # Warn in the log when the SKU is unavailable
        if not is_available:
            logger.warning(f"SKU {config['target_sku']} is not available in region {config['region']}")
        
        # Prepare result data
        result_data = {
            'sku': config['target_sku'],
            'region': config['region'],
            'subscription_name': subscription_name,
            'subscription_id': subscription_id,
            'is_available': is_available,
            'restriction_reason': restriction_reason,
            'zones': zones,
            'specifications': specifications,
            'alternative_skus': alternative_skus
        }
        
        # Display results
        if RICH_AVAILABLE:
            display_results_rich(
                config['region'],
                config['target_sku'],
                is_available,
                restriction_reason,
                zones,
                specifications,
                alternative_skus,
                subscription_name,
                subscription_id
            )
        else:
            display_results_text(
                config['region'],
                config['target_sku'],
                is_available,
                restriction_reason,
                zones,
                specifications,
                alternative_skus,
                subscription_name,
                subscription_id
            )
        
        # Log to Azure Monitor if enabled
        if config['log_analytics']['enabled']:
            if not config['log_analytics']['endpoint'] or not config['log_analytics']['rule_id']:
                logger.error("Log Analytics endpoint and rule ID are required for Azure Monitor logging")
            else:
                try:
                    log_to_azure_monitor(result_data, config['log_analytics'])
                except Exception as e:
                    logger.error(f"Failed to log to Azure Monitor: {str(e)}")
    
    except Exception as e:
        logger.error(f"Error monitoring VM SKU capacity: {str(e)}")
        if args.debug:
            import traceback
            traceback.print_exc()
    
    logger.info("VM SKU capacity monitoring completed")

if __name__ == "__main__":
    main()
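
One thing worth knowing about the similarity score: it counts exact matches across five key specs, so the 80% cutoff admits any alternative that differs in exactly one of them, including vCPUs or memory. The sketch below restates the same rule compactly and runs it on made-up capability values. The sample dictionaries are illustrative only.

```python
KEY_SPECS = ["vCPUs", "MemoryGB", "MaxDataDiskCount", "PremiumIO",
             "AcceleratedNetworkingEnabled"]


def calculate_similarity(specs1, specs2):
    """Same rule as in the script: share of key specs with identical values."""
    shared = [k for k in KEY_SPECS if k in specs1 and k in specs2]
    if not shared:
        return 0
    matches = sum(1 for k in shared if specs1[k] == specs2[k])
    return int((matches / len(shared)) * 100)


# Made-up capability values for illustration (the script compares them as-is).
target = {"vCPUs": "16", "MemoryGB": "64", "MaxDataDiskCount": "32",
          "PremiumIO": "True", "AcceleratedNetworkingEnabled": "True"}
same_shape = dict(target)
more_memory = dict(target, MemoryGB="128")
smaller = dict(target, vCPUs="8", MemoryGB="32")

for label, specs in [("same_shape", same_shape),
                     ("more_memory", more_memory),
                     ("smaller", smaller)]:
    print(label, calculate_similarity(target, specs))
```

Here `more_memory` differs from the target only in memory and still reaches the 80% cutoff, while `smaller` differs in two specs and falls below it. If you need alternatives with the same vCPU count, treat the suggestions as a shortlist and check the vCPUs and Memory columns before switching.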


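The script also accepts a `--config` file. The setup script in the next section generates one for you, but if you want to write it by hand, the sketch below shows the keys that `load_configuration` reads. The endpoint and rule ID values are placeholders, and `write_sample_config` is a hypothetical helper.

```python
import json


def write_sample_config(path="config.json"):
    """Write a config file with the keys the monitoring script reads."""
    config = {
        "region": "eastus2",
        "target_sku": "Standard_D16ds_v5",
        "check_zones": True,
        "check_interval_minutes": 15,
        "log_analytics": {
            "enabled": True,
            # Placeholders: replace with your own values.
            "endpoint": "<your-data-collection-endpoint-uri>",
            "rule_id": "<your-data-collection-rule-id>",
            "stream_name": "Custom-VMSKUCapacity_CL",
        },
    }
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return config


if __name__ == "__main__":
    write_sample_config()
```

With the file in place, the script can be run as `python monitor_vm_sku_capacity.py --config config.json`.
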
2. Log Analytics Setup Script

Now for the script that sets up all the Log Analytics stuff. This part is optional, but really helpful if you want to track capacity trends over time (setup_log_analytics.py):

#!/usr/bin/env python
"""
Azure VM SKU Capacity Monitor - Log Analytics Setup

This script automates the creation of:
  • Resource Group
  • Log Analytics Workspace (and waits for it to become active)
  • Data Collection Endpoint
  • Data Collection Rule
  • Custom table in the workspace

It then emits a `config.json` for `monitor_vm_sku_capacity.py`.
"""

import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("log_analytics_setup")


def run_command(cmd: str) -> str:
    """Run a shell command, returning stdout or raising on failure."""
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        logger.error(f"Command failed: {cmd}")
        logger.error(e.stderr.strip())
        raise


def parse_arguments():
    p = argparse.ArgumentParser(
        description="Setup Log Analytics for VM SKU Capacity Monitor"
    )
    p.add_argument(
        "--resource-group",
        "-g",
        default="vm-sku-monitor-rg",
        help="Resource group name",
    )
    p.add_argument(
        "--location", "-l", default="eastus2", help="Azure region (e.g. eastus2)"
    )
    p.add_argument(
        "--workspace",
        "-w",
        default="vmskumonitor-workspace",
        help="Log Analytics workspace name",
    )
    p.add_argument(
        "--dce",
        default="vmskumonitor-dce",
        help="Data Collection Endpoint name",
    )
    p.add_argument(
        "--dcr",
        default="vmskumonitor-dcr",
        help="Data Collection Rule name",
    )
    p.add_argument(
        "--table",
        "-t",
        default="VMSKUCapacity",
        help="Base name for custom table (suffix _CL added)",
    )
    p.add_argument(
        "--config",
        "-c",
        default="config.json",
        help="Output configuration file path",
    )
    return p.parse_args()


def ensure_rg(rg: str, loc: str):
    logger.info(f"Ensuring resource group {rg} exists in {loc}")
    try:
        run_command(f"az group show -n {rg}")
        logger.info(f"Resource group {rg} already exists.")
    except subprocess.CalledProcessError:
        run_command(f"az group create -n {rg} -l {loc}")
        logger.info(f"Resource group {rg} created.")


def ensure_workspace(rg: str, ws: str, loc: str):
    logger.info(f"Ensuring Log Analytics workspace {ws}")
    try:
        run_command(
            f"az monitor log-analytics workspace show "
            f"-g {rg} -n {ws}"
        )
        logger.info(f"Workspace {ws} already exists.")
    except subprocess.CalledProcessError:
        run_command(
            f"az monitor log-analytics workspace create "
            f"-g {rg} -n {ws} -l {loc}"
        )
        logger.info(f"Workspace {ws} created.")
    wait_for_workspace(rg, ws)


def wait_for_workspace(rg: str, ws: str, timeout: int = 300, interval: int = 10):
    logger.info(f"Waiting up to {timeout}s for workspace {ws} to become active…")
    elapsed = 0
    while elapsed < timeout:
        state = run_command(
            f"az monitor log-analytics workspace show "
            f"-g {rg} -n {ws} --query provisioningState -o tsv"
        ).strip().lower()
        if state == "succeeded":
            logger.info("Workspace is active.")
            return
        logger.info(f"Current state: {state!r}; retrying in {interval}s…")
        time.sleep(interval)
        elapsed += interval
    logger.warning(f"Workspace did not become active within {timeout}s; continuing.")


def ensure_dce(rg: str, dce: str, loc: str) -> str:
    logger.info(f"Ensuring Data Collection Endpoint {dce}")
    try:
        run_command(f"az monitor data-collection endpoint show -g {rg} -n {dce}")
        logger.info(f"DCE {dce} already exists.")
    except subprocess.CalledProcessError:
        run_command(
            f"az monitor data-collection endpoint create "
            f"-g {rg} -n {dce} -l {loc} --public-network-access Enabled"
        )
        logger.info(f"DCE {dce} created.")
    out = run_command(f"az monitor data-collection endpoint show -g {rg} -n {dce} -o json")
    return json.loads(out)["logsIngestion"]["endpoint"]


def deploy_custom_table(rg: str, ws: str, table: str):
    # Wait once more in case ingestion APIs lag behind provisioningState
    logger.info("Re-checking workspace readiness before custom table deployment…")
    wait_for_workspace(rg, ws, timeout=180, interval=15)

    arm = {
        "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
        "contentVersion": "1.0.0.0",
        "parameters": {
            "workspaceName": {"type": "string"},
            "tableName": {"type": "string"},
        },
        "resources": [
            {
                "type": "Microsoft.OperationalInsights/workspaces/tables",
                "apiVersion": "2021-12-01-preview",
                "name": "[concat(parameters('workspaceName'), '/', parameters('tableName'), '_CL')]",
                "properties": {
                    "schema": {
                        "name": "[concat(parameters('tableName'), '_CL')]",
                        "columns": [
                            {"name": "TimeGenerated", "type": "datetime"},
                            {"name": "sku_name", "type": "string"},
                            {"name": "region", "type": "string"},
                            {"name": "subscription_name", "type": "string"},
                            {"name": "subscription_id", "type": "string"},
                            {"name": "is_available", "type": "boolean"},
                            {"name": "restriction_reason", "type": "string"},
                            {"name": "zones", "type": "string"},
                            {"name": "vcpus", "type": "string"},
                            {"name": "memory_gb", "type": "string"},
                            {"name": "alternative_skus", "type": "string"},
                        ],
                    }
                },
            }
        ],
    }

    fn = f"custom-table-{int(time.time())}.json"
    with open(fn, "w") as f:
        json.dump(arm, f, indent=2)
    logger.info(f"Deploying custom table {table}_CL via ARM template")
    try:
        run_command(
            f"az deployment group create -g {rg} "
            f"--template-file {fn} "
            f"--parameters workspaceName={ws} tableName={table}"
        )
    finally:
        # Remove the temporary template even if the deployment fails
        os.remove(fn)
    logger.info("Custom table created.")


def deploy_dcr(rg: str, dcr: str, loc: str, dce_uri: str, ws: str, table: str) -> str:
    """
    Create or verify a Data Collection Rule that sends Custom-<table>_CL
    to the workspace. Returns the ImmutableId.
    """
    logger.info(f"Ensuring Data Collection Rule {dcr}")
    try:
        run_command(f"az monitor data-collection rule show -g {rg} -n {dcr}")
        logger.info(f"DCR {dcr} already exists.")
    except subprocess.CalledProcessError:
        # gather resource IDs
        ws_id = run_command(f"az monitor log-analytics workspace show -g {rg} -n {ws} -o json")
        ws_id = json.loads(ws_id)["id"]
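        # NOTE: the DCE name is derived from the DCR name here. That only works
        # when the two names differ by the -dcr / -dce suffix, as the defaults do;
        # with a custom --dce name this lookup would target the wrong endpoint.
        dce_id = run_command(f"az monitor data-collection endpoint show -g {rg} -n {dcr.replace('-dcr','-dce')} -o json")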
        dce_id = json.loads(dce_id)["id"]

        # build ARM
        arm = {
            "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
            "contentVersion": "1.0.0.0",
            "parameters": {
                "dcrName": {"type": "string"},
                "location": {"type": "string"},
                "dceId": {"type": "string"},
                "workspaceId": {"type": "string"},
                "streamName": {"type": "string"},
            },
            "resources": [
                {
                    "type": "Microsoft.Insights/dataCollectionRules",
                    "apiVersion": "2021-09-01-preview",
                    "name": "[parameters('dcrName')]",
                    "location": "[parameters('location')]",
                    "properties": {
                        "dataCollectionEndpointId": "[parameters('dceId')]",
                        "streamDeclarations": {
                            "[parameters('streamName')]": {
                                "columns": [
                                    {"name": "TimeGenerated", "type": "datetime"},
                                    {"name": "sku_name", "type": "string"},
                                    {"name": "region", "type": "string"},
                                    {"name": "subscription_name", "type": "string"},
                                    {"name": "subscription_id", "type": "string"},
                                    {"name": "is_available", "type": "boolean"},
                                    {"name": "restriction_reason", "type": "string"},
                                    {"name": "zones", "type": "string"},
                                    {"name": "vcpus", "type": "string"},
                                    {"name": "memory_gb", "type": "string"},
                                    {"name": "alternative_skus", "type": "string"},
                                ]
                            }
                        },
                        "destinations": {
                            "logAnalytics": [
                                {
                                    "workspaceResourceId": "[parameters('workspaceId')]",
                                    "name": "la-destination",
                                }
                            ]
                        },
                        "dataFlows": [
                            {
                                "streams": ["[parameters('streamName')]"],
                                "destinations": ["la-destination"],
                            }
                        ],
                    },
                }
            ],
        }
        fn = f"dcr-{int(time.time())}.json"
        with open(fn, "w") as f:
            json.dump(arm, f, indent=2)
        run_command(
            f"az deployment group create -g {rg} "
            f"--template-file {fn} "
            f"--parameters "
            f"dcrName={dcr} location={loc} "
            f"dceId={dce_id} workspaceId={ws_id} "
            f"streamName=Custom-{table}_CL"
        )
        os.remove(fn)
        logger.info(f"DCR {dcr} created.")

    # return immutableId
    out = run_command(f"az monitor data-collection rule show -g {rg} -n {dcr} -o json")
    return json.loads(out)["immutableId"]


def write_config(path: str, dce_uri: str, dcr_id: str, table: str, loc: str):
    cfg = {
        "region": loc,
        "target_sku": "Standard_D16ds_v5",
        "check_zones": True,
        "log_analytics": {
            "enabled": True,
            "endpoint": dce_uri,
            "rule_id": dcr_id,
            "stream_name": f"Custom-{table}_CL",
        },
        "check_interval_minutes": 15,
    }
    with open(path, "w") as f:
        json.dump(cfg, f, indent=2)
    logger.info(f"Wrote configuration to {path}")


def main():
    args = parse_arguments()
    logger.info("Starting Log Analytics setup…")

    ensure_rg(args.resource_group, args.location)
    ensure_workspace(args.resource_group, args.workspace, args.location)
    
    # Create the Data Collection Endpoint
    dce_uri = ensure_dce(args.resource_group, args.dce, args.location)
    
    # IMPORTANT: Create the custom table BEFORE the DCR
    # This fixes the "InvalidOutputTable" error
    deploy_custom_table(args.resource_group, args.workspace, args.table)
    
    # Now create the Data Collection Rule that references the custom table
    dcr_id = deploy_dcr(
        args.resource_group, args.dcr, args.location, dce_uri, args.workspace, args.table
    )

    write_config(args.config, dce_uri, dcr_id, args.table, args.location)

    logger.info("Log Analytics setup completed successfully!")


if __name__ == "__main__":
    main()

Setting default region and VM SKU

You've got a few options to set your preferred region and VM SKU:

1. Edit script defaults: Open monitor_vm_sku_capacity.py and look for:

parser.add_argument('--region', type=str, default='eastus2',  # Change this!
                    help='Azure region to check (default: eastus2)')
parser.add_argument('--sku', type=str, default='Standard_D16ds_v5',  # And this!
                    help='VM SKU to check (default: Standard_D16ds_v5)')

2. Specify on command line:

python monitor_vm_sku_capacity.py --region westus2 --sku Standard_D8ds_v5

3. Edit config file: The setup script creates a config.json with these values:

{
  "region": "eastus2",
  "target_sku": "Standard_D16ds_v5",
  "check_zones": true,
  ...
}
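
With three places to set the same value, it helps to be clear about which one wins. Here's a minimal sketch of one reasonable precedence: built-in defaults first, then config.json, then any flag you actually typed. The `resolve_settings` helper and the `DEFAULTS` dict are hypothetical (they're not part of the scripts in this post), and the sketch assumes the argparse defaults are None so an unset flag can be told apart from an explicit one.

```python
import argparse
import json

# Hypothetical built-in defaults, mirroring the values used in this post
DEFAULTS = {"region": "eastus2", "target_sku": "Standard_D16ds_v5"}


def resolve_settings(argv, config_text=None):
    """Merge defaults < config file < explicit CLI flags."""
    parser = argparse.ArgumentParser()
    # default=None lets us detect flags the user did not pass
    parser.add_argument("--region", default=None)
    parser.add_argument("--sku", default=None)
    args = parser.parse_args(argv)

    settings = dict(DEFAULTS)
    if config_text:
        file_cfg = json.loads(config_text)
        for key in DEFAULTS:
            if key in file_cfg:
                settings[key] = file_cfg[key]
    # Only flags that were actually passed override the file
    if args.region is not None:
        settings["region"] = args.region
    if args.sku is not None:
        settings["target_sku"] = args.sku
    return settings


if __name__ == "__main__":
    cfg = '{"region": "westus2", "target_sku": "Standard_D8ds_v5"}'
    print(resolve_settings(["--sku", "Standard_D4ds_v5"], cfg))
    # {'region': 'westus2', 'target_sku': 'Standard_D4ds_v5'}
```

The region comes from the file and the SKU from the command line, which is usually what you'd expect when you pass just one flag.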

Finding Available Regions and SKUs

If you're wondering which regions and SKUs to monitor, here's how to get that info:

Using Azure CLI

# List all regions
az account list-locations --query "[].name" -o tsv

# List all VM SKUs in a region 
az vm list-skus --location eastus2 --resource-type virtualMachines --query "[].name" -o tsv  

# Get detailed info about a specific SKU
az vm list-skus --location eastus2 --size Standard_D16ds_v5 -o table
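
If you'd rather filter that list in Python than eyeball it, here's a rough sketch that takes the JSON from `az vm list-skus ... -o json` and keeps only the SKUs with no restrictions. The `unrestricted_sku_names` helper is hypothetical, and the sample data is trimmed down and made up for illustration; real entries carry many more fields.

```python
import json

# Trimmed, made-up sample in the shape `az vm list-skus -o json` returns
SAMPLE = """
[
  {"name": "Standard_D16ds_v5", "locations": ["eastus2"],
   "restrictions": [{"type": "Location", "reasonCode": "NotAvailableForSubscription"}]},
  {"name": "Standard_D16s_v5", "locations": ["eastus2"], "restrictions": []}
]
"""


def unrestricted_sku_names(list_skus_json):
    """Return the sorted names of SKUs whose 'restrictions' list is empty."""
    skus = json.loads(list_skus_json)
    return sorted(s["name"] for s in skus if not s.get("restrictions"))


if __name__ == "__main__":
    print(unrestricted_sku_names(SAMPLE))  # ['Standard_D16s_v5']
```

To use it on real data, save the CLI output to a file (for example with `-o json > skus.json`) and pass the file's contents to the function.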

Using Azure Portal

Just go to the VM creation page in the portal and click "See all sizes" - you'll get a nice visual list of all available options. I sometimes just take a screenshot of this for reference.

Using this tool

So here's how you use this thing. I tried to make it as simple as possible:

1. Set up Log Analytics first (optional but recommended):

python setup_log_analytics.py

This creates the resource group, workspace, Data Collection Endpoint, custom table, and Data Collection Rule, then writes a config file you can use in the next step. The default options should work fine for most people, but you can customize if needed.

2. Run the monitoring script:

python monitor_vm_sku_capacity.py --config config.json

If you don't want to mess with Log Analytics, you can just run it directly:

python monitor_vm_sku_capacity.py --region eastus2 --sku Standard_D16ds_v5

The output will look something like this (way prettier if you have the rich package installed):

================================================================================
AZURE VM SKU CAPACITY MONITOR - 2024-05-20 14:32:45
================================================================================

Status:       AVAILABLE
SKU:          Standard_D16ds_v5
Region:       eastus2
Subscription: My Azure Subscription (12345678-1234-1234-1234-123456789012)

Available Zones:
  - 1
  - 2
  - 3

VM SKU Specifications:
  vCPUs: 16
  MemoryGB: 64
  MaxDataDiskCount: 32
  PremiumIO: True
  AcceleratedNetworkingEnabled: True

Or if the VM is unavailable:

================================================================================
AZURE VM SKU CAPACITY MONITOR - 2024-05-20 14:32:45
================================================================================

Status:       NOT AVAILABLE
SKU:          Standard_D16ds_v5
Region:       eastus2
Subscription: My Azure Subscription (12345678-1234-1234-1234-123456789012)
Details:      SKU Standard_D16ds_v5 is not available in region eastus2

Available Zones:
  None

Restrictions:
  Type:           Zone
  Reason:         NotAvailableForSubscription
  Affected Values: eastus2

VM SKU Specifications:
  vCPUs: 16
  MemoryGB: 64
  MaxDataDiskCount: 32
  PremiumIO: True
  AcceleratedNetworkingEnabled: True

Alternative SKUs:
  - Standard_D16as_v5 (vCPUs: 16, Memory: 64 GB, Family: standardDasv5Family, Similarity: 100%)
  - Standard_D16s_v5 (vCPUs: 16, Memory: 64 GB, Family: standardDsv5Family, Similarity: 100%)
  - Standard_D16s_v4 (vCPUs: 16, Memory: 64 GB, Family: standardDsv4Family, Similarity: 100%)
  - Standard_F16s_v2 (vCPUs: 16, Memory: 32 GB, Family: standardFSv2Family, Similarity: 80%)
  - Standard_E16s_v5 (vCPUs: 16, Memory: 128 GB, Family: standardEsv5Family, Similarity: 80%)
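
To give a feel for where a similarity percentage like that can come from, here's a small sketch of a weighted comparison on vCPUs and memory. Treat it as an illustration only: the `similarity` function and the 60/40 weights are my assumptions, picked because they happen to reproduce the sample percentages above, and the actual script may score things differently.

```python
def similarity(target, candidate, vcpu_weight=0.6, mem_weight=0.4):
    """Return a 0-100 score from weighted vCPU and memory closeness."""
    def ratio(a, b):
        # 1.0 when equal, shrinking toward 0 as the values diverge
        return min(a, b) / max(a, b) if max(a, b) else 0.0

    score = (vcpu_weight * ratio(target["vcpus"], candidate["vcpus"])
             + mem_weight * ratio(target["memory_gb"], candidate["memory_gb"]))
    return round(score * 100)


if __name__ == "__main__":
    target = {"vcpus": 16, "memory_gb": 64}          # Standard_D16ds_v5
    print(similarity(target, {"vcpus": 16, "memory_gb": 64}))   # 100
    print(similarity(target, {"vcpus": 16, "memory_gb": 32}))   # 80
    print(similarity(target, {"vcpus": 16, "memory_gb": 128}))  # 80
```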

Setting up scheduled checks

I don't like missing things, so I set mine up to run every hour using cron:

# Open crontab editor
crontab -e

# Add this line to run it every hour
# (cron runs jobs with /bin/sh, so activate the venv with "." rather than "source")
0 * * * * cd /path/to/scripts && . venv/bin/activate && python monitor_vm_sku_capacity.py --config config.json >> vm_sku_monitor.log 2>&1

Checking your data in Log Analytics

If you set up Log Analytics, you can run all sorts of cool queries:

// Basic query - see everything
VMSKUCapacity_CL
| order by TimeGenerated desc

// Find when capacity changed
VMSKUCapacity_CL
| where sku_name == "Standard_D16ds_v5" and region == "eastus2"
| project TimeGenerated, is_available
| order by TimeGenerated desc


// Simple dashboard
VMSKUCapacity_CL
| summarize arg_max(TimeGenerated, is_available) by sku_name, region
| extend Status = iff(is_available == true, "Available", "Not Available")
| project sku_name, region, Status, LastChecked = TimeGenerated

You can set up alerts too. That way Azure tells YOU when capacity changes, instead of you finding out during a failed deployment!
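
An alert is most useful when it fires on a change of state, not on every check. Here's a minimal Python sketch of that logic, run over rows you might export from the "Find when capacity changed" query. The `availability_changes` helper is hypothetical and the timestamps are made up.

```python
from datetime import datetime


def availability_changes(records):
    """Return (timestamp, new_state) for each flip of is_available.

    records: iterable of (datetime, bool) pairs, in any order.
    """
    changes = []
    previous = None
    for ts, state in sorted(records):
        # The first record only sets the baseline; later flips are reported
        if previous is not None and state != previous:
            changes.append((ts, state))
        previous = state
    return changes


if __name__ == "__main__":
    rows = [
        (datetime(2025, 6, 20, 12, 0), True),
        (datetime(2025, 6, 20, 13, 0), True),
        (datetime(2025, 6, 20, 14, 0), False),  # became unavailable
        (datetime(2025, 6, 20, 15, 0), True),   # came back
    ]
    for ts, state in availability_changes(rows):
        print(ts.isoformat(), "available" if state else "NOT available")
```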

Troubleshooting

Some common problems I've run into:

  1. "Could not automatically detect subscription ID":
    • Make sure you're logged in with az login
    • Or just provide it explicitly with --subscription-id
  2. Log Analytics permission errors:
    • Make sure you ran the permission commands from the prerequisites section
    • Azure's permissions can be weirdly slow - wait 10-15 minutes and try again
  3. Python environment issues:
    • Always use a virtual environment! I learned this one the hard way when I messed up my system Python
    • Make sure all the packages are installed with pip install azure-identity azure-mgmt-compute azure-mgmt-subscription azure-monitor-ingestion rich

Next Steps

  1. Create a dashboard to visualize VM SKU availability over time
  2. Set up alerts to notify you when specific SKUs become available
  3. Integrate with your CI/CD pipeline to automatically select available SKUs
  4. For a serverless, fully managed option, create an Azure Function version of the monitoring script
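
For the CI/CD idea, the core is just "walk my preference list and take the first SKU that's available". Here's a tiny sketch of that step. The `pick_sku` helper is hypothetical, and the availability map is something you'd have to fill in yourself from the monitor's results.

```python
def pick_sku(preferred, availability):
    """Return the first SKU in `preferred` that is marked available, else None."""
    for sku in preferred:
        if availability.get(sku):
            return sku
    return None


if __name__ == "__main__":
    preferred = ["Standard_D16ds_v5", "Standard_D16as_v5", "Standard_D16s_v5"]
    availability = {
        "Standard_D16ds_v5": False,
        "Standard_D16as_v5": True,
        "Standard_D16s_v5": True,
    }
    print(pick_sku(preferred, availability))  # Standard_D16as_v5
```

Returning None when nothing is available lets the pipeline fail early with a clear message instead of failing halfway through a deployment.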

Advanced: Bulk-Deploy Feasibility Check

Want to know up front “can I spin up N VMs of SKU X in region Y?”
We combine:

  1. Hardware-level: Resource SKUs API (is the SKU unrestricted?)
  2. Subscription-level: Usage API (enough free vCPU cores for N instances?)
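
The arithmetic behind that second check is simple enough to sketch on its own. This is only an illustration: the `can_deploy` helper is hypothetical, and the quota limit and in-use figures are made-up inputs, not values read from Azure.

```python
def can_deploy(sku_restricted, vcpus_per_vm, vm_count, core_limit, cores_in_use):
    """Combine the SKU restriction flag with a simple vCPU quota check."""
    required = vcpus_per_vm * vm_count      # cores the deployment needs
    free = core_limit - cores_in_use        # cores still available in the quota
    return {
        "required_cores": required,
        "free_cores": free,
        "feasible": (not sku_restricted) and free >= required,
    }


if __name__ == "__main__":
    # 10 VMs with 2 vCPUs each against a 100-core quota with nothing in use
    print(can_deploy(False, 2, 10, 100, 0))
    # {'required_cores': 20, 'free_cores': 100, 'feasible': True}
```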

Prerequisites already covered above:

az login
USER_PRINCIPAL=$(az ad signed-in-user show --query userPrincipalName -o tsv)

az group create --name vm-sku-monitor-rg --location eastus2

az role assignment create \
  --assignee "$USER_PRINCIPAL" \
  --role "Monitoring Metrics Publisher" \
  --scope "/subscriptions/$(az account show --query id -o tsv)/resourcegroups/vm-sku-monitor-rg"

python3 -m venv venv && source venv/bin/activate

pip install azure-identity azure-mgmt-compute azure-mgmt-subscription rich


File: monitor_vm_sku_capacity_bulk.py

#!/usr/bin/env python
"""
Azure VM SKU Capacity & Quota Monitor (with Zone support)

Checks:
  1) Whether your target SKU is available in a region or zone
  2) Whether your subscription has enough free vCPU quota to deploy N VMs
Optionally logs results into Azure Log Analytics.
"""

import argparse
import datetime
import json
import logging
import subprocess
from typing import List, Tuple, Dict, Any

from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.subscription import SubscriptionClient

# Rich for prettier tables
try:
    from rich.console import Console
    from rich.table import Table
    from rich import box
    RICH_AVAILABLE = True
except ImportError:
    RICH_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("vm_sku_capacity_monitor")


def parse_arguments():
    p = argparse.ArgumentParser(
        description="Azure VM SKU Capacity & Quota Monitor (with zone support)"
    )
    p.add_argument("--region",        type=str,   default=None,
                   help="Azure region to check (default: eastus2)")
    p.add_argument("--sku",           type=str,   default=None,
                   help="VM SKU to check (default: Standard_D16ds_v5)")
    p.add_argument("--zone",          type=str,   default=None,
                   help="(Optional) Availability zone to check (e.g. '1')")
    p.add_argument("--count",         type=int,   default=None,
                   help="Number of VMs you plan to deploy (default: 1)")
    p.add_argument("--log-analytics", action="store_true",
                   help="Enable logging to Azure Log Analytics")
    p.add_argument("--endpoint",      type=str,
                   help="Data Collection Endpoint URI")
    p.add_argument("--rule-id",       type=str,
                   help="Data Collection Rule ID")
    p.add_argument("--stream-name",   type=str, default="Custom-VMSKUCapacity_CL",
                   help="Log Analytics stream name")
    p.add_argument("--debug",         action="store_true",
                   help="Enable debug logging")
    p.add_argument("--config",        type=str,
                   help="Path to JSON config file")
    p.add_argument("--subscription-id", type=str,
                   help="Azure Subscription ID")
    return p.parse_args()


def load_configuration(args) -> Dict[str, Any]:
    # Built-in defaults. Config file values override these, and flags passed
    # explicitly on the command line override the file.
    cfg = {
        "region": "eastus2",
        "zone": None,
        "target_sku": "Standard_D16ds_v5",
        "desired_count": 1,
        "subscription_id": None,
        "log_analytics": {
            "enabled": args.log_analytics,
            "endpoint": args.endpoint,
            "rule_id": args.rule_id,
            "stream_name": args.stream_name
        }
    }
    if args.config:
        try:
            with open(args.config) as f:
                j = json.load(f)
                # merge known keys
                for k in ("region","zone","target_sku","desired_count","subscription_id"):
                    if k in j: cfg[k] = j[k]
                cfg["log_analytics"].update(j.get("log_analytics", {}))
                logger.info(f"Loaded configuration from {args.config}")
        except Exception as e:
            logger.error(f"Failed loading config {args.config}: {e}")
    # CLI args override file (unset flags are None, so they leave the file value alone)
    if args.region is not None: cfg["region"] = args.region
    if args.zone is not None:   cfg["zone"] = args.zone
    if args.sku is not None:    cfg["target_sku"] = args.sku
    if args.count is not None:  cfg["desired_count"] = args.count
    if args.subscription_id:
        cfg["subscription_id"] = args.subscription_id
    return cfg


def get_subscription_id(explicit: str) -> str:
    if explicit:
        return explicit
    # Try Azure CLI
    try:
        out = subprocess.run(
            "az account show --query id -o tsv",
            shell=True, check=True,
            stdout=subprocess.PIPE, text=True
        ).stdout.strip()
        if out:
            return out
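    except (subprocess.CalledProcessError, OSError):
        # Azure CLI missing or not logged in; fall through to the SDK
        pass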
    # Fallback: Azure SDK
    cred = DefaultAzureCredential()
    subs = list(SubscriptionClient(cred).subscriptions.list())
    return subs[0].subscription_id if subs else None


def check_sku_availability(
    compute: ComputeManagementClient,
    region: str, sku: str, zone: str = None
) -> Tuple[bool, str, List[str], Dict[str, Any]]:
    """
    Returns:
      is_available (bool),
      reason (str or None),
      supported_zones (list of str),
      capabilities (dict of name→value)
    """
    skus = list(compute.resource_skus.list())
    entry = next(
        (s for s in skus
         if s.name.lower() == sku.lower()
         and region.lower() in [loc.lower() for loc in s.locations]),
        None
    )
    if not entry:
        return False, "NotFound", [], {}

    # Find all zones where this SKU is sold in that region
    supported_zones = []
    for loc_info in entry.location_info or []:
        if loc_info.location.lower() == region.lower():
            supported_zones = loc_info.zones or []
            break

    # Determine restrictions
    if zone:
        # 1) If SKU doesn’t support the requested zone
        if zone not in supported_zones:
            return False, "UnsupportedZone", supported_zones, {}
        # 2) Check zone-level restrictionInfo.zones
        restricted = [
            r for r in (entry.restrictions or [])
            if r.restriction_info and zone in (r.restriction_info.zones or [])
        ]
    else:
        # Region-level check
        restricted = [
            r for r in (entry.restrictions or [])
            if r.restriction_info
            and region.lower() in [l.lower() for l in (r.restriction_info.locations or [])]
        ]

    is_avail = len(restricted) == 0
    reason   = restricted[0].reason_code if restricted else None

    # Pull out SKU capabilities (vCPUs, MemoryGB, etc.)
    caps = {c.name: c.value for c in entry.capabilities or []}

    return is_avail, reason, supported_zones, caps


def check_quota(
    compute: ComputeManagementClient,
    region: str, vcpus_needed: int, count: int
) -> Tuple[int,int,bool]:
    usage = list(compute.usage.list(location=region))
    # "cores" is the total regional vCPU quota; per-VM-family quotas
    # are not checked here.
    core = next((u for u in usage if u.name.value.lower()=="cores"), None)
    free = (core.limit - core.current_value) if core else 0
    required = vcpus_needed * count
    return free, required, free >= required


def display(rdata: Dict[str, Any]):
    if RICH_AVAILABLE:
        c = Console()
        c.print(f"\n[bold underline]SKU Capacity & Quota (Zone) Check "
                f"({datetime.datetime.now():%Y-%m-%d %H:%M:%S})[/]\n")

        # Availability table
        t1 = Table(box=box.SIMPLE)
        t1.add_column("SKU"); t1.add_column("Region"); t1.add_column("Zone")
        t1.add_column("Available"); t1.add_column("Reason")
        t1.add_row(
            rdata["target_sku"], rdata["region"],
            rdata["zone"] or "-",
            "✅" if rdata["is_available"] else "❌",
            rdata["reason"] or "-"
        )
        c.print(t1)

        # Supported zones
        t0 = Table(box=box.SIMPLE)
        t0.add_column("Supported Zones")
        t0.add_row(", ".join(rdata["supported_zones"]) or "None")
        c.print(t0)

        # Quota table
        t2 = Table(box=box.SIMPLE)
        t2.add_column("Desired VMs", justify="right")
        t2.add_column("vCPUs/VM",   justify="right")
        t2.add_column("Free Cores", justify="right")
        t2.add_column("Needs Cores",justify="right")
        t2.add_column("Quota OK?",  justify="center")
        t2.add_row(
            str(rdata["desired_count"]),
            str(rdata["vcpus"]),
            str(rdata["free_cores"]),
            str(rdata["required_cores"]),
            "✅" if rdata["quota_ok"] else "❌"
        )
        c.print(t2)

    else:
        print(f"\nSKU {rdata['target_sku']} in {rdata['region']} "
              f"zone {rdata['zone'] or '-'}: "
              f"Available={rdata['is_available']} (Reason={rdata['reason']})")
        print("Supported zones:", ", ".join(rdata["supported_zones"]) or "None")
        print(f"Quota: need {rdata['required_cores']} cores, "
              f"have {rdata['free_cores']} → OK={rdata['quota_ok']}")


def main():
    args = parse_arguments()
    if args.debug:
        logger.setLevel(logging.DEBUG)

    cfg = load_configuration(args)
    cfg["subscription_id"] = get_subscription_id(cfg.get("subscription_id"))
    logger.info(f"Checking SKU {cfg['target_sku']} x{cfg['desired_count']} "
                f"in {cfg['region']} zone {cfg['zone']}")

    cred = DefaultAzureCredential()
    compute = ComputeManagementClient(cred, cfg["subscription_id"])

    # 1) SKU + zone availability
    is_avail, reason, zones, caps = check_sku_availability(
        compute, cfg["region"], cfg["target_sku"], cfg["zone"]
    )
    vcpus = int(caps.get("vCPUs", 0))

    # 2) Subscription quota check
    free, required, ok = check_quota(
        compute, cfg["region"], vcpus, cfg["desired_count"]
    )

    result = {
        "target_sku":      cfg["target_sku"],
        "region":          cfg["region"],
        "zone":            cfg["zone"],
        "supported_zones": zones,
        "desired_count":   cfg["desired_count"],
        "is_available":    is_avail,
        "reason":          reason,
        "vcpus":           vcpus,
        "free_cores":      free,
        "required_cores":  required,
        "quota_ok":        ok
    }

    display(result)

    # (Optional) send to Log Analytics…
    # [omitted for brevity]


if __name__ == "__main__":
    main()
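
The Log Analytics step is left out of the script above, so here's a rough sketch of what it could look like, not the author's actual code. It maps the `result` dict onto the custom table's columns and uploads it with `LogsIngestionClient` from the azure-monitor-ingestion package (which you'd need to pip install in addition to the packages listed for this script). The `build_record` and `send_record` helpers are hypothetical names, and the empty `memory_gb` and `alternative_skus` values are an assumption, since the bulk script doesn't collect them.

```python
import datetime


def build_record(result, subscription_id, subscription_name=""):
    """Map a bulk-check result dict onto the custom table's columns."""
    return {
        "TimeGenerated": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "sku_name": result["target_sku"],
        "region": result["region"],
        "subscription_name": subscription_name,
        "subscription_id": subscription_id,
        "is_available": bool(result["is_available"]),
        "restriction_reason": result["reason"] or "",
        "zones": ",".join(result["supported_zones"]),
        "vcpus": str(result["vcpus"]),  # the column is declared as string
        "memory_gb": "",                # not collected by the bulk script
        "alternative_skus": "",         # not collected by the bulk script
    }


def send_record(endpoint, rule_id, stream_name, record):
    """Upload one record through the Data Collection Endpoint."""
    # Imported here so build_record works without the Azure packages installed
    from azure.identity import DefaultAzureCredential
    from azure.monitor.ingestion import LogsIngestionClient

    client = LogsIngestionClient(endpoint=endpoint,
                                 credential=DefaultAzureCredential())
    client.upload(rule_id=rule_id, stream_name=stream_name, logs=[record])
```

In `main()`, you would call `send_record` only when Log Analytics is enabled in the config, passing the endpoint, rule ID, and stream name from `cfg["log_analytics"]`.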


Run the bulk-deploy checker (region-level check)

python monitor_vm_sku_capacity_bulk.py \
  --region centralus \
  --sku Standard_B2s_v2 \
  --count 10 

(Optionally add the parameters --log-analytics --endpoint <DCE-URI> --rule-id <DCR-ID> to send the results to Log Analytics)

Example output

SKU Capacity & Quota (Zone) Check (2025-06-20 12:49:58)


  SKU               Region      Zone   Available   Reason
 ─────────────────────────────────────────────────────────
  Standard_B2s_v2   centralus   -      ✅          -


  Supported Zones
 ─────────────────
  1, 3, 2


  Desired VMs   vCPUs/VM   Free Cores   Needs Cores   Quota OK?
 ───────────────────────────────────────────────────────────────
           10          2          100            20      ✅

Run the bulk-deploy checker (zone-level check)

python monitor_vm_sku_capacity_bulk.py \
  --region centralus \
  --zone 2 \
  --sku Standard_B2s_v2 \
  --count 10 

Example output

SKU Capacity & Quota (Zone) Check (2025-06-20 12:42:22)


  SKU               Region      Zone   Available   Reason
 ─────────────────────────────────────────────────────────
  Standard_B2s_v2   centralus   2      ✅          -


  Supported Zones
 ─────────────────
  1, 3, 2


  Desired VMs   vCPUs/VM   Free Cores   Needs Cores   Quota OK?
 ───────────────────────────────────────────────────────────────
           10          2          100            20      ✅

 

Final Thoughts

Once the monitor has been running for a while, the history in Log Analytics gives you real data on VM availability patterns to bring to planning sessions, so scaling decisions rest on what was actually available and not on guesswork.

It also helps you avoid deployment failures: you hear about a restricted SKU or a tight quota before it hits a production rollout, not during one.

Happy monitoring!

Updated Jun 20, 2025
Version 9.0