Blog Post

AI - Azure AI services Blog
19 MIN READ

From Zero to Hero: Building Your First Voice Bot with GPT-4o Real-Time API using Python

mrajguru's avatar
mrajguru
Icon for Microsoft rankMicrosoft
Oct 12, 2024

Voice technology is transforming how we interact with machines, making conversations with AI feel more natural than ever before. With the public beta release of the Realtime API powered by GPT-4o, developers now have the tools to create low-latency, multimodal voice experiences in their apps, opening up endless possibilities for innovation.

 

Gone are the days when building a voice bot required stitching together multiple models for transcription, inference, and text-to-speech conversion. With the Realtime API, developers can now streamline the entire process with a single API call, enabling fluid, natural speech-to-speech conversations. This is a game-changer for industries like customer support, education, and real-time language translation, where fast, seamless interactions are crucial.

 

 

In this blog, we’ll guide you through the process of building your first real-time voice bot from scratch using the GPT-4o Realtime Model. We’ll cover key features of the Realtime API, how to set up a WebSocket connection for voice streaming, and how to leverage the API’s ability to handle interruptions and make function calls. By the end, you’ll be ready to create a voice bot that responds to users with near-human accuracy and emotion. Whether you're a beginner or an experienced developer, this blueprint will help you get started with creating immersive voice interactions that are both responsive and engaging. Ready to dive in? Let’s get started!

 

Key Features

 

  • Low-Latency Streaming: Enables real-time audio input and output, facilitating natural and seamless conversations.
  • Multimodal Support: Handles both text and audio inputs and outputs, allowing for versatile interaction modes.
  • Preset Voices: Supports six predefined voices, ensuring quality and consistency in responses.
  • Function Calling: Allows the voice assistant to perform actions or retrieve context-specific information dynamically.
  • Safety and Privacy: Incorporates multiple layers of safety protections, including automated monitoring and adherence to privacy policies.

 

How GPT-4o Realtime API Works

 

 Traditionally, building a voice assistant required chaining together several models: an automatic speech recognition (ASR) model like Whisper for transcribing audio, a text-based model for processing responses, and a text-to-speech (TTS) model for generating audio outputs. This multi-step process often led to delays and a loss of emotional nuance.

The GPT-4o Realtime API revolutionizes this by consolidating these functionalities into a single API call. By establishing a persistent WebSocket connection, developers can stream audio inputs and outputs directly, significantly reducing latency and enhancing the naturalness of conversations. Additionally, the API's function calling capability allows the voice bot to perform actions such as placing orders or retrieving customer information on the fly.

 

 

 

 

 

 

Building Your Real-Time Voice Bot

 
Let's dive into the step-by-step process of building your own real-time voice bot using the GPT-4o Realtime API.

Prerequisites

 
Before you begin, ensure you have the following:

  • Azure Subscription: Create one for free.
  • Azure OpenAI Resource: Set up in a supported region (East US 2 or Sweden Central).
  • Development Environment: Familiarity with Python and basic asynchronous programming.
  • Client Libraries: Tools like LiveKit, Agora, or Twilio can enhance your bot's capabilities.

 

Setting Up the API

 

  1. Deploy the GPT-4o Realtime Model:
    • Navigate to the Azure AI Studio.
    • Access the Model Catalog and search for gpt-4o-realtime-preview.
    • Deploy the model by selecting your Azure OpenAI resource and configuring the deployment settings.
  2. Configure Audio Input and Output:
    • The API supports various audio formats, primarily pcm16.
    • Set up your client to handle audio streaming, ensuring compatibility with the API's requirements.

 

This project demonstrates how to build a sophisticated real-time conversational AI system using Azure OpenAI. By leveraging WebSocket connections and an event-driven architecture, the system provides responsive and context-aware customer support in any language. This approach can be adapted to various languages and use cases, making it a versatile solution for businesses looking to enhance their customer service capabilities. The project consists of three main components:

  • Realtime API: Handles WebSocket connections to Azure OpenAI's real-time API.
  • Tools: Defines various customer support functions like checking order status, processing returns, and more.
  • Application: Manages the interaction flow and integrates the real-time client with UI Layer.

 

