The new AutoGen Core provides a streamlined approach to developing event-driven, distributed, scalable, and resilient AI agent systems. This guide will walk you through creating your first multi-agent workflow, structuring data models, and expanding your solution to include multiple agents.
Introduction
Modern travellers expect quick and personalized assistance when planning their journeys. This involves booking flights, hotels, car rentals, and even discovering activities at their destination. Traditionally, this requires multiple searches and interactions with different platforms, which can be cumbersome and time-consuming. To streamline this process, a multi-agent chatbot system is designed to handle a comprehensive travel planning request. The goal is to manage different components of a travel itinerary through specialized agents that work together seamlessly, ensuring that each part of the journey is well-coordinated and efficiently executed.
Problem Statement
Imagine a user planning a complex trip—they need to book a flight, find a hotel, rent a car, and look for activities to enjoy during their stay. Handling all these tasks in a cohesive and intelligent manner requires a system that can:
- Understand user requests (e.g., greetings, specific travel details).
- Identify the correct components of the plan (e.g., hotel booking, flight search).
- Coordinate multiple tasks that need to happen in parallel or sequentially.
- Aggregate all the gathered information into a cohesive travel plan.
The challenge is to build a system that effectively routes each user query to the correct service, maintains session state, and provides a streamlined response to the user without requiring manual intervention. This is where the concept of multiple specialized agents comes in.
In this guide, we will build a travel chatbot that can help users with questions about destinations, travel planning, and booking cars, flights, and hotels.
Topics covered:
- Setting up the foundation (data types, user input handling)
- Creating and integrating agents
- Using structured outputs and LLMs for enhanced responses
- Managing communication (publish vs. send)
- Implementing routing and handling complex tasks
- Expanding with specialized agents
- Logging, error handling, and deployment
- Integration with Microsoft Teams
Layout and Role of Agents
- UserProxyAgent: This agent serves as the main interface between the user and the rest of the system. It receives messages from the user and sends responses back. It ensures that the user's requests are correctly routed and that the responses from different agents are delivered back to the user via WebSocket connections.
- Planner Agent (Router): This agent is the brain that interprets the user's initial message and decides on the next steps. It is responsible for creating a TravelPlan that outlines which subtasks need to be handled and whether those tasks can be assigned to individual agents or require coordination through the GroupChatManager. If the user's message is just a greeting, this agent responds immediately to acknowledge and engage the user.
- GroupChatManager: This agent plays a vital role when a travel plan contains multiple subtasks that need to be handled by different specialized agents. The GroupChatManager is responsible for coordinating these subtasks—sending specific requests to the appropriate agents and aggregating the responses into a cohesive final travel plan that can be sent back to the user.
- Specialized Agents (Flight, Hotel, Car Rental, Activities): These agents are responsible for executing specific parts of the travel plan:
- FlightAgent handles flight bookings based on the provided travel dates and destinations.
- HotelAgent takes care of hotel reservations, providing the best options for accommodations.
- CarRentalAgent helps the user rent a car if needed for their trip.
- ActivitiesAgent suggests activities based on the user's destination and interests.
- TravelPlan and Routing Decisions: When the user provides a complex request, the Planner Agent creates a TravelPlan detailing all required subtasks (e.g., booking a hotel, reserving flights, etc.). If there is only one subtask, it is directly assigned to the corresponding agent. However, if multiple subtasks are present, they are routed to the GroupChatManager for effective coordination.
High-level overview of the flow
Create Initial Data Types for Interaction
Define data types for agent communication, such as `EndUserMessage` for taking input messages from the user and `AgentStructuredResponse` for structured responses. Defining data types early is crucial for scalability and maintainability: it standardizes communication between agents, helps route messages between them, and makes the system easier to extend and modify as requirements evolve.
from pydantic import BaseModel


class EndUserMessage(BaseModel):
    content: str
    source: str


class AgentStructuredResponse(BaseModel):
    agent_type: str
    data: dict
    message: str
Set Up a User Agent to Accept Input
Create a user proxy agent that interfaces with users and relays input to other agents. See the code below, where `handle_user_message` is executed every time an `EndUserMessage` arrives. `UserProxyAgent` is a subclass of `RoutedAgent`, which facilitates communication between agents and enables seamless routing of messages. This modular approach enhances the architecture by allowing agents to focus on their specific tasks while easily interacting with other agents; for example, you can apply different message processing depending on the type of message:
from autogen_core.components import RoutedAgent, message_handler


class UserProxyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("UserProxyAgent")

    @message_handler
    async def handle_user_message(self, message: EndUserMessage, ctx: MessageContext) -> None:
        # Relay the user's message to the rest of the system on the same topic
        await self.publish_message(message, ctx.topic_id)
Agents in AutoGen Core communicate exclusively through serializable messages, which can be defined using Pydantic's BaseModel or dataclasses. Message handlers process these messages, and the RoutedAgent class simplifies routing messages by type. Direct messaging allows request/response interactions, while broadcasting publishes messages to topics without expecting responses. Refer to this link for a more detailed overview.
Below is an example of how we are leveraging `send_message` to call agents. We leverage this in travel_group_chat.py to consolidate messages from different agents into a single response.
# You can send a message directly to a specific agent like below
await self.send_message(
    TravelRequest(
        source="GroupChatManager",
        content=task.task_details,
        original_task=message.main_task,
    ),
    AgentId(type=task.assigned_agent, key=self._session_id),
)
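By contrast, broadcasting publishes a message to a topic: any agent subscribed to that topic type receives it, and no response is expected. A minimal sketch using the types defined above (here `self._session_id` stands in for the current session id, as in the snippet above):
# You can broadcast to a topic like below; every agent subscribed to the
# "destination_info" topic type will receive this message.
await self.publish_message(
    EndUserMessage(content="Tell me about Paris", source="User"),
    DefaultTopicId(type="destination_info", source=self._session_id),
)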
Create Your First Agent
Let's create a destination agent that sends information about a destination as its response. As an added bonus, we will have the output in a structured format so we can easily develop a frontend that displays the info as a card or in some format other than plain text.
The GPT-4o 2024-08-06 model supports structured outputs (which are different from JSON mode; refer to the link for more details) and achieves 100% adherence to the JSON schema. No more JSON parsing errors :)
Using structured outputs simplifies downstream processing. In this case, we can use the data types we defined to ensure consistent output for downstream processing or for the frontend to display, e.g. by defining responses for travel bookings as Pydantic models.
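The `DestinationInfo` schema used below is not shown in the snippet; here is a minimal sketch of what it might look like (the field names are illustrative assumptions that mirror the Teams message formatter at the end of this post):
from pydantic import BaseModel

# Illustrative response schema for the structured output below.
class DestinationInfo(BaseModel):
    city: str
    country: str
    description: str
    best_time_to_visit: str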
We are using the Azure OpenAI GPT-4o model, which supports structured output, and we want this agent to listen only to the topic type `destination_info`:
@type_subscription(topic_type="destination_info")
class DestinationAgent(RoutedAgent):
    def __init__(self, model_client: AzureOpenAIChatCompletionClient) -> None:
        super().__init__("DestinationAgent")
        self._system_messages: List[LLMMessage] = [
            SystemMessage("You are a helpful AI assistant that helps with destination information.")
        ]
        self._model_client = model_client

    @message_handler
    async def handle_message(self, message: EndUserMessage, ctx: MessageContext) -> None:
        response_content = await self._model_client.create(
            [
                UserMessage(
                    content=f"Provide info for {message.content}",
                    source="DestinationAgent",
                )
            ],
            extra_create_args={"response_format": DestinationInfo},
        )
        destination_info_structured = DestinationInfo.model_validate(
            json.loads(response_content.content)
        )
        await self.publish_message(
            AgentStructuredResponse(
                agent_type=self.id.type,
                data=destination_info_structured,
                message=message.content,
            ),
            DefaultTopicId(type="user_proxy", source=ctx.topic_id.source),
        )
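To make the agent reachable, it has to be registered with a runtime. A minimal sketch, assuming the `SingleThreadedAgentRuntime` and the `register` class method (module paths differ across AutoGen Core versions, and `model_client` is assumed to be constructed elsewhere); with the `@type_subscription` decorator above, registering the agent also subscribes it to the matching topic type:
import asyncio
from autogen_core.application import SingleThreadedAgentRuntime


async def main() -> None:
    # Sketch: create the runtime, register the agent under the type
    # "destination_info", and start processing messages.
    runtime = SingleThreadedAgentRuntime()
    await DestinationAgent.register(
        runtime, "destination_info", lambda: DestinationAgent(model_client)
    )
    runtime.start()
    # ... publish messages / serve requests here ...
    await runtime.stop_when_idle()


asyncio.run(main())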
Integrate Existing Agents
AutoGen is extensible and supports agents built using different frameworks. Let's look at how to bring in an existing LlamaIndex agent that uses Wikipedia as a tool.
@type_subscription("default_agent")
class LlamaIndexAgent(RoutedAgent):
    def __init__(self, llama_index_agent: AgentRunner, memory: Optional[BaseMemory] = None) -> None:
        super().__init__("LlamaIndexAgent")
        self._llama_index_agent = llama_index_agent
        self._memory = memory

    @message_handler
    async def handle_user_message(self, message: EndUserMessage, ctx: MessageContext) -> None:
        self._session_id = ctx.topic_id.source
        # Retrieve historical messages if memory is available
        history_messages: List[ChatMessage] = []
        if self._memory:
            history_messages = self._memory.get(input=message.content)
        # Get response from LlamaIndex agent
        response = await self._llama_index_agent.achat(
            message=message.content,
            history_messages=history_messages
        )
        # Store messages in memory if available
        if self._memory:
            self._memory.put(ChatMessage(role=MessageRole.USER, content=message.content))
            self._memory.put(ChatMessage(role=MessageRole.ASSISTANT, content=response.response))
        # Compile resources from response
        resources = [
            Resource(content=source_node.get_text(), score=source_node.score, node_id=source_node.id_)
            for source_node in response.source_nodes
        ]
        # Publish response message
        await self.publish_message(
            AgentStructuredResponse(
                agent_type="default_agent",
                data=None,
                message=f"\n{response.response}\n",
            ),
            DefaultTopicId(type="user_proxy", source=self._session_id),
        )
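A rough sketch of how the wrapped LlamaIndex agent might be constructed with Wikipedia as a tool and handed to the wrapper above. The class and module names follow llama-index 0.10-style packages and may differ in your version; `llm` and `runtime` are assumed to be created elsewhere:
from llama_index.core.agent import ReActAgent
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.tools.wikipedia import WikipediaToolSpec

# Sketch: build a LlamaIndex ReAct agent (an AgentRunner) with Wikipedia tools.
wiki_tools = WikipediaToolSpec().to_tool_list()
llama_agent = ReActAgent.from_tools(tools=wiki_tools, llm=llm)  # llm: your LlamaIndex LLM client
memory = ChatMemoryBuffer.from_defaults(token_limit=4096)

# Registration mirrors the earlier DestinationAgent sketch.
await LlamaIndexAgent.register(
    runtime, "default_agent", lambda: LlamaIndexAgent(llama_agent, memory=memory)
)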
Planning Agent
A planning agent or intent detection agent can determine user intent and respond accordingly. With structured output, you can create a list of subtasks and assign them to the appropriate agent.
Below is a sample data model you can use for planning:
# Enum to Define Agent Types
class AgentEnum(str, Enum):
    FlightBooking = "flight_booking"
    HotelBooking = "hotel_booking"
    CarRental = "car_rental"
    ActivitiesBooking = "activities_booking"
    DestinationInfo = "destination_info"
    DefaultAgent = "default_agent"
    GroupChatManager = "group_chat_manager"


# Travel SubTask Model
class TravelSubTask(BaseModel):
    task_details: str
    assigned_agent: AgentEnum

    class Config:
        use_enum_values = True  # To serialize enums as their values


# Travel Plan Model
class TravelPlan(BaseModel):
    main_task: str
    subtasks: List[TravelSubTask]
    is_greeting: bool
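For a request like "Book me a flight to Paris and find a hotel", the planner would be expected to produce something shaped like this (the values are purely illustrative):
example_plan = TravelPlan(
    main_task="Book a flight to Paris and find a hotel",
    subtasks=[
        TravelSubTask(task_details="Book a flight to Paris", assigned_agent=AgentEnum.FlightBooking),
        TravelSubTask(task_details="Find a hotel in Paris", assigned_agent=AgentEnum.HotelBooking),
    ],
    is_greeting=False,
)
# use_enum_values=True serializes assigned_agent as "flight_booking" / "hotel_booking"
print(example_plan.model_dump_json(indent=2))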
Managing Simple vs. Complex Tasks
In real-world scenarios, optimizing user experience by responding quickly is crucial. For simple tasks that a single agent can handle effectively, ensure they are routed appropriately.
- Simple Task: A FlightAgent handles a specific flight query.
- Complex Task: A TravelRouterAgent delegates tasks like booking flights, hotels, and car rentals to specialized agents.
To manage this, we use a SemanticRouterAgent to route messages effectively.
@type_subscription(topic_type="router")
class SemanticRouterAgent(RoutedAgent):
    def __init__(self, name: str, model_client: AzureOpenAIChatCompletionClient, agent_registry: AgentRegistry, session_manager: SessionStateManager) -> None:
        super().__init__("SemanticRouterAgent")
        self._name = name
        self._model_client = model_client
        self._registry = agent_registry
        self._session_manager = session_manager

    @message_handler
    async def route_message(self, message: EndUserMessage, ctx: MessageContext) -> None:
        session_id = ctx.topic_id.source
        # Add the current message to session history
        self._session_manager.add_to_history(session_id, message)
        # Analyze conversation history for better context
        history = self._session_manager.get_history(session_id)
        travel_plan: TravelPlan = await self._get_agents_to_route(message, history)
        if travel_plan.is_greeting:
            logger.info("User greeting detected")
            await self.publish_message(
                AgentStructuredResponse(
                    agent_type="default_agent",
                    data=Greeter(
                        greeting="Greetings, Adventurer! 🌍 Let's get started!"
                    ),
                    message=f"User greeting detected: {message.content}",
                ),
                DefaultTopicId(type="user_proxy", source=ctx.topic_id.source),
            )
            return
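The handler above stops at the greeting path, and `_get_agents_to_route` can follow the same structured-output pattern shown for the DestinationAgent, with `response_format` set to `TravelPlan`. After the greeting check, `route_message` could call a helper like the sketch below to fan the plan out; the helper name is an assumption, and the `TravelRequest` fields mirror the earlier `send_message` example:
    async def _route_subtasks(self, travel_plan: TravelPlan, session_id: str) -> None:
        # Sketch: a single subtask goes straight to its specialized agent,
        # while multiple subtasks are handed to the GroupChatManager.
        if len(travel_plan.subtasks) == 1:
            task = travel_plan.subtasks[0]
            await self.send_message(
                TravelRequest(
                    source="SemanticRouterAgent",
                    content=task.task_details,
                    original_task=travel_plan.main_task,
                ),
                AgentId(type=task.assigned_agent, key=session_id),
            )
        else:
            await self.send_message(
                travel_plan,
                AgentId(type="group_chat_manager", key=session_id),
            )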
Expanding with Additional Agents
Add more agents dedicated to tasks like flight booking, car rental, and hotel booking. Specialized agents improve system performance by distributing workload and making individual components more efficient. Defining corresponding data types for each agent facilitates better communication, making the system easier to extend and maintain.
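Each specialized agent follows the same shape as the DestinationAgent: subscribe to its topic type, call the model with its own response schema, and publish an `AgentStructuredResponse` back to `user_proxy`. A hypothetical `ActivitiesAgent` sketch (the `ActivitiesInfo` schema and the prompt wording are assumptions):
@type_subscription(topic_type="activities_booking")
class ActivitiesAgent(RoutedAgent):
    def __init__(self, model_client: AzureOpenAIChatCompletionClient) -> None:
        super().__init__("ActivitiesAgent")
        self._model_client = model_client

    @message_handler
    async def handle_message(self, message: EndUserMessage, ctx: MessageContext) -> None:
        # Same structured-output pattern as DestinationAgent, with an
        # assumed ActivitiesInfo schema for the response.
        response = await self._model_client.create(
            [UserMessage(content=f"Suggest activities for: {message.content}", source="ActivitiesAgent")],
            extra_create_args={"response_format": ActivitiesInfo},
        )
        await self.publish_message(
            AgentStructuredResponse(
                agent_type=self.id.type,
                data=ActivitiesInfo.model_validate(json.loads(response.content)),
                message=message.content,
            ),
            DefaultTopicId(type="user_proxy", source=ctx.topic_id.source),
        )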
Logging and Using the Aspire Dashboard
Use logging to trace messages and troubleshoot issues during development. AutoGen provides built-in support for exporting traces to the Aspire dashboard via OpenTelemetry; refer to the documentation for setup details and a sample trace.
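A minimal sketch of wiring OpenTelemetry up to a locally running Aspire dashboard. The OTLP endpoint and the runtime's `tracer_provider` argument are assumptions; check the AutoGen telemetry docs for your version:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource as OtelResource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Export traces over OTLP/gRPC to the Aspire dashboard's collector endpoint.
tracer_provider = TracerProvider(
    resource=OtelResource.create({"service.name": "travel-chatbot"})
)
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(tracer_provider)

# Assumption: the runtime accepts a tracer_provider for instrumenting message flow.
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)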
Deploying with FastAPI and Azure Container Apps
One common way to interact with chatbots is via WebSockets, which help with real-time message sending and receiving. You can create a WebSocket manager to track connections and respond to users.
class WebSocketConnectionManager:
    def __init__(self):
        self.connections: Dict[str, WebSocket] = {}

    def add_connection(self, session_id: str, websocket: WebSocket) -> None:
        self.connections[session_id] = websocket

    async def handle_websocket(self, websocket: WebSocket, session_id: str):
        await websocket.accept()
        self.add_connection(session_id, websocket)
        try:
            while True:
                user_message_text = await websocket.receive_text()
                chat_id = str(uuid.uuid4())
                user_message = EndUserMessage(content=user_message_text, source="User")
                logger.info(f"Received message with chat_id: {chat_id}")
                # Publish the user's message to the agent
                await agent_runtime.publish_message(
                    user_message, DefaultTopicId(type="user_proxy", source=session_id)
                )
                await asyncio.sleep(0.1)
        except Exception as e:
            logger.error(f"Exception in WebSocket connection {session_id}: {str(e)}")


connection_manager = WebSocketConnectionManager()


@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for handling user chat messages.

    Args:
        websocket (WebSocket): The WebSocket connection.
    """
    session_id = str(uuid.uuid4())
    await connection_manager.handle_websocket(websocket, session_id)
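To close the loop, the `UserProxyAgent` can hold a reference to the connection manager and push agent responses back over the user's WebSocket. A sketch that extends the earlier `UserProxyAgent`; passing the connection manager into the constructor is an assumption about how the pieces are wired together:
@type_subscription(topic_type="user_proxy")
class UserProxyAgent(RoutedAgent):
    def __init__(self, connection_manager: WebSocketConnectionManager) -> None:
        super().__init__("UserProxyAgent")
        self._connection_manager = connection_manager

    @message_handler
    async def handle_agent_response(self, message: AgentStructuredResponse, ctx: MessageContext) -> None:
        # Look up the WebSocket for this session (the topic source is the
        # session id) and send the structured response back to the user.
        websocket = self._connection_manager.connections.get(ctx.topic_id.source)
        if websocket:
            await websocket.send_text(message.model_dump_json())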
A sample implementation is available at https://github.com/lordlinus/travel-chatbot
Azure Container Apps is a serverless platform designed to run containerized applications and microservices without the need to manage complex infrastructure.
The sample implementation contains a Dockerfile to package the backend and run it as an endpoint. Refer to this link for instructions on how to deploy a FastAPI application using Azure Container Apps.
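For completeness, a minimal sketch of the entry point the container can run (the module name, port, and uvicorn server are assumptions; the actual repo's layout may differ):
import uvicorn
from fastapi import FastAPI

app = FastAPI()

# ... initialize the agent runtime and register the @app.websocket("/chat")
# endpoint shown above ...

if __name__ == "__main__":
    # Entry point the container runs; Azure Container Apps routes traffic to this port.
    uvicorn.run(app, host="0.0.0.0", port=8000)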
Microsoft Teams Integration
In addition to the backend AutoGen implementation described above, this project includes a Microsoft Teams integration layer using the Bot Framework. This enables users to interact with the travel assistant directly within Teams, making it easily accessible in a familiar collaboration environment.
Teams Features:
- Real-time messaging through Bot Framework integration
- Interactive suggested actions for common queries
- Structured message formatting for travel information
- Persistent WebSocket connections for real-time responses
- Emoji support and rich text formatting
The Teams integration seamlessly connects to the AutoGen backend through WebSockets, maintaining the sophisticated agent system while providing a user-friendly interface in Teams. Users can access all travel planning features, destination information, and booking assistance without leaving their Teams workspace.
Code Overview
# 1. Teams Bot - Handles user interactions
class MyBot(ActivityHandler):
    async def on_message_activity(self, turn_context: TurnContext):
        # Forward to AutoGen backend via WebSocket
        if self.ws_handler:
            await self.ws_handler.send_message(turn_context.activity.text)
        # Show interactive buttons
        await turn_context.send_activity(self.get_suggested_actions())


# 2. WebSocket Handler - Manages real-time communication
class WebSocketHandler:
    async def process_websocket_message(self, message: str):
        message_data = json.loads(message)
        formatted_text = self.message_formatter.format_message(message_data)
        await self.bot_handler.send_response(formatted_text)


# 3. Message Formatter - Structures responses for Teams display
class MessageFormatter:
    def format_destination_info(self, data: dict) -> str:
        info = data.get('data', {})
        return f"""🌏 {info.get('city')}, {info.get('country')}
📝 {info.get('description')}
⏰ Best Time: {info.get('best_time_to_visit')}"""
We hope this blog post provides a useful overview of how to get started with the new AutoGen Core API. Feel free to drop your comments below.
Link to sample code for the AutoGen travel chatbot: https://github.com/lordlinus/travel-chatbot
Link to the Teams integration code implemented by hieunhu: https://github.com/hieumoscow/travelautogenbf
Thanks to @Setu_Chokshi, Priya_Kedia, and @Rafal_Rutyna for helping to review and refine this repo and content.
Reference:
- Core concepts - https://github.com/microsoft/autogen/tree/staging/python/packages/autogen-core/docs/src/user-guide/core-user-guide/core-concepts
- Design Patterns - https://github.com/microsoft/autogen/tree/staging/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns
- Cookbook - https://github.com/microsoft/autogen/tree/staging/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook