In our data-driven world, the ability to interact with and query your database using natural language is a game changer. This blog post explores the implementation of an Analytics Multi-Agent System, detailing how to build a SQL agent using the Semantic Kernel framework to interact with a PostgreSQL database, enhanced with data visualizations to provide clear and actionable insights.
In today’s data-driven landscape, the ability to access and interpret information in a human-readable format is increasingly valuable. Being able to interact with and query your database in natural language is a game changer. In this post, we’ll walk through how to build a SQL agent using the Semantic Kernel framework to interact with a PostgreSQL database containing DVD rental data. I’ll explain how to define Semantic Kernel functions through plugins and how to incorporate them into agents. We’ll also look at how to set up these agents and guide them with well-structured instructions.
Our example uses a PostgreSQL database that stores detailed information about DVD rentals. This sample database contains 15 tables and can be found here: PostgreSQL Sample Database. With a natural language interface, users can simply ask questions like, “What are the most rented DVDs this month?” and the agent will generate and execute the relevant SQL query. This approach allows non-technical users to quickly gain insights from their data without needing to write SQL themselves.
We’ll use the Semantic Kernel Agent Framework for its enterprise readiness, which is backed by security-enhancing features that support responsible AI solutions at scale.
Define Semantic Kernel functions
To enable our agent to generate and execute SQL queries, we first need to define the necessary functions. The easiest way to provide a Semantic Kernel AI agent with capabilities is to wrap native code into a plugin. You can learn more here.
In our SQL Agent we will define two plugins:
QueryPostgresPlugin
This plugin takes a SQL query as input, sanitizes it by stripping semicolons and replacing newlines with spaces, and then executes it against a PostgreSQL database using the psycopg2 library. Below is an example demonstrating how this can be done.
import asyncio
from typing import Annotated

import psycopg2
from semantic_kernel.functions import kernel_function


class QueryPostgresPlugin:
    """
    Plugin to query a PostgreSQL database using a SQL query.
    The SQL query is provided as an input parameter.
    """

    def __init__(self, connection_string: str) -> None:
        self._connection_string = connection_string

    @staticmethod
    def __clean_sql_query__(sql_query: str) -> str:
        """Clean the SQL query to remove unnecessary characters."""
        return sql_query.replace(";", "").replace("\n", " ")

    @kernel_function(
        name="query_postgres",
        description="Query a PostgreSQL database using a SQL query and return the results as a string.")
    async def query_postgres(
        self,
        sql_query: Annotated[str, "SQL query to be executed"]) -> Annotated[str, "The results of the SQL query as a formatted string"]:
        """
        Executes the SQL query against PostgreSQL using psycopg2 and returns the results.
        Args:
            sql_query: The SQL query to be executed.
        Returns:
            A string representation of the query results or an error message.
        """
        def run_query():
            try:
                conn = psycopg2.connect(self._connection_string)
                cur = conn.cursor()
                # clean the SQL query.
                query = self.__clean_sql_query__(sql_query)
                cur.execute(query)
                # retrieve column names before fetching rows.
                col_names = [desc[0] for desc in cur.description] if cur.description else []
                rows = cur.fetchall()
                cur.close()
                conn.close()
                if not rows:
                    return "No results found."
                # convert rows to a list of dictionaries if column names are available.
                results = [dict(zip(col_names, row)) for row in rows] if col_names else rows
                return str(results)
            except Exception as e:
                return f"Error executing query: {e}"

        # run the synchronous query code in a thread.
        result = await asyncio.to_thread(run_query)
        return result
The plugin returns the query results as a string: a list of row dictionaries keyed by column name when column names are available, or an error message if the query fails. By abstracting away the details of database connectivity and query execution, this plugin allows our agent to focus solely on generating accurate queries based on user intent.
The second plugin we define is:
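To make the cleaning step and the output format concrete, here is a minimal sketch that isolates both from the plugin. It assumes a hypothetical `fake_blocking_query` function standing in for the psycopg2 round trip, and the two sample rows are invented for illustration, so it runs without a database.

```python
import asyncio


def clean_sql_query(sql_query: str) -> str:
    """Same transformation as the plugin: drop semicolons, flatten newlines."""
    return sql_query.replace(";", "").replace("\n", " ")


def format_rows(col_names, rows) -> str:
    """Mirror the plugin's output: a stringified list of row dictionaries."""
    if not rows:
        return "No results found."
    results = [dict(zip(col_names, row)) for row in rows] if col_names else rows
    return str(results)


def fake_blocking_query(sql_query: str) -> str:
    """Hypothetical stand-in for the database call (invented sample rows)."""
    _ = clean_sql_query(sql_query)
    return format_rows(["actor_id", "first_name"], [(1, "Penelope"), (2, "Nick")])


async def main() -> str:
    # Same pattern as the plugin: run blocking code off the event loop.
    return await asyncio.to_thread(fake_blocking_query, 'SELECT *\nFROM public."actor";')


cleaned = clean_sql_query('SELECT *\nFROM public."actor";')
formatted = asyncio.run(main())
print(cleaned)
print(formatted)
```

Note that removing every semicolon also changes string literals that happen to contain one, so this cleaning step is a convenience rather than a safeguard.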
GetSchemaPlugin
Before generating any queries, the agent must first understand the underlying database structure. This plugin connects to PostgreSQL and retrieves schema details (such as table names, column names, and data types) from the information_schema.columns view. Having this context is essential for the agent to construct accurate and complex SQL queries, ensuring that all table and column references are valid.
class GetSchemaPlugin:
    """
    Plugin to retrieve the schema of tables from a PostgreSQL database.
    It returns table schema, table name, column name, and data type for each column.
    """

    def __init__(self, connection_string: str) -> None:
        self._connection_string = connection_string

    @kernel_function(
        name="get_schema",
        description="Retrieves the schema of tables from the PostgreSQL database. "
                    "Returns table names, column names, and data types as a formatted string.")
    async def get_schema(
        self,
        _: Annotated[str, "Unused parameter for compatibility"]
    ) -> Annotated[str, "The schema details as a formatted string"]:
        """
        Connects to PostgreSQL using psycopg2, retrieves schema details from the information_schema.columns view,
        and returns the results as a formatted string.
        """
        def run_schema_query():
            try:
                # connect to the PostgreSQL database.
                conn = psycopg2.connect(self._connection_string)
                cur = conn.cursor()
                # query to retrieve schema details excluding internal schemas.
                query = """
                SELECT table_schema, table_name, column_name, data_type
                FROM information_schema.columns
                WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
                ORDER BY table_schema, table_name, ordinal_position;
                """
                cur.execute(query)
                rows = cur.fetchall()
                cur.close()
                conn.close()
                if not rows:
                    return "No schema information found."
                # format the output.
                schema_details = []
                for table_schema, table_name, column_name, data_type in rows:
                    schema_details.append(f"{table_schema}.{table_name} - {column_name} ({data_type})")
                return "\n".join(schema_details)
            except Exception as e:
                return f"Error retrieving schema: {e}"

        # run the synchronous schema query in a thread.
        result = await asyncio.to_thread(run_schema_query)
        return result
Both plugins encapsulate the low-level details of database operations, enabling the agent to generate natural language-based queries that interact with the database seamlessly.
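It can help to see what the agent actually receives from the schema plugin. The sketch below applies the same line format to two hand-written rows; the helper name `format_schema_rows` is hypothetical, and the rows are a small excerpt typed in by hand rather than fetched from the database.

```python
def format_schema_rows(rows) -> str:
    """Format (schema, table, column, type) tuples the way the plugin does."""
    if not rows:
        return "No schema information found."
    return "\n".join(
        f"{table_schema}.{table_name} - {column_name} ({data_type})"
        for table_schema, table_name, column_name, data_type in rows
    )


# Hand-written sample rows, assumed to resemble information_schema.columns output.
sample_rows = [
    ("public", "actor", "actor_id", "integer"),
    ("public", "actor", "first_name", "character varying"),
]
schema_text = format_schema_rows(sample_rows)
print(schema_text)
```

Each column becomes one line of plain text, which the agent reads as context before it writes a query.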
Define Semantic Kernel Agents
Once the plugins are defined, the next step is to define our agent and integrate them. There are currently three types of agents available in the Semantic Kernel Agent Framework: Azure AI Agent, Chat Completion Agent, and Azure Assistant Agent. Each provides distinct features tailored to different use cases. The Azure AI Agent integrates with Azure AI Agent Service, enabling end-to-end observability via the AI Foundry interface and access to a broad ecosystem of tools and connectors. The Chat Completion Agent is optimized for generating conversational responses, while the Azure Assistant Agent uses the same wire protocol as the Assistants API, allowing the use of advanced capabilities such as the Code Interpreter and File Search.
In our example, we’ll use a Chat Completion Agent to interact with the PostgreSQL database, and an Azure Assistant Agent to visualize the retrieved data. This setup involves three core components for each agent, along with an orchestration layer that coordinates their interaction.
- Defining the Agent
We start by creating an instance of the Chat Completion Agent using the Semantic Kernel framework. This agent is provided with detailed guidance on how to interpret user queries, when to fetch schema information, and how to construct the appropriate SQL statements in response.
agent = ChatCompletionAgent(
    kernel=kernel,
    name="SQLAssistantAgent",
    instructions="""
You are a helpful assistant that retrieves data from a PostgreSQL database.
When a user asks for information, get schema using GetSchemaPlugin. Look for relevant tables and columns that will answer user question and generate a SQL query.
Generate SQL query you are going to execute and then use the QueryPostgresPlugin's query_postgres function.
The tables reside in the public schema of the database so use following format: public."table_name", for example: 'SELECT * FROM public."actor" LIMIT 5'.
Always return the result of the SQL query as a json.
""",
    plugins=[schema_plugin, query_plugin],
)
- Specifying detailed instructions
Instructions are critical to guide the agent’s behavior, especially when generating SQL queries. We aim to provide clear guidance, including one-shot or few-shot examples if needed, as well as context about the dataset, such as descriptions of ambiguous column names or any known irregularities. Since agents are not inherently time-aware, it’s important to supply temporal context explicitly, enabling them to generate queries that reference the current date or time period when appropriate.
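One way to supply that temporal context is to append the current date to the instructions when the agent is created. This is a minimal sketch under stated assumptions: `BASE_INSTRUCTIONS` is an abbreviated placeholder for the real prompt, and `build_instructions` is a hypothetical helper, not part of Semantic Kernel.

```python
from datetime import date

# Abbreviated placeholder; in practice this is the agent's full instruction text.
BASE_INSTRUCTIONS = "You are a helpful assistant that retrieves data from a PostgreSQL database."


def build_instructions(today: date) -> str:
    """Append explicit dates so phrases like 'this month' can be resolved."""
    month_start = today.replace(day=1)
    return (
        f"{BASE_INSTRUCTIONS}\n"
        f"Today's date is {today.isoformat()}. "
        f"The current month started on {month_start.isoformat()}."
    )


# A fixed date keeps the example reproducible; use date.today() in practice.
instructions = build_instructions(date(2025, 5, 14))
print(instructions)
```

The resulting string can be passed as the `instructions` argument, so the date is refreshed each time the agent is constructed.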
Clear instructions ensure that the agent understands its role and properly orchestrates the plugin calls.
- Integrating the Plugins
The agent is then configured with instances of both the QueryPostgresPlugin and the GetSchemaPlugin. When the user poses a question like “How many DVDs were rented this month,” the agent first consults the schema plugin to gather information about relevant tables and columns. It then constructs and executes a corresponding SQL query using the query plugin.
Now let’s apply the same approach to the Azure Assistant Agent. We’ll create an agent equipped with the Code Interpreter tool, enabling it to generate and execute Python code for visualizing the data received from the SQL Assistant Agent. Additionally, we need to define file download functions to support this workflow. A complete example is available here.
definition = await client.beta.assistants.create(
    model=model,
    name="CodeRunner",
    instructions="""
SQL Assistant Agent will provide you the data in JSON format. Never ask if the user wants a chart or graph, start analyzing the data and generating them automatically.
1. Parse the JSON into Python data structures.
2. Execute the Python code needed to generate a chart or graph.
3. Save the chart as a PNG file.
4. Upload PNG via the code-interpreter tool and return its file reference (FileReferenceContent).
Always format your **final answer** in markdown; do not simply return code blocks.
Include the actual image reference alongside any analysis.
Return the file path that the chart was saved to.
""",
    tools=code_interpreter_tool,
    tool_resources=code_interpreter_tool_resources,
)
code_agent = AzureAssistantAgent(
    client=client,
    definition=definition,
)
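The file download functions mentioned above could look roughly like the following. This is a sketch, not the code from the full example: `download_chart` is a hypothetical name, the `.png` extension is an assumption based on the instructions given to CodeRunner, and `client` is assumed to be the same async OpenAI-style client passed to the agent, whose `files.content(file_id)` call returns an object exposing the raw bytes as `.content`.

```python
import asyncio
import os


async def download_chart(client, file_id: str, target_dir: str = ".") -> str:
    """Fetch a file created by the Code Interpreter and save it locally.

    Assumes `client.files.content(file_id)` is awaitable and that the
    response carries the file bytes in its `.content` attribute.
    """
    response = await client.files.content(file_id)
    file_path = os.path.join(target_dir, f"{file_id}.png")
    with open(file_path, "wb") as file:
        file.write(response.content)
    return file_path
```

A caller would typically invoke this for each file reference found in the CodeRunner response, for example `await download_chart(client, file_id)`.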
Group Chat Orchestration
To orchestrate the interaction between the SQL Assistant Agent and the CodeRunner Agent, we’ll use an Agent Group Chat. Within this setup, we define both a selection strategy and a termination strategy to manage the flow of conversation between agents. This ensures that control passes smoothly from one agent to another and that the dialogue concludes appropriately once the required tasks are completed.
# Define a termination function that signals completion by replying with the keyword.
termination_keyword = "YES"
termination_function = KernelFunctionFromPrompt(
    function_name="termination",
    # The braces are doubled twice because this is an f-string:
    # {{{{$lastmessage}}}} renders as the template variable {{$lastmessage}}.
    prompt=f"""
Determine whether the conversation is complete.
The conversation is considered complete the moment CodeRunner uploads a chart image file. Or when the CodeRunner agent provided final answer in markdown format.
If the last message ({{{{$lastmessage}}}}) contains any file reference (e.g. a FileReferenceContent item), for example like this:
1. I have created a bar chart representing the top 5 most rented movies. You can download the chart using the link below.
2. Below is a bar chart visualizing the top 5 most rented out movies. Please find the file of the chart attached
3. "Here is the chart you requested: "
respond with exactly '{termination_keyword}' and nothing else.
""",
)
# Create the AgentGroupChat with the termination strategy.
chat = AgentGroupChat(
    agents=[agent, code_agent],
    termination_strategy=KernelFunctionTerminationStrategy(
        agents=[agent, code_agent],
        function=termination_function,
        kernel=kernel,
        # Compare case-insensitively: lowercase both the keyword and the reply.
        result_parser=lambda result: termination_keyword.lower() in str(result.value[0]).lower(),
        history_variable_name="lastmessage",
        maximum_iterations=10,
        # history_reducer is defined elsewhere in the full example.
        history_reducer=history_reducer,
    ),
)
group_chat = AgentGroupChat(
    agents=[agent, code_agent])
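Two details in a termination setup like this are easy to get wrong: brace escaping when the prompt is an f-string, and case handling in the result parser. The sketch below uses plain Python only. `is_complete` is a hypothetical helper that mirrors what a result parser needs to do, and the prompt is shortened for illustration.

```python
termination_keyword = "YES"

# Inside an f-string, "{{" collapses to "{", so a template variable written
# with two braces loses one pair. Four braces survive as two.
two_braces = f"{{$lastmessage}}"
four_braces = f"{{{{$lastmessage}}}}"

prompt = f"Last message: {{{{$lastmessage}}}}. Reply '{termination_keyword}' when done."


def is_complete(model_reply: str) -> bool:
    """Case-insensitive check: lowercase both sides before comparing."""
    return termination_keyword.lower() in model_reply.lower()


print(two_braces)
print(four_braces)
print(is_complete("Yes"))
```

Comparing an uppercase keyword against a lowercased reply would never match, so the chat would only stop once the maximum number of iterations is reached.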
Results
And there you have it - a working multi-agent setup that lets you ask questions in natural language and automatically visualize the results.
Now let’s ask a question in natural language and see how the agents respond. The question: "What is the total payment revenue generated, and how does it break down by film category?"
Response:
AuthorRole.ASSISTANT - CodeRunner: 'The total payment revenue generated is $61,312.04.
Additionally, here is a breakdown of payment revenue by film category, visualized in the chart above. High revenue contributors include Sports, Sci-Fi, and Animation, while categories such as Music and Travel generate comparatively lower income.
And, of course, we have a clear visual to accompany the analysis:
Thank you for your time!
The full example can be found in this GitHub repo.