Environment Setup

 

Create an .env file and update the following environment variables:

 

 

 

AZURE_OPENAI_API_KEY=XXXX
# replace with your Azure OpenAI API Key

AZURE_OPENAI_ENDPOINT=https://xxxx.openai.azure.com/
# replace with your Azure OpenAI Endpoint

AZURE_OPENAI_DEPLOYMENT=gpt-4o-realtime-preview
#Create a deployment for the gpt-4o-realtime-preview model and place the deployment name here. You can name the deployment as per your choice and put the name here.

AZURE_OPENAI_CHAT_DEPLOYMENT_VERSION=2024-10-01-preview
#You don't need to change this unless you are willing to try other versions.

 

 

 

requirements.txt

 

 

 

chainlit==1.3.0rc1
openai
beautifulsoup4
lxml
python-dotenv
websockets
aiohttp

 

 

 

 

Implementing the Realtime Client

 
The heartbeat of your voice bot is the Realtime Client, which manages the WebSocket connection and handles communication with the GPT-4o Realtime API. The RealtimeAPI class is responsible for managing WebSocket connections to OpenAI's real-time API. It handles sending and receiving messages, dispatching events, and maintaining the connection state.

Key features include:
 
  • Connection Management: Establishes and maintains a WebSocket connection. 
  • Event Dispatching: Uses an event-driven architecture to handle incoming and outgoing messages.
  • Audio Processing: Convert audio inputs from base64 to array buffers and vice versa using utility functions. Manage audio streams efficiently to ensure minimal latency and high-quality voice interactions.


Key Components:

  • RealtimeAPI Class:
    • Establishes and maintains the WebSocket connection.
    • Handles sending and receiving messages.
    • Manages event dispatching for various conversation events.

 

 

 

class RealtimeAPI(RealtimeEventHandler):
    def __init__(self):
        super().__init__()
        self.default_url = 'wss://api.openai.com/v1/realtime'
        self.url = os.environ["AZURE_OPENAI_ENDPOINT"]
        self.api_key = os.environ["AZURE_OPENAI_API_KEY"]
        self.api_version = "2024-10-01-preview"
        self.azure_deployment = os.environ["AZURE_OPENAI_DEPLOYMENT"]
        self.ws = None

    def is_connected(self):
        return self.ws is not None

    def log(self, *args):
        logger.debug(f"[Websocket/{datetime.utcnow().isoformat()}]", *args)

    async def connect(self, model='gpt-4o-realtime-preview'):
        if self.is_connected():
            raise Exception("Already connected")
        self.ws = await websockets.connect(f"{self.url}/openai/realtime?api-version={self.api_version}&deployment={model}&api-key={self.api_key}", extra_headers={
            'Authorization': f'Bearer {self.api_key}',
            'OpenAI-Beta': 'realtime=v1'
        })
        self.log(f"Connected to {self.url}")
        asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        async for message in self.ws:
            event = json.loads(message)
            if event['type'] == "error":
                logger.error("ERROR", message)
            self.log("received:", event)
            self.dispatch(f"server.{event['type']}", event)
            self.dispatch("server.*", event)

    async def send(self, event_name, data=None):
        if not self.is_connected():
            raise Exception("RealtimeAPI is not connected")
        data = data or {}
        if not isinstance(data, dict):
            raise Exception("data must be a dictionary")
        event = {
            "event_id": self._generate_id("evt_"),
            "type": event_name,
            **data
        }
        self.dispatch(f"client.{event_name}", event)
        self.dispatch("client.*", event)
        self.log("sent:", event)
        await self.ws.send(json.dumps(event))

    def _generate_id(self, prefix):
        return f"{prefix}{int(datetime.utcnow().timestamp() * 1000)}"

    async def disconnect(self):
        if self.ws:
            await self.ws.close()
            self.ws = None
            self.log(f"Disconnected from {self.url}")

 

 

 

 

