Azure AI Foundry Blog

Working with the Realtime API of gpt-4o in python

srikantan
Microsoft
Dec 30, 2024

Working with the Realtime API of GPT-4o is more involved than calling the chat completions API endpoints. It requires an intermediate layer that connects to the API over WebSockets, and there are various client and server events to manage when building an end-to-end application experience. This post captures these details.

This post is organized into sections that cover how to:

  • Connect to the Realtime API
  • Handle audio conversations
  • Handle text conversations
  • Handle tool calling

The sample web application is built using Chainlit.

Connecting to the Realtime API

Refer to the code snippet below to establish a WebSocket connection to the Server (API). After establishing that:

1. Implement the receive function to accept responses from the Server. It is used to handle the response content from the server, be it audio or text. More details on this function are provided later in the post, under each section.

    url = f"{base_url}openai/realtime?api-version={api_version}&deployment={model_name}&api-key={api_key}"

    async def connect(self):
        """Connects the client to the Realtime API over a WebSocket connection."""
        if self.is_connected():
            # Already connected: log and return instead of opening a second connection
            self.log("Already connected")
            return
        self.ws = await websockets.connect(
            url,
            additional_headers={
                "Authorization": f"Bearer {api_key}",
                "OpenAI-Beta": "realtime=v1",
            },
        )
        print("Connected to realtime API....")
        # Start the background task that processes events coming from the server
        asyncio.create_task(self.receive())

        await self.update_session()

2. Send a session.update client event to set session-level configuration, such as the system prompt the model should use, whether the conversation uses text, speech, or both, the neural voice to use in the response, and so forth.

        self.system_prompt = system_prompt
        self.event_handlers = defaultdict(list)
        self.session_config = {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": "shimmer",
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": {"model": "whisper-1"},
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": 500,
                # "create_response": True,  ## do not enable this attribute, since it prevents function calls from being detected
            },
            "tools": tools_list,
            "tool_choice": "auto",
            "temperature": 0.8,
            "max_response_output_tokens": 4096,
        }
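As a rough sketch of how this configuration might reach the Server: in the Realtime API, a client event is a JSON object with a type field, and the session settings travel in a session.update event under a session key. The helper name build_client_event below is hypothetical (the post's own send and update_session methods are not shown), and the session dictionary is trimmed for brevity.

```python
import json
import uuid
from typing import Optional


def build_client_event(event_type: str, data: Optional[dict] = None) -> str:
    """Serialize a client event: a JSON object with a 'type' plus event-specific fields."""
    # event_id is an optional, client-generated identifier for the event.
    event = {"event_id": f"evt_{uuid.uuid4().hex}", "type": event_type}
    event.update(data or {})
    return json.dumps(event)


# Trimmed-down version of the session configuration shown above.
session_config = {
    "modalities": ["text", "audio"],
    "voice": "shimmer",
    "turn_detection": {"type": "server_vad", "threshold": 0.5},
}

# This string is what would be written to the WebSocket.
message = build_client_event("session.update", {"session": session_config})
```

Keeping the event construction in one small function means every other client event (input_audio_buffer.append, conversation.item.create, response.create) can be built the same way.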

Handling audio conversation

1. Capture user voice input

Chainlit provides events to capture the user voice input from the microphone.

@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
    openai_realtime: RTWSClient = cl.user_session.get("openai_realtime")
    if openai_realtime:
        if openai_realtime.is_connected():
            await openai_realtime.append_input_audio(chunk.data)
        else:
            print("RealtimeClient is not connected")

2. Process the user voice input

a) Convert the audio input received in the previous step to a base64 encoded string. Send the Client event input_audio_buffer.append to the Server, with this audio payload.

    async def append_input_audio(self, array_buffer):
        # Check if the array buffer is not empty and send the audio data to the input buffer
        if len(array_buffer) > 0:
            await self.send(
                "input_audio_buffer.append",
                {
                    "audio": array_buffer_to_base64(np.array(array_buffer)),
                },
            )

b) Once the Server is done receiving the audio chunks, it sends an input_audio_buffer.committed event.

c) Once this event is picked up in the receive function, send a Client Event response.create to the Server to elicit a response.

    async def receive(self):
        async for message in self.ws:
            event = json.loads(message)
            ................................
            elif event["type"] == "input_audio_buffer.committed":
                # The user has stopped speaking. The audio input captured from the user so far should now be processed by the server.
                # Hence we need to send a 'response.create' event to signal the server to respond
                await self.send("response.create", {"response": self.response_config})
            .................................
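
The routing in steps b) and c) can be exercised without a live connection. The sketch below is a stand-in built on assumptions: FakeSocket, record_send and the contents of response_config are hypothetical, and only the branch on input_audio_buffer.committed mirrors the receive function in this post.

```python
import asyncio
import json


class FakeSocket:
    """Stands in for the WebSocket: yields pre-recorded server messages."""

    def __init__(self, messages):
        self._messages = messages

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for message in self._messages:
            yield message


# Hypothetical response settings; the post does not show response_config.
response_config = {"modalities": ["text", "audio"]}
sent = []


async def record_send(event_type, data):
    # Records what would have been written to the WebSocket.
    sent.append((event_type, data))


async def receive(ws, send):
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "input_audio_buffer.committed":
            # The server has committed the user's audio: ask it to respond.
            await send("response.create", {"response": response_config})


server_messages = [
    json.dumps({"type": "input_audio_buffer.speech_started"}),
    json.dumps({"type": "input_audio_buffer.committed", "item_id": "item_1"}),
]
asyncio.run(receive(FakeSocket(server_messages), record_send))
```

Only the committed event triggers a response.create; the speech_started event is ignored by this sketch.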

3. Receiving the response audio

Once the response audio events start flowing in from the server:

  1. Handle the Server event response.audio.delta, by converting the audio chunks from a base64 encoded string to bytes.
  2. Relay this to the UI to play the audio chunks over the speaker. The dispatch function is used to raise this event (see snippet below).

    async def receive(self):
        async for message in self.ws:
            event = json.loads(message)
            ............................
            if event["type"] == "response.audio.delta":
                # response audio delta events received from server that need to be relayed
                # to the UI for playback
                delta = event["delta"]
                array_buffer = base64_to_array_buffer(delta)
                append_values = array_buffer.tobytes()
                _event = {"audio": append_values}
                # send event to chainlit UI to play this audio
                self.dispatch("conversation.updated", _event)
            elif event["type"] == "response.audio.done":
                # server has finished sending back the audio response to the user query
                # let the chainlit UI know that the response audio has been completely received
                self.dispatch("conversation.updated", event)
            ..........................
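
The helpers array_buffer_to_base64 and base64_to_array_buffer used in the snippets above are not shown in this post. As a minimal sketch of what such a conversion involves, assuming the pcm16 format means 16-bit signed little-endian samples, the functions below round-trip samples through a base64 string. The function names are hypothetical, and the sketch uses only the standard library rather than NumPy.

```python
import base64
import struct
from typing import List


def pcm16_to_base64(samples: List[int]) -> str:
    """Pack 16-bit signed samples as little-endian bytes, then base64-encode them."""
    raw = struct.pack(f"<{len(samples)}h", *samples)
    return base64.b64encode(raw).decode("ascii")


def base64_to_pcm16(payload: str) -> List[int]:
    """Decode a base64 string back into a list of 16-bit signed samples."""
    raw = base64.b64decode(payload)
    return list(struct.unpack(f"<{len(raw) // 2}h", raw))


samples = [0, 1000, -1000, 32767, -32768]
encoded = pcm16_to_base64(samples)   # what would go into the "audio" field
decoded = base64_to_pcm16(encoded)   # what would come out of a "delta" field
```

The encoded string is what an input_audio_buffer.append payload carries, and the decode step corresponds to what happens to each response.audio.delta before playback.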

4. Play the received audio chunks

The Chainlit UI then plays this audio out over the speaker.

    async def handle_conversation_updated(event):
        """Used to play the response audio chunks as they are received from the server."""
        _audio = event.get("audio")
        if _audio:
            await cl.context.emitter.send_audio_chunk(
                cl.OutputAudioChunk(
                    mimeType="pcm16", data=_audio, track=cl.user_session.get("track_id")
                )
            )
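
The dispatch call and the handler above imply a small publish/subscribe mechanism between the WebSocket client and the UI. The post shows event_handlers being created as a defaultdict(list) but not the dispatch method itself, so the following is only a sketch of one way it could work. The EventEmitter class and its on method are assumptions, and the handler here records audio instead of sending it to Chainlit.

```python
import asyncio
import inspect
from collections import defaultdict


class EventEmitter:
    """Minimal publish/subscribe helper; handlers are stored per event name."""

    def __init__(self):
        self.event_handlers = defaultdict(list)

    def on(self, event_name, handler):
        self.event_handlers[event_name].append(handler)

    def dispatch(self, event_name, event):
        """Call every handler for event_name; coroutine handlers are scheduled as tasks."""
        tasks = []
        for handler in self.event_handlers[event_name]:
            if inspect.iscoroutinefunction(handler):
                tasks.append(asyncio.create_task(handler(event)))
            else:
                handler(event)
        return tasks


played = []


async def handle_conversation_updated(event):
    # Stand-in for the Chainlit handler: record the audio instead of playing it.
    if event.get("audio"):
        played.append(event["audio"])


async def main():
    emitter = EventEmitter()
    emitter.on("conversation.updated", handle_conversation_updated)
    tasks = emitter.dispatch("conversation.updated", {"audio": b"\x00\x01"})
    await asyncio.gather(*tasks)  # wait for the scheduled handlers to finish


asyncio.run(main())
```

Returning the scheduled tasks lets a caller wait for handlers when it needs to; a fire-and-forget variant would simply ignore the return value.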

 

Handling text conversation

1. Capture user text input

Apart from handling the audio conversation, we can handle text input and the transcripts associated with the audio responses, so that the user can interact with the AI Assistant in a multimodal way.

Chainlit provides events to capture the user input from the chat interface.

@cl.on_message
async def on_message(message: cl.Message):
    openai_realtime: RTWSClient = cl.user_session.get("openai_realtime")
    if openai_realtime and openai_realtime.is_connected():
        await openai_realtime.send_user_message_content(
            [{"type": "input_text", "text": message.content}]
        )
    else:
        await cl.Message(
            content="Please activate voice mode before sending messages!"
        ).send()

2. Process the user text input

With the user text input received above:

a) Send a Client Event conversation.item.create to the Server with the user text input in the payload.

b) Follow that up with a Client Event response.create to the Server to elicit a response.

c) Raise a custom event 'conversation.interrupted' to the UI so that it can stop playing any audio response from the previous user query.

    async def send_user_message_content(self, content=None):
        if content:
            await self.send(
                "conversation.item.create",
                {
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": content,
                    }
                },
            )
            # this is the trigger to the server to start responding to the user query
            await self.send("response.create", {"response": self.response_config})
            
            # raise this event to the UI to pause the audio playback, in case it is doing so already, 
            # when the user submits a query in the chat interface
            _event = {"type": "conversation_interrupted"}
            # signal the UI to stop playing audio
            self.dispatch("conversation.interrupted", _event)

3. Receiving the text response

  1. Use the Server Event response.audio_transcript.delta to get the stream of the text data response. This is a transcription of what is already playing as audio on the UI.
  2. Relay this data to the UI through a custom event, to populate the chat conversation.
  3. The response text gets streamed and displayed in the Chainlit UI.

    async def receive(self):
        async for message in self.ws:
            ..................................
            elif event["type"] == "response.audio_transcript.delta":
                # this event is received when the transcript of the server's audio response to the user has started to come in.
                # send this to the UI to display the transcript in the chat window, even as the audio of the response gets played
                delta = event["delta"]
                item_id = event["item_id"]
                _event = {"transcript": delta, "item_id": item_id}
                # signal the UI to display the transcript of the response audio in the chat window
                self.dispatch("conversation.text.delta", _event)
            elif (
                event["type"] == "conversation.item.input_audio_transcription.completed"
            ):
                ...............................
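
On the UI side, the transcript deltas have to be stitched together per response item. The post does not show that handler, so the sketch below is an assumption about one way to do it: the TranscriptBuffer class is hypothetical, and it only relies on the transcript and item_id keys of the custom event dispatched above.

```python
from collections import defaultdict


class TranscriptBuffer:
    """Accumulates streamed transcript fragments, keyed by the response item id."""

    def __init__(self):
        self._parts = defaultdict(list)

    def add_delta(self, event: dict) -> str:
        """Store one fragment and return the full text received so far for that item."""
        item_id = event["item_id"]
        self._parts[item_id].append(event["transcript"])
        return "".join(self._parts[item_id])


buffer = TranscriptBuffer()
buffer.add_delta({"transcript": "Hel", "item_id": "item_1"})
first = buffer.add_delta({"transcript": "lo there", "item_id": "item_1"})
second = buffer.add_delta({"transcript": "Hi", "item_id": "item_2"})
```

Keying on item_id keeps fragments from different responses apart, which matters when a new response starts before the UI has finished rendering the previous one.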

Handling tool calling

As a part of the Session Update event discussed earlier, we pass a payload of the tools (functions) that this Assistant has access to. In this application, I am using a search function implemented using Tavily.

        self.session_config = {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": "shimmer",
            .....................
            "tools": tools_list,
            "tool_choice": "auto",
            "temperature": 0.8,
            "max_response_output_tokens": 4096,
        }

The function definition and implementation used in this sample application:

tools_list = [
    {
        "type": "function",
        "name": "search_function",
        "description": "call this function to bring up-to-date information on the user's query when it pertains to current affairs",
        "parameters": {
            "type": "object",
            "properties": {"search_term": {"type": "string"}},
            "required": ["search_term"],
        },
    }
]


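# Assumed import: TavilySearchResults as provided by the langchain_community package
from langchain_community.tools.tavily_search import TavilySearchResults


# Function to perform search using Tavily
def search_function(search_term: str):
    print("performing search for the user query > ", search_term)
    return TavilySearchResults().invoke(search_term)


# Maps the tool name the model returns to the Python function to call
available_functions = {"search_function": search_function}

Handling the response from tool calling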

When a user request entails a function call, the Server Event response.done does not return audio. Instead, it returns the function that matches the intent, along with the arguments to invoke it with. In the receive function, check the response for a function call:

  1. Get the function name and arguments from the response.
  2. Invoke the function and get the response.
  3. Send Client Event conversation.item.create to the Server with the function call output.
  4. Follow that up with Client Event response.create to elicit a response from the Server that will then be played out as audio and text.

    async def receive(self):
        async for message in self.ws:
            ...........................................................
            elif event["type"] == "response.done":
                ...........................................
                        if "function_call" == output_type:
                            function_name = (
                                event.get("response", {})
                                .get("output", [{}])[0]
                                .get("name", None)
                            )
                            arguments = json.loads(
                                event.get("response", {})
                                .get("output", [{}])[0]
                                .get("arguments", None)
                            )
                            tool_call_id = (
                                event.get("response", {})
                                .get("output", [{}])[0]
                                .get("call_id", None)
                            )

                            function_to_call = available_functions[function_name]
                            # invoke the function with the arguments and get the response
                            response = function_to_call(**arguments)
                            print(
                                f"called function {function_name}, and the response is:",
                                response,
                            )
                            # send the function call response to the server(model)
                            await self.send(
                                "conversation.item.create",
                                {
                                    "item": {
                                        "type": "function_call_output",
                                        "call_id": tool_call_id,
                                        "output": json.dumps(response),
                                    }
                                },
                            )
                            # signal the model(server) to generate a response based on the function call output sent to it
                            await self.send(
                                "response.create", {"response": self.response_config}
                            )
            ...............................................
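
The four steps above can also be tried offline against a hand-written response.done event. The following is a sketch under stated assumptions, not the application's actual code: search_function is replaced by a stand-in that returns canned data instead of calling Tavily, run_function_calls is a hypothetical helper, and the sample event contains only the fields the snippet above reads (type, name, arguments, call_id). Unlike the snippet above, it loops over every output item and tolerates missing arguments.

```python
import json


def search_function(search_term: str):
    """Stand-in for the Tavily-backed search; returns canned data."""
    return [{"title": f"Result for {search_term}"}]


available_functions = {"search_function": search_function}


def run_function_calls(event: dict) -> list:
    """Return one conversation.item.create payload per function call in a response.done event."""
    payloads = []
    for item in event.get("response", {}).get("output", []):
        if item.get("type") != "function_call":
            continue  # audio or text output, nothing to invoke
        function_to_call = available_functions.get(item.get("name"))
        if function_to_call is None:
            continue  # the model named a tool this client does not know
        # The arguments arrive as a JSON-encoded string.
        arguments = json.loads(item.get("arguments") or "{}")
        result = function_to_call(**arguments)
        payloads.append(
            {
                "item": {
                    "type": "function_call_output",
                    "call_id": item.get("call_id"),
                    "output": json.dumps(result),
                }
            }
        )
    return payloads


response_done = {
    "type": "response.done",
    "response": {
        "output": [
            {
                "type": "function_call",
                "name": "search_function",
                "arguments": json.dumps({"search_term": "AI news"}),
                "call_id": "call_123",
            }
        ]
    },
}
payloads = run_function_calls(response_done)
```

Each payload would be sent as a conversation.item.create event, followed by a single response.create so the model can answer using the tool output.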

 

Reference links:

Watch a short video of this sample application here

The Documentation on the Realtime API is available here

The GitHub Repo for the application in this post is available here

Updated Dec 27, 2024
Version 1.0