AI - Azure AI services Blog

Implementing Event Hub Logging for Azure OpenAI Streaming APIs

hieunhu (Microsoft)
Nov 14, 2024

Azure OpenAI's streaming responses use Server-Sent Events (SSE), which support only one subscriber. This creates a challenge when using APIM's Event Hub Logger as it would consume the stream, preventing the actual client from receiving the response.

This solution introduces a lightweight Azure Function proxy that enables Event Hub logging while preserving the streaming response for clients.

With token usage data now available from both the streaming and non-streaming AOAI APIs, we can monitor consumption the right way!
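
For streaming calls, the usage block is only emitted when the request opts in via stream_options, which the proxy forwards to Azure OpenAI. A minimal sketch of such an upstream call (the client setup and deployment name here are illustrative assumptions, not code from the repo):

from openai import AzureOpenAI

client = AzureOpenAI(
    api_key="<AZURE_OPENAI_KEY>",
    api_version="2024-08-01-preview",
    azure_endpoint="https://<your-resource>.openai.azure.com/",
)

stream = client.chat.completions.create(
    model="<deployment-name>",
    messages=[{"role": "user", "content": "Tell me a joke"}],
    stream=True,
    # Ask the service to append a final chunk containing token usage
    stream_options={"include_usage": True},
)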

Architecture

Client → APIM → Azure Function Proxy → Azure OpenAI
                        ↓
                    Event Hub

Technical Implementation

Streaming Response Handling

The core implementation uses FastAPI's StreamingResponse to handle Server-Sent Events (SSE) streams with three key components:

1. Content Aggregation

import json

async def process_openai_stream(response, messages, http_client, start_time):
    content_buffer = []

    async def generate():
        for chunk in response:
            # Guard against chunks with no choices (e.g. the final usage-only chunk)
            if chunk.choices and chunk.choices[0].delta.content:
                content_buffer.append(chunk.choices[0].delta.content)
            # Re-emit every chunk to the client in SSE format
            yield f"data: {json.dumps(chunk.model_dump())}\n\n"

This enables real-time streaming to clients while collecting the complete response for logging. The content buffer maintains minimal memory overhead by storing only text content.
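
The generator is then handed back to the caller as an SSE response. A minimal sketch of the wrapping, assuming the standard FastAPI import path (the import may differ if the Azure Functions FastAPI extension is used):

from fastapi.responses import StreamingResponse

def to_sse_response(generate):
    # generate is the async generator shown above; text/event-stream marks
    # the body as Server-Sent Events so APIM and clients stream it through.
    return StreamingResponse(generate(), media_type="text/event-stream")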

2. Token Usage Collection

# The last chunk of the stream carries the usage block when include_usage is set
if hasattr(chunk, 'usage') and chunk.usage:
    log_data = {
        "type": "stream_completion",
        "content": "".join(content_buffer),
        "usage": chunk.usage.model_dump(),
        "model": model_name,
        "region": headers.get("x-ms-region", "unknown")
    }
    log_to_eventhub(log_data)

Token usage metrics are captured from the final chunk, providing accurate consumption data for cost analysis and monitoring.
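
The log_to_eventhub helper itself is not shown above; a minimal sketch of one possible implementation using the azure-eventhub SDK and the AZURE_EVENTHUB_CONN_STR setting from the Setup section (the EVENTHUB_NAME variable is an assumption and can be omitted if the connection string already contains an EntityPath):

import json
import os

from azure.eventhub import EventData, EventHubProducerClient

def log_to_eventhub(log_data: dict) -> None:
    # Build a producer from the connection string configured in Setup
    producer = EventHubProducerClient.from_connection_string(
        os.environ["AZURE_EVENTHUB_CONN_STR"],
        eventhub_name=os.environ.get("EVENTHUB_NAME"),
    )
    with producer:
        # Send the completion record as a single JSON event
        batch = producer.create_batch()
        batch.add(EventData(json.dumps(log_data)))
        producer.send_batch(batch)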

3. Performance Tracking

@app.route(route="openai/deployments/{deployment_name}/chat/completions")
async def aoaifn(req: Request):
    start_time = time.time()
    response = await process_request()
    # Record end-to-end latency alongside the rest of the Event Hub payload
    latency_ms = int((time.time() - start_time) * 1000)
    log_data["latency_ms"] = latency_ms

End-to-end latency measurement includes request processing, OpenAI API call, and response handling, enabling performance monitoring and optimization.
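
Note that for streaming calls the body is still being sent after process_request() returns, so total latency can also be captured when the generator finishes. A minimal sketch of that idea (names are illustrative, not taken from the repo):

import time

async def generate_with_timing(generate, log_data, start_time):
    # Wrap the SSE generator so the measured latency covers the full stream,
    # not just the time until the upstream call returned its first chunk.
    try:
        async for event in generate():
            yield event
    finally:
        log_data["latency_ms"] = int((time.time() - start_time) * 1000)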

Demo

Screenshots in the original post show the function starting up, the API call being made, and the resulting events arriving in Event Hub.

Setup

  1. Deploy the Azure Function
  2. Configure environment variables:
    AZURE_OPENAI_KEY=
    AZURE_OPENAI_API_VERSION=2024-08-01-preview
    AZURE_OPENAI_BASE_URL=https://<your-resource>.openai.azure.com/
    AZURE_EVENTHUB_CONN_STR=
  3. Update APIM routing to point to the Function App

Extension scenarios:

  1. APIM Managed Identity Auth token passthrough
  2. PII Filtering: Integration with Azure Presidio for real-time PII detection and masking in logs (see the sketch after this list)
  3. Cost Analysis: Token usage mapping to Azure billing metrics
  4. Latency-based routing: AOAI endpoint ranking built from the collected latency metrics
  5. Monitoring Dashboard: Real-time visualisation of:
    • Token usage per model/deployment
    • Response latencies
    • Error rates
    • Regional distribution
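
As an illustration of the PII filtering scenario above, a minimal sketch of masking the aggregated content with Presidio before it is logged (assumes the presidio-analyzer and presidio-anonymizer packages; the mask_pii helper is hypothetical):

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def mask_pii(text: str) -> str:
    # Detect PII entities, then replace them with placeholder tokens
    results = analyzer.analyze(text=text, language="en")
    return anonymizer.anonymize(text=text, analyzer_results=results).text

# Example: mask the aggregated content before building log_data
# log_data["content"] = mask_pii("".join(content_buffer))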

Implementation available on GitHub.

Updated Nov 15, 2024
Version 8.0
  • sterankin (Copper Contributor)

    hieunhu No matter what I try, the code does not log to the Event Hub; in fact, it does not even display the logging.info call before it. I even removed the if statement to ensure it always hit that code block in the finally statement. Is there any reason this code would not execute, or why it would not show up in the logs:

    [Information]   Processing OpenAI proxy request
    [Information]   Request body: {"messages": [{"role": "user", "content": "Tell me a joke"}], "stream": true, "stream_options": {"include_usage": true}}
    [Information]   Extra args being passed to OpenAI: {"stream_options": {"include_usage": true}}
    [Information]   HTTP Request: POST https://redacted/openai/deployments/gpt-4o-2024-08-06/chat/completions?api-version=2024-02-01 "HTTP/1.1 200 OK"
    [Information]   Processing OpenAI proxy request
    [Information]   Request body: {"messages": [{"role": "user", "content": "Tell me a joke"}], "stream": true, "stream_options": {"include_usage": true}}
    [Information]   Extra args being passed to OpenAI: {"stream_options": {"include_usage": true}}
    [Information]   INFO:     ::ffff:169.254.129.7:0 - "POST /api/openai/deployments/gpt-4o-2024-08-06/chat/completions?api-version=2024-02-01 HTTP/1.1" 200 OK
    [Information]   HTTP Request: POST https://redacted/openai/deployments/gpt-4o-2024-08-06/chat/completions?api-version=2024-02-01 "HTTP/1.1 200 OK"
    [Information]   Executed 'Functions.aoaifn' (Succeeded, Id=ce277664-c1e0-4f09-92bf-d865a49fd4e6, Duration=29449ms)
    [Information]   INFO:     ::ffff:169.254.129.7:0 - "POST /api/openai/deployments/gpt-4o-2024-08-06/chat/completions?api-version=2024-02-01 HTTP/1.1" 200 OK
    [Information]   Executed 'Functions.aoaifn' (Succeeded, Id=2ef710a7-04ac-4c31-8df6-2eb504648561, Duration=32232ms)

     

  • hi hieunhu, regarding the extension scenarios, are there plans to implement any of them? I am specifically interested in 'Cost Analysis: Token usage mapping to Azure billing metrics.' This would be very useful for tracking AOAI token consumption from streaming requests.