Reference: init.py 

 

  • RealtimeConversation Class:
    • Manages the state of the conversation.
    • Processes different types of events, such as message creation, transcription completion, and audio streaming.
    • Queues and formats audio and text data for seamless interaction.

 

 

 

class RealtimeConversation:
    default_frequency = config.features.audio.sample_rate
    
    EventProcessors = {
        'conversation.item.created': lambda self, event: self._process_item_created(event),
        'conversation.item.truncated': lambda self, event: self._process_item_truncated(event),
        'conversation.item.deleted': lambda self, event: self._process_item_deleted(event),
        'conversation.item.input_audio_transcription.completed': lambda self, event: self._process_input_audio_transcription_completed(event),
        'input_audio_buffer.speech_started': lambda self, event: self._process_speech_started(event),
        'input_audio_buffer.speech_stopped': lambda self, event, input_audio_buffer: self._process_speech_stopped(event, input_audio_buffer),
        'response.created': lambda self, event: self._process_response_created(event),
        'response.output_item.added': lambda self, event: self._process_output_item_added(event),
        'response.output_item.done': lambda self, event: self._process_output_item_done(event),
        'response.content_part.added': lambda self, event: self._process_content_part_added(event),
        'response.audio_transcript.delta': lambda self, event: self._process_audio_transcript_delta(event),
        'response.audio.delta': lambda self, event: self._process_audio_delta(event),
        'response.text.delta': lambda self, event: self._process_text_delta(event),
        'response.function_call_arguments.delta': lambda self, event: self._process_function_call_arguments_delta(event),
    }
    
    def __init__(self):
        self.clear()

    def clear(self):
        self.item_lookup = {}
        self.items = []
        self.response_lookup = {}
        self.responses = []
        self.queued_speech_items = {}
        self.queued_transcript_items = {}
        self.queued_input_audio = None

    def queue_input_audio(self, input_audio):
        self.queued_input_audio = input_audio

    def process_event(self, event, *args):
        event_processor = self.EventProcessors.get(event['type'])
        if not event_processor:
            raise Exception(f"Missing conversation event processor for {event['type']}")
        return event_processor(self, event, *args)

    def get_item(self, id):
        return self.item_lookup.get(id)

    def get_items(self):
        return self.items[:]

    def _process_item_created(self, event):
        item = event['item']
        new_item = item.copy()
        if new_item['id'] not in self.item_lookup:
            self.item_lookup[new_item['id']] = new_item
            self.items.append(new_item)
        new_item['formatted'] = {
            'audio': [],
            'text': '',
            'transcript': ''
        }
        if new_item['id'] in self.queued_speech_items:
            new_item['formatted']['audio'] = self.queued_speech_items[new_item['id']]['audio']
            del self.queued_speech_items[new_item['id']]
        if 'content' in new_item:
            text_content = [c for c in new_item['content'] if c['type'] in ['text', 'input_text']]
            for content in text_content:
                new_item['formatted']['text'] += content['text']
        if new_item['id'] in self.queued_transcript_items:
            new_item['formatted']['transcript'] = self.queued_transcript_items[new_item['id']]['transcript']
            del self.queued_transcript_items[new_item['id']]
        if new_item['type'] == 'message':
            if new_item['role'] == 'user':
                new_item['status'] = 'completed'
                if self.queued_input_audio:
                    new_item['formatted']['audio'] = self.queued_input_audio
                    self.queued_input_audio = None
            else:
                new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call':
            new_item['formatted']['tool'] = {
                'type': 'function',
                'name': new_item['name'],
                'call_id': new_item['call_id'],
                'arguments': ''
            }
            new_item['status'] = 'in_progress'
        elif new_item['type'] == 'function_call_output':
            new_item['status'] = 'completed'
            new_item['formatted']['output'] = new_item['output']
        return new_item, None

    def _process_item_truncated(self, event):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.truncated: Item "{item_id}" not found')
        end_index = (audio_end_ms * self.default_frequency) // 1000
        item['formatted']['transcript'] = ''
        item['formatted']['audio'] = item['formatted']['audio'][:end_index]
        return item, None

    def _process_item_deleted(self, event):
        item_id = event['item_id']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'item.deleted: Item "{item_id}" not found')
        del self.item_lookup[item['id']]
        self.items.remove(item)
        return item, None

    def _process_input_audio_transcription_completed(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        transcript = event['transcript']
        formatted_transcript = transcript or ' '
        item = self.item_lookup.get(item_id)
        if not item:
            self.queued_transcript_items[item_id] = {'transcript': formatted_transcript}
            return None, None
        item['content'][content_index]['transcript'] = transcript
        item['formatted']['transcript'] = formatted_transcript
        return item, {'transcript': transcript}

    def _process_speech_started(self, event):
        item_id = event['item_id']
        audio_start_ms = event['audio_start_ms']
        self.queued_speech_items[item_id] = {'audio_start_ms': audio_start_ms}
        return None, None

    def _process_speech_stopped(self, event, input_audio_buffer):
        item_id = event['item_id']
        audio_end_ms = event['audio_end_ms']
        speech = self.queued_speech_items[item_id]
        speech['audio_end_ms'] = audio_end_ms
        if input_audio_buffer:
            start_index = (speech['audio_start_ms'] * self.default_frequency) // 1000
            end_index = (speech['audio_end_ms'] * self.default_frequency) // 1000
            speech['audio'] = input_audio_buffer[start_index:end_index]
        return None, None

    def _process_response_created(self, event):
        response = event['response']
        if response['id'] not in self.response_lookup:
            self.response_lookup[response['id']] = response
            self.responses.append(response)
        return None, None

    def _process_output_item_added(self, event):
        response_id = event['response_id']
        item = event['item']
        response = self.response_lookup.get(response_id)
        if not response:
            raise Exception(f'response.output_item.added: Response "{response_id}" not found')
        response['output'].append(item['id'])
        return None, None

    def _process_output_item_done(self, event):
        item = event['item']
        if not item:
            raise Exception('response.output_item.done: Missing "item"')
        found_item = self.item_lookup.get(item['id'])
        if not found_item:
            raise Exception(f'response.output_item.done: Item "{item["id"]}" not found')
        found_item['status'] = item['status']
        return found_item, None

    def _process_content_part_added(self, event):
        item_id = event['item_id']
        part = event['part']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.content_part.added: Item "{item_id}" not found')
        item['content'].append(part)
        return item, None

    def _process_audio_transcript_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.audio_transcript.delta: Item "{item_id}" not found')
        item['content'][content_index]['transcript'] += delta
        item['formatted']['transcript'] += delta
        return item, {'transcript': delta}

    def _process_audio_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            logger.debug(f'response.audio.delta: Item "{item_id}" not found')
            return None, None
        array_buffer = base64_to_array_buffer(delta)
        append_values = array_buffer.tobytes()
        # TODO: make it work
        # item['formatted']['audio'] = merge_int16_arrays(item['formatted']['audio'], append_values)
        return item, {'audio': append_values}

    def _process_text_delta(self, event):
        item_id = event['item_id']
        content_index = event['content_index']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.text.delta: Item "{item_id}" not found')
        item['content'][content_index]['text'] += delta
        item['formatted']['text'] += delta
        return item, {'text': delta}

    def _process_function_call_arguments_delta(self, event):
        item_id = event['item_id']
        delta = event['delta']
        item = self.item_lookup.get(item_id)
        if not item:
            raise Exception(f'response.function_call_arguments.delta: Item "{item_id}" not found')
        item['arguments'] += delta
        item['formatted']['tool']['arguments'] += delta
        return item, {'arguments': delta}

 

 

 

 

  • RealtimeClient Class:
    • Initialization: Sets up system prompts, session configurations, and initializes RealtimeAPI and RealtimeConversation for managing WebSocket connections and conversation events.
    • Connection Management: Handles connecting and disconnecting from the server, waiting for session creation, and updating session settings.
    • Event Handling: Listens for and processes server and client events, dispatching them to appropriate handlers.
      Conversation Management: Manages creation, updating, and deletion of conversation items, including handling input audio and speech events.
    • Tool and Response Management: Supports adding/removing tools, invoking them based on events, sending user messages, creating responses, and managing audio content.

 

 

 

 

class RealtimeClient(RealtimeEventHandler):
    def __init__(self, system_prompt: str):
        super().__init__()
        self.system_prompt = system_prompt
        self.default_session_config = {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": "shimmer",
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": { "model": 'whisper-1' },
            "turn_detection": { "type": 'server_vad' },
            "tools": [],
            "tool_choice": "auto",
            "temperature": 0.8,
            "max_response_output_tokens": 4096,
        }
        self.session_config = {}
        self.transcription_models = [{"model": "whisper-1"}]
        self.default_server_vad_config = {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 200,
        }
        self.realtime = RealtimeAPI()
        self.conversation = RealtimeConversation()
        self._reset_config()
        self._add_api_event_handlers()
        
    def _reset_config(self):
        self.session_created = False
        self.tools = {}
        self.session_config = self.default_session_config.copy()
        self.input_audio_buffer = bytearray()
        return True

    def _add_api_event_handlers(self):
        self.realtime.on("client.*", self._log_event)
        self.realtime.on("server.*", self._log_event)
        self.realtime.on("server.session.created", self._on_session_created)
        self.realtime.on("server.response.created", self._process_event)
        self.realtime.on("server.response.output_item.added", self._process_event)
        self.realtime.on("server.response.content_part.added", self._process_event)
        self.realtime.on("server.input_audio_buffer.speech_started", self._on_speech_started)
        self.realtime.on("server.input_audio_buffer.speech_stopped", self._on_speech_stopped)
        self.realtime.on("server.conversation.item.created", self._on_item_created)
        self.realtime.on("server.conversation.item.truncated", self._process_event)
        self.realtime.on("server.conversation.item.deleted", self._process_event)
        self.realtime.on("server.conversation.item.input_audio_transcription.completed", self._process_event)
        self.realtime.on("server.response.audio_transcript.delta", self._process_event)
        self.realtime.on("server.response.audio.delta", self._process_event)
        self.realtime.on("server.response.text.delta", self._process_event)
        self.realtime.on("server.response.function_call_arguments.delta", self._process_event)
        self.realtime.on("server.response.output_item.done", self._on_output_item_done)

    def _log_event(self, event):
        realtime_event = {
            "time": datetime.utcnow().isoformat(),
            "source": "client" if event["type"].startswith("client.") else "server",
            "event": event,
        }
        self.dispatch("realtime.event", realtime_event)

    def _on_session_created(self, event):
        self.session_created = True

    def _process_event(self, event, *args):
        item, delta = self.conversation.process_event(event, *args)
        if item:
            self.dispatch("conversation.updated", {"item": item, "delta": delta})
        return item, delta

    def _on_speech_started(self, event):
        self._process_event(event)
        self.dispatch("conversation.interrupted", event)

    def _on_speech_stopped(self, event):
        self._process_event(event, self.input_audio_buffer)

    def _on_item_created(self, event):
        item, delta = self._process_event(event)
        self.dispatch("conversation.item.appended", {"item": item})
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})

    async def _on_output_item_done(self, event):
        item, delta = self._process_event(event)
        if item and item["status"] == "completed":
            self.dispatch("conversation.item.completed", {"item": item})
        if item and item.get("formatted", {}).get("tool"):
            await self._call_tool(item["formatted"]["tool"])

    async def _call_tool(self, tool):
        try:
            print(tool["arguments"])
            json_arguments = json.loads(tool["arguments"])
            tool_config = self.tools.get(tool["name"])
            if not tool_config:
                raise Exception(f'Tool "{tool["name"]}" has not been added')
            result = await tool_config["handler"](**json_arguments)
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps(result),
                }
            })
        except Exception as e:
            logger.error(traceback.format_exc())
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "function_call_output",
                    "call_id": tool["call_id"],
                    "output": json.dumps({"error": str(e)}),
                }
            })
        await self.create_response()

    def is_connected(self):
        return self.realtime.is_connected()

    def reset(self):
        self.disconnect()
        self.realtime.clear_event_handlers()
        self._reset_config()
        self._add_api_event_handlers()
        return True

    async def connect(self):
        if self.is_connected():
            raise Exception("Already connected, use .disconnect() first")
        await self.realtime.connect()
        await self.update_session()
        return True

    async def wait_for_session_created(self):
        if not self.is_connected():
            raise Exception("Not connected, use .connect() first")
        while not self.session_created:
            await asyncio.sleep(0.001)
        return True

    async def disconnect(self):
        self.session_created = False
        self.conversation.clear()
        if self.realtime.is_connected():
            await self.realtime.disconnect()

    def get_turn_detection_type(self):
        return self.session_config.get("turn_detection", {}).get("type")

    async def add_tool(self, definition, handler):
        if not definition.get("name"):
            raise Exception("Missing tool name in definition")
        name = definition["name"]
        if name in self.tools:
            raise Exception(f'Tool "{name}" already added. Please use .removeTool("{name}") before trying to add again.')
        if not callable(handler):
            raise Exception(f'Tool "{name}" handler must be a function')
        self.tools[name] = {"definition": definition, "handler": handler}
        await self.update_session()
        return self.tools[name]

    def remove_tool(self, name):
        if name not in self.tools:
            raise Exception(f'Tool "{name}" does not exist, can not be removed.')
        del self.tools[name]
        return True

    async def delete_item(self, id):
        await self.realtime.send("conversation.item.delete", {"item_id": id})
        return True

    async def update_session(self, **kwargs):
        self.session_config.update(kwargs)
        use_tools = [
            {**tool_definition, "type": "function"}
            for tool_definition in self.session_config.get("tools", [])
        ] + [
            {**self.tools[key]["definition"], "type": "function"}
            for key in self.tools
        ]
        session = {**self.session_config, "tools": use_tools}
        if self.realtime.is_connected():
            await self.realtime.send("session.update", {"session": session})
        return True
    
    async def create_conversation_item(self, item):
        await self.realtime.send("conversation.item.create", {
            "item": item
        })

    async def send_user_message_content(self, content=[]):
        if content:
            for c in content:
                if c["type"] == "input_audio":
                    if isinstance(c["audio"], (bytes, bytearray)):
                        c["audio"] = array_buffer_to_base64(c["audio"])
            await self.realtime.send("conversation.item.create", {
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                }
            })
        await self.create_response()
        return True

    async def append_input_audio(self, array_buffer):
        if len(array_buffer) > 0:
            await self.realtime.send("input_audio_buffer.append", {
                "audio": array_buffer_to_base64(np.array(array_buffer)),
            })
            self.input_audio_buffer.extend(array_buffer)
        return True

    async def create_response(self):
        if self.get_turn_detection_type() is None and len(self.input_audio_buffer) > 0:
            await self.realtime.send("input_audio_buffer.commit")
            self.conversation.queue_input_audio(self.input_audio_buffer)
            self.input_audio_buffer = bytearray()
        await self.realtime.send("response.create")
        return True

    async def cancel_response(self, id=None, sample_count=0):
        if not id:
            await self.realtime.send("response.cancel")
            return {"item": None}
        else:
            item = self.conversation.get_item(id)
            if not item:
                raise Exception(f'Could not find item "{id}"')
            if item["type"] != "message":
                raise Exception('Can only cancelResponse messages with type "message"')
            if item["role"] != "assistant":
                raise Exception('Can only cancelResponse messages with role "assistant"')
            await self.realtime.send("response.cancel")
            audio_index = next((i for i, c in enumerate(item["content"]) if c["type"] == "audio"), -1)
            if audio_index == -1:
                raise Exception("Could not find audio on item to cancel")
            await self.realtime.send("conversation.item.truncate", {
                "item_id": id,
                "content_index": audio_index,
                "audio_end_ms": int((sample_count / self.conversation.default_frequency) * 1000),
            })
            return {"item": item}

    async def wait_for_next_item(self):
        event = await self.wait_for_next("conversation.item.appended")
        return {"item": event["item"]}

    async def wait_for_next_completed_item(self):
        event = await self.wait_for_next("conversation.item.completed")
        return {"item": event["item"]}

 

 

 

 

