Unlike the chat completions API endpoint used with GPT-4o, working with the Realtime API is more involved. It requires a WebSocket connection to the API, and there are various client and server events to manage in order to build an end-to-end application experience. This post captures these details.
This post is organized into sections that cover how to:
- Connect to the Realtime API
- Handle audio conversations
- Handle text conversations
- Handle tool calling
The sample web application is built using Chainlit.
Connecting to the Realtime API
The code snippet below establishes a WebSocket connection to the Server (API). After the connection is established:
1. Implement the receive function to accept responses from the Server. It handles the response content from the server, whether audio or text. More details on this function are provided later in the post, under each section.
url = f"{base_url}openai/realtime?api-version={api_version}&deployment={model_name}&api-key={api_key}"
async def connect(self):
"""Connects the client using a WS Connection to the Realtime API."""
if self.is_connected():
# raise Exception("Already connected")
self.log("Already connected")
self.ws = await websockets.connect(
url,
additional_headers={
"Authorization": f"Bearer {api_key}",
"OpenAI-Beta": "realtime=v1",
},
)
print(f"Connected to realtime API....")
asyncio.create_task(self.receive())
await self.update_session()
2. Send the client event session.update to set session-level configuration such as the system prompt the model should use, whether the conversation uses text, audio, or both, the neural voice to use in the response, and so forth. A sketch of the update_session call that sends this configuration follows the snippet below.
self.system_prompt = system_prompt
self.event_handlers = defaultdict(list)
self.session_config = {
    "modalities": ["text", "audio"],
    "instructions": self.system_prompt,
    "voice": "shimmer",
    "input_audio_format": "pcm16",
    "output_audio_format": "pcm16",
    "input_audio_transcription": {"model": "whisper-1"},
    "turn_detection": {
        "type": "server_vad",
        "threshold": 0.5,
        "prefix_padding_ms": 300,
        "silence_duration_ms": 500,
        # "create_response": True,  ## do not enable this attribute, since it prevents function calls from being detected
    },
    "tools": tools_list,
    "tool_choice": "auto",
    "temperature": 0.8,
    "max_response_output_tokens": 4096,
}
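The update_session call made at the end of connect() is what pushes this configuration to the Server. A minimal sketch of what it could look like, assuming the send helper described later in this post:

async def update_session(self):
    # sketch: send the session.update client event carrying the configuration above
    await self.send("session.update", {"session": self.session_config})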
Handling audio conversation
1. Capture user voice input
Chainlit provides events to capture the user voice input from the microphone.
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
    openai_realtime: RTWSClient = cl.user_session.get("openai_realtime")
    if openai_realtime:
        if openai_realtime.is_connected():
            await openai_realtime.append_input_audio(chunk.data)
        else:
            print("RealtimeClient is not connected")
2. Process the user voice input
a) Convert the audio input received in the previous step to a base64-encoded string and send the Client Event input_audio_buffer.append to the Server with this audio payload.
async def append_input_audio(self, array_buffer):
    # if the array buffer is not empty, send the audio data to the input buffer
    if len(array_buffer) > 0:
        await self.send(
            "input_audio_buffer.append",
            {
                "audio": array_buffer_to_base64(np.array(array_buffer)),
            },
        )
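The array_buffer_to_base64 and base64_to_array_buffer helpers referenced in these snippets are not shown in the post. A minimal sketch, assuming the audio is exchanged as raw PCM16 data, could look like this:

import base64
import numpy as np

def array_buffer_to_base64(array_buffer) -> str:
    # serialize the audio buffer (bytes or a numpy array) to raw bytes and base64-encode it
    raw_bytes = array_buffer.tobytes() if hasattr(array_buffer, "tobytes") else bytes(array_buffer)
    return base64.b64encode(raw_bytes).decode("utf-8")

def base64_to_array_buffer(b64_string: str):
    # decode the base64 payload back into a PCM16 numpy array
    return np.frombuffer(base64.b64decode(b64_string), dtype=np.int16)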
b) Once the Server has finished receiving the audio chunks, it sends an input_audio_buffer.committed event.
c) When this event is picked up in the receive function, send a Client Event response.create to the Server to elicit a response.
async def receive(self):
    async for message in self.ws:
        event = json.loads(message)
        ................................
        elif event["type"] == "input_audio_buffer.committed":
            # the user has stopped speaking. The audio input captured so far should now be processed by the server,
            # so send a 'response.create' event to signal the server to respond
            await self.send("response.create", {"response": self.response_config})
        .................................
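The send helper used throughout these snippets is also not shown in the post. It wraps an event type and payload into the JSON envelope the Realtime API expects and writes it to the WebSocket; a minimal sketch, assuming the ws attribute set in connect():

import json
import uuid

async def send(self, event_type, data=None):
    # sketch: build the client event envelope and send it over the WebSocket
    event = {"event_id": "evt_" + uuid.uuid4().hex, "type": event_type}
    event.update(data or {})
    await self.ws.send(json.dumps(event))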
3. Receiving the response audio
Once the response audio events start flowing in from the server:
- Handle the Server event response.audio.delta by converting the audio chunks from a base64-encoded string to bytes.
- Relay this to the UI to play the audio chunks over the speaker. The dispatch function is used to raise this event (see the snippet below; a sketch of dispatch itself follows it).
async def receive(self):
    async for message in self.ws:
        event = json.loads(message)
        ............................
        if event["type"] == "response.audio.delta":
            # response audio delta events received from the server need to be relayed
            # to the UI for playback
            delta = event["delta"]
            array_buffer = base64_to_array_buffer(delta)
            append_values = array_buffer.tobytes()
            _event = {"audio": append_values}
            # send the event to the chainlit UI to play this audio
            self.dispatch("conversation.updated", _event)
        elif event["type"] == "response.audio.done":
            # the server has finished sending back the audio response to the user query;
            # let the chainlit UI know that the response audio has been completely received
            self.dispatch("conversation.updated", event)
        ..........................
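The dispatch call above is a small client-side event emitter built on the event_handlers defaultdict initialized in the constructor. A minimal sketch of dispatch, together with an on method for registering handlers, could look like this:

def on(self, event_name, handler):
    # register a UI-side callback for a named custom event
    self.event_handlers[event_name].append(handler)

def dispatch(self, event_name, event):
    # invoke every handler registered for this event; coroutines are scheduled as tasks
    for handler in self.event_handlers[event_name]:
        result = handler(event)
        if asyncio.iscoroutine(result):
            asyncio.create_task(result)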
4. Play the received audio chunks
The Chainlit UI then plays this audio out over the speaker.
async def handle_conversation_updated(event):
    """Plays the response audio chunks as they are received from the server."""
    _audio = event.get("audio")
    if _audio:
        await cl.context.emitter.send_audio_chunk(
            cl.OutputAudioChunk(
                mimeType="pcm16", data=_audio, track=cl.user_session.get("track_id")
            )
        )
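This handler has to be registered with the Realtime client before any audio flows. One way to wire it up in Chainlit, assuming the on method sketched earlier and a hypothetical RTWSClient constructor that accepts the system prompt, is at chat start (in the sample application, connect() is only awaited once the user activates voice mode):

import uuid

@cl.on_chat_start
async def on_chat_start():
    # sketch: create the Realtime client, register the UI-side handlers, and keep it in the session;
    # connect() can then be awaited when the user activates voice mode
    openai_realtime = RTWSClient(system_prompt=system_prompt)
    openai_realtime.on("conversation.updated", handle_conversation_updated)
    cl.user_session.set("openai_realtime", openai_realtime)
    cl.user_session.set("track_id", str(uuid.uuid4()))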
Handling text conversation
1. Capture user text input
Apart from handling the audio conversation, we can also handle text, both user-typed messages and the transcripts of the audio responses, so that the user has a multimodal way of interacting with the AI Assistant.
Chainlit provides events to capture the user input from the chat interface.
@cl.on_message
async def on_message(message: cl.Message):
    openai_realtime: RTWSClient = cl.user_session.get("openai_realtime")
    if openai_realtime and openai_realtime.is_connected():
        await openai_realtime.send_user_message_content(
            [{"type": "input_text", "text": message.content}]
        )
    else:
        await cl.Message(
            content="Please activate voice mode before sending messages!"
        ).send()
2. Process the user text input
With the user text input received above:
1. Send a Client Event conversation.item.create to the Server with the user text input in the payload.
2. Follow that up with a Client Event response.create event to the Server to elicit a response.
3. Raise a custom event 'conversation.interrupted' to the UI so that it can stop playing any audio response from the previous user query.
async def send_user_message_content(self, content=[]):
    if content:
        await self.send(
            "conversation.item.create",
            {
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": content,
                }
            },
        )
        # this is the trigger to the server to start responding to the user query
        await self.send("response.create", {"response": self.response_config})
        # raise this event to the UI to pause any audio playback in progress
        # when the user submits a query in the chat interface
        _event = {"type": "conversation_interrupted"}
        # signal the UI to stop playing audio
        self.dispatch("conversation.interrupted", _event)
3. Receiving the text response
- Use the Server Event response.audio_transcript.delta to get the stream of text in the response. This is a transcription of the audio that is already playing in the UI.
- Relay this data to the UI through a custom event to populate the chat conversation.
- The response text gets streamed and displayed in the Chainlit UI (a sketch of the UI-side handler follows the snippet below).
async def receive(self):
    async for message in self.ws:
        ..................................
        elif event["type"] == "response.audio_transcript.delta":
            # this event is received when the transcript of the server's audio response starts to come in.
            # send it to the UI to display the transcript in the chat window, even as the audio of the response is played
            delta = event["delta"]
            item_id = event["item_id"]
            _event = {"transcript": delta, "item_id": item_id}
            # signal the UI to display the transcript of the response audio in the chat window
            self.dispatch("conversation.text.delta", _event)
        elif (
            event["type"] == "conversation.item.input_audio_transcription.completed"
        ):
            ...............................
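On the UI side, the conversation.text.delta event can be handled by streaming the transcript into a Chainlit message. A minimal sketch, where handle_conversation_text_delta is a hypothetical handler registered the same way as handle_conversation_updated:

async def handle_conversation_text_delta(event):
    """Streams the transcript of the response audio into the chat window."""
    delta = event.get("transcript")
    item_id = event.get("item_id")
    # keep one streaming message per response item, keyed by item_id
    messages = cl.user_session.get("transcript_messages") or {}
    if item_id not in messages:
        messages[item_id] = cl.Message(content="")
        cl.user_session.set("transcript_messages", messages)
    await messages[item_id].stream_token(delta)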
Handling tool calling
As part of the session.update event discussed earlier, we pass a payload of the tools (functions) that the Assistant has access to. In this application, I am using a search function implemented with Tavily.
self.session_config = {
    "modalities": ["text", "audio"],
    "instructions": self.system_prompt,
    "voice": "shimmer",
    .....................
    "tools": tools_list,
    "tool_choice": "auto",
    "temperature": 0.8,
    "max_response_output_tokens": 4096,
}
The function definition and implementation used in this sample application:
tools_list = [
    {
        "type": "function",
        "name": "search_function",
        "description": "call this function to bring up-to-date information on the user's query when it pertains to current affairs",
        "parameters": {
            "type": "object",
            "properties": {"search_term": {"type": "string"}},
            "required": ["search_term"],
        },
    }
]
from langchain_community.tools.tavily_search import TavilySearchResults

# Function to perform a search using Tavily
def search_function(search_term: str):
    print("performing search for the user query > ", search_term)
    return TavilySearchResults().invoke(search_term)

available_functions = {"search_function": search_function}
Handling the response from tool calling
When a user request entails a function call, the Server Event response.done does not return audio. Instead, it returns the function that matches the intent, along with the arguments to invoke it. In the receive function, check for function call hints in the response:
- Get the function name and arguments from the response
- Invoke the function and get the response
- Send Client Event conversation.item.create to the server with the function call output
- Follow that up with Client Event response.create to elicit a response from the Server that will then be played out as audio and text.
async def receive(self):
    async for message in self.ws:
        ...........................................................
        elif event["type"] == "response.done":
            ...........................................
            if output_type == "function_call":
                function_name = (
                    event.get("response", {})
                    .get("output", [{}])[0]
                    .get("name", None)
                )
                arguments = json.loads(
                    event.get("response", {})
                    .get("output", [{}])[0]
                    .get("arguments", None)
                )
                tool_call_id = (
                    event.get("response", {})
                    .get("output", [{}])[0]
                    .get("call_id", None)
                )
                function_to_call = available_functions[function_name]
                # invoke the function with the arguments and get the response
                response = function_to_call(**arguments)
                print(
                    f"called function {function_name}, and the response is:",
                    response,
                )
                # send the function call response back to the server (model)
                await self.send(
                    "conversation.item.create",
                    {
                        "item": {
                            "type": "function_call_output",
                            "call_id": tool_call_id,
                            "output": json.dumps(response),
                        }
                    },
                )
                # signal the model (server) to generate a response based on the function call output
                await self.send(
                    "response.create", {"response": self.response_config}
                )
        ...............................................
Reference links:
- Watch a short video of this sample application here
- The documentation on the Realtime API is available here
- The GitHub repo for the application in this post is available here