Adding Tools and Handlers

 
Your voice bot's functionality can be extended by integrating various tools and handlers. These allow the bot to perform specific actions based on user inputs.

  1. Define Tool Definitions:
    • In tool.py, define the capabilities of your bot, such as checking order statuses, processing returns, or updating account information.
    • Each tool includes a name, description, and required parameters.
  2. Implement Handlers:
    • Create asynchronous handler functions for each tool to execute the desired actions.
    • These handlers interact with your backend systems or databases to fulfill user requests.
  3. Integrate Tools with the Realtime Client:
    • Register each tool and its handler with the RealtimeClient in your app.py file.
    • Ensure that the bot can invoke these tools dynamically during conversations.


Key Components:

  • Tool Definitions:
    • Structured descriptions of each tool, including the required parameters and functionalities.

Example:

 

 

 

 

# Function Definitions
check_order_status_def = {
    "name": "check_order_status",
    "description": "Check the status of a customer's order",
    "parameters": {
      "type": "object",
      "properties": {
        "customer_id": {
          "type": "string",
          "description": "The unique identifier for the customer"
        },
        "order_id": {
          "type": "string",
          "description": "The unique identifier for the order"
        }
      },
      "required": ["customer_id", "order_id"]
    }
}

 

 

 

 

  • Handler Functions:
    • Asynchronous functions that execute the logic for each tool.
    • Interact with external systems, databases, or perform specific actions based on user requests

Example:

 

 

 

 

async def check_order_status_handler(customer_id, order_id):
    status = "In Transit"
    
    # Your Business Logic
    estimated_delivery, status, order_date =  fetch_order_details(order_id, customer_id)
    # Read the HTML template
    with open('order_status_template.html', 'r') as file:
        html_content = file.read()

    # Replace placeholders with actual data
    html_content = html_content.format(
        order_id=order_id,
        customer_id=customer_id,
        order_date=order_date.strftime("%B %d, %Y"),
        estimated_delivery=estimated_delivery.strftime("%B %d, %Y"),
        status=status
    )

    # Return the Chainlit message with HTML content
    await cl.Message(content=f"Here is the detail of your order \n {html_content}").send()
    return f"Order {order_id} status for customer {customer_id}: {status}"
  

 

 

 

 

Reference:

 

Integrating with Your Application

 
With the Realtime Client and tools in place, it's time to weave everything into your application.

  1. Initialize OpenAI Realtime:
    • In app.py, set up the connection to the GPT-4o Realtime API using your system prompt and session configurations.
    • Manage user sessions and track interactions seamlessly.
  2. Handle User Interactions:
    • Implement event handlers for chat initiation, message reception, audio processing, and session termination.
    • Ensure that user inputs, whether text or voice, are appropriately processed and responded to in real-time.
  3. Manage Conversation Flow:
    • Utilize the RealtimeConversation class to handle conversation states, manage audio streams, and maintain context.
    • Implement logic to handle interruptions, cancellations, and dynamic responses based on user actions.


Key Components:

  • Initialization:
    • Sets up the OpenAI Realtime Client with the system prompt and configures tools.

 

 

 

 

system_prompt = """Provide helpful and empathetic support responses to customer inquiries for ShopMe in Hindi language, addressing their requests, concerns, or feedback professionally.

Maintain a friendly and service-oriented tone throughout the interaction to ensure a positive customer experience.

# Steps

1. **Identify the Issue:** Carefully read the customer's inquiry to understand the problem or question they are presenting.
2. **Gather Relevant Information:** Check for any additional data needed, such as order numbers or account details, while ensuring the privacy and security of the customer's information.
3. **Formulate a Response:** Develop a solution or informative response based on the understanding of the issue. The response should be clear, concise, and address all parts of the customer's concern.
4. **Offer Further Assistance:** Invite the customer to reach out again if they need more help or have additional questions.
5. **Close Politely:** End the conversation with a polite closing statement that reinforces the service commitment of ShopMe.

# Output Format

Provide a clear and concise paragraph addressing the customer's inquiry, including:
- Acknowledgment of their concern
- Suggested solution or response
- Offer for further assistance
- Polite closing

# Notes
- Greet user with Welcome to ShopMe For the first time only
- always speak in Hindi
- Ensure all customer data is handled according to relevant privacy and data protection laws and ShopMe's privacy policy.
- In cases of high sensitivity or complexity, escalate the issue to a human customer support agent.
- Keep responses within a reasonable length to ensure they are easy to read and understand."""

 

 

 

 

 

  • Event Handlers:
    • Manages chat start, message reception, audio streaming, and session termination events.

First we will i will initialize the real time client discussed before

 

 

 

 

async def setup_openai_realtime(system_prompt: str):
    """Instantiate and configure the OpenAI Realtime Client"""
    openai_realtime = RealtimeClient(system_prompt = system_prompt)
    cl.user_session.set("track_id", str(uuid4()))
    async def handle_conversation_updated(event):
        item = event.get("item")
        delta = event.get("delta")
        """Currently used to stream audio back to the client."""
        if delta:
            # Only one of the following will be populated for any given event
            if 'audio' in delta:
                audio = delta['audio']  # Int16Array, audio added
                await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
            if 'transcript' in delta:
                transcript = delta['transcript']  # string, transcript added
                pass
            if 'arguments' in delta:
                arguments = delta['arguments']  # string, function arguments added
                pass
            
    async def handle_item_completed(item):
        """Used to populate the chat context with transcription once an item is completed."""
        # print(item) # TODO
        pass
    
    async def handle_conversation_interrupt(event):
        """Used to cancel the client previous audio playback."""
        cl.user_session.set("track_id", str(uuid4()))
        await cl.context.emitter.send_audio_interrupt()
        
    async def handle_error(event):
        logger.error(event)

 

 

 

 

 

  • Session Management:
    • Maintains user sessions, handles conversation interruptions, and ensures smooth interaction flow. As you see in the below code the idea is whenever you a receive an audio chunk you should call the real time client with the audio chunk.

 

 

 

if openai_realtime:            
    if openai_realtime.is_connected():
        await openai_realtime.append_input_audio(chunk.data)
    else:
        logger.info("RealtimeClient is not connected")

 

 

 

Reference: app.py 

Testing and Deployment

 
Once your voice bot is built, thorough testing is essential to ensure reliability and user satisfaction.

  1. Local Testing:
    • Use the AI Studio Real-time audio playground to interact with your deployed model.
    • Test various functionalities, including speech recognition, response generation, and tool execution.
  2. Integration Testing:
    • Ensure that your application seamlessly communicates with the Realtime API.
    • Test the event handlers and tool integrations to verify correct behavior under different scenarios.
  3. Deployment:
    • Deploy your application to a production environment, leveraging cloud services for scalability.
    • Monitor performance and make adjustments as needed to handle real-world usage.

 

Conclusion

 
Building a real-time voice bot has never been more accessible, thanks to the GPT-4o Realtime API. By consolidating speech-to-speech functionalities into a single, efficient interface, developers can craft engaging and natural conversational experiences without the complexity of managing multiple models. Whether you're enhancing customer support, developing educational tools, or creating interactive applications, the GPT-4o Realtime API provides a robust foundation to bring your voice bot visions to life. 

Embark on your development journey today and explore the endless possibilities that real-time voice interactions can offer your users!

 
Feel free to refer to the Azure OpenAI GPT-4o Realtime API documentation for more detailed information on setup, deployment, and advanced configurations.

 

Github Link: https://github.com/monuminu/AOAI_Samples/tree/main/realtime-assistant-support

 

Thanks

Manoranjan Rajguru

https://www.linkedin.com/in/manoranjan-rajguru/

Updated Oct 14, 2024
Version 7.0