Introduction:
Data backfills are a common operational requirement in modern data platforms. Missing partitions, upstream delays, or failed pipeline runs often require engineers to manually identify gaps, determine the appropriate recovery window, and trigger reprocessing. This approach does not scale well: it introduces operational overhead, increases the risk of human error, and requires deep knowledge of data dependencies and pipeline behavior.
In this post, I describe how to build a backfill agent using Azure AI Foundry, Model Context Protocol (MCP), Azure Functions, Azure Synapse, and Azure Data Explorer (ADX). The goal is to automate the decision-making process while keeping execution controlled, observable, and governed.
The design separates responsibilities across three layers:
- Decision layer: an LLM-based agent determines whether a backfill is required and defines the recovery scope (e.g., which dates, datasets, or layers)
- Execution layer: an MCP server hosted on Azure Functions exposes controlled operations such as triggering pipelines and querying system state
- State layer: ADX tables maintain backfill control metadata, data availability signals, and execution history
This separation keeps the system flexible while ensuring that all actions are traceable, auditable, and policy-driven.
Importantly, this pattern is not limited to a single dataset or pipeline. It can be applied across all datasets and across all layers of a medallion architecture (Bronze, Silver, and Gold), with layer-specific validation rules and backfill strategies. For example, Bronze may focus on completeness of ingestion, while Silver and Gold can enforce data quality and business logic constraints before initiating recovery.
The key benefit of a backfill agent is that it shifts backfilling from a manual, reactive process to an automated, intelligent, and consistent workflow. Instead of engineers investigating incidents and triggering reruns, the agent continuously evaluates data state, identifies gaps, and initiates controlled recovery actions. This reduces operational burden, improves reliability, and ensures faster recovery from data issues while maintaining governance, observability, and strict control over execution.
Architecture Overview
The solution is designed as a controlled orchestration pattern that separates decision-making, execution, and state management. This allows backfill operations to be automated without compromising governance or observability.
The architecture consists of five main components.
Logic Apps Trigger
The workflow is initiated using a Logic App. The trigger can be scheduled or invoked on demand, depending on operational requirements.
It provides the input context required for the backfill evaluation, such as dataset name, processing layer, and scope constraints (for example, maximum number of dates to process).
Azure AI Foundry Agent (Decision Layer)
The Azure AI Foundry agent acts as the decision layer.
It evaluates the request and determines whether a backfill is required, and if so, what scope should be applied. The agent does not interact directly with data systems. Instead, it invokes predefined tools exposed through the MCP server.
This ensures that decision logic is flexible, while execution remains controlled.
Azure Function App – MCP Server (Execution Layer)
The Azure Function App hosts the MCP server and exposes a set of operations to the agent.
These operations include querying missing partitions, triggering Synapse pipelines, retrieving execution status, and updating control tables.
All interactions with external systems (Synapse and ADX) are handled within this layer. It is responsible for input validation, authorization, and enforcing execution rules.
This abstraction ensures that infrastructure actions are not directly performed by the agent.
Synapse Pipelines (Processing Layer)
Backfill execution is handled by a parameterized Synapse pipeline.
The pipeline follows a consistent pattern:
- Data is first written to a staging table
- Validation is performed
- Data is promoted to the main table only if validation succeeds
This approach ensures data quality and prevents partial or invalid data from being published.
Azure Data Explorer (State and Observability Layer)
ADX is used as the central state store.
It maintains control and execution tables that track expected partitions, missing data, pipeline runs, and execution outcomes.
This enables:
- Detection of missing partitions
- Idempotent execution (avoiding duplicate processing)
- Full traceability of backfill operations
The agent relies on this state, accessed via the MCP server, to make decisions.
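For example, the backfill-candidate check that the MCP server runs against this state is a straightforward KQL query over the BackfillControl table (shown here for the Customers dataset in the Bronze layer; it mirrors the query issued by the find_backfill_candidates tool described later in this post, and the table schema is defined in the Kusto section below):
BackfillControl
| where DatasetName == "Customers" and Layer == "Bronze"
| where BackfillRequired == true and RetryCount < MaxRetryCount
| where Status in ("Missing", "Failed") or DQStatus == "Failed"
| top 5 by PartitionDate asc
| project DatasetName, Layer, PartitionDate, Status, DQStatus, RetryCount, MaxRetryCount, Reason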
End-to-End Flow
- The Logic App triggers the workflow and passes the request context.
- The Foundry agent evaluates the request.
- The agent invokes an MCP tool to retrieve missing partitions from ADX.
- Based on the result, the agent determines whether a backfill is required.
- If required, the agent invokes an MCP tool to trigger the Synapse pipeline.
- The pipeline executes the backfill using a staging and validation pattern.
- Execution details are written to ADX.
- The agent returns a summary of the operation.
Analytics Layer:
Azure Synapse Analytics:
In the Synapse workspace, I created a generic parameterized pipeline that has three steps:
1. Copy data from upstream and ingest it into the ADX staging table
2. Run data validation
3. Ingest the staging data into the main dataset table
The pipeline receives the dataset name, partition date, IsBackfill flag, and layer as parameters, and ingests the dataset into the corresponding Kusto table. A KQL sketch of the validation and promotion steps is shown below.
Valid values for layer are Bronze, Silver, or Gold.
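To illustrate the validation and promotion steps, the snippets below show the kind of KQL the pipeline can run against ADX. This is a minimal sketch only: it assumes a hypothetical Customers_Staging table with the same schema as the main Customers table (including PartitionDate and CustomerId columns); real validation rules are dataset- and layer-specific.
// Validation (illustrative): inspect the staged partition before promoting it
Customers_Staging
| where PartitionDate == datetime(2026-01-15)
| summarize Rows = count(), MissingKeys = countif(isempty(CustomerId))

// Promotion (illustrative): append the validated partition to the main table
.set-or-append Customers <|
    Customers_Staging
    | where PartitionDate == datetime(2026-01-15)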
Kusto:
In Kusto, the solution relies on the following tables:
- Dataset tables: for example, the Customers table in this demo (the same pattern can be extended to support multiple datasets).
- BackfillControl: the central configuration and decision input for the backfill process. It defines which dataset partitions require backfill and provides the metadata needed for the agent to make execution decisions. Each row in this table represents a specific dataset partition (for example, a given date in a specific layer) and its current backfill state.
- BackfillExecutionLog: tracks the execution of backfill operations. It provides a complete record of when backfills were triggered, their outcome, and the associated pipeline runs. While the BackfillControl table defines what should be processed, BackfillExecutionLog captures what actually happened.
Code for Creating the tables:
.create table BackfillExecutionLog (
    ExecutionId: string,
    DatasetName: string,
    Layer: string,
    PartitionDate: datetime,
    PipelineName: string,
    PipelineRunId: string,
    TriggeredAt: datetime,
    TriggeredBy: string,
    ExecutionStatus: string,
    Reason: string
)

.create table BackfillControl (
    DatasetName: string,
    Layer: string,
    PartitionDate: datetime,
    BackfillRequired: bool,
    Status: string,
    DQStatus: string,
    RetryCount: int,
    MaxRetryCount: int,
    Reason: string
)
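For testing the flow end to end, a missing partition can be registered in BackfillControl with an ingest-from-query command like the one below. This is a sketch for the demo Customers dataset; in practice, these rows would typically be produced by your monitoring or data-availability checks rather than inserted manually.
.set-or-append BackfillControl <|
print DatasetName = "Customers",
      Layer = "Bronze",
      PartitionDate = datetime(2026-01-15),
      BackfillRequired = true,
      Status = "Missing",
      DQStatus = "",
      RetryCount = int(0),
      MaxRetryCount = int(3),
      Reason = "Partition missing after upstream delay"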
In this demo, Logic Apps, Synapse, and Kusto are treated as existing systems; the focus is on how to expose controlled MCP tools from an Azure Function App and connect them to an Azure AI Foundry agent.
Microsoft’s Azure Functions MCP extension lets a Function App expose functions as MCP tools, and Foundry can connect to the deployed MCP endpoint.
Steps:
Step1: Create the local Function App project
In VS Code, open a terminal and run the following commands:
mkdir backfill-kusto-mcp
cd backfill-kusto-mcp
func init . --worker-runtime python --python
Step2: Implement the MCP tools
- Add the requirements to the requirements.txt file:
azure-functions>=1.24.0
azure-identity
azure-kusto-data
requests
python-dotenv
- The host.json file defines runtime-level behavior for the Azure Function App.
In this implementation, it is used to configure the MCP extension, logging, and extension bundles:
{
  "version": "2.0",
  "extensions": {
    "mcp": {
      "system": {
        "webhookAuthorizationLevel": "Anonymous"
      }
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle.Experimental",
    "version": "[4.*, 5.0.0)"
  }
}
The local.settings.json file is used to define environment-specific configuration for the Azure Function App during local development.
It contains application settings (environment variables) that are read by the Function App at runtime. These settings are not checked into source control and are replaced by App Settings in Azure after deployment.
For example:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "KUSTO_CLUSTER": "https://<ClusterName>.<Region>.kusto.windows.net",
    "KUSTO_DATABASE": "<DatabaseName>",
    "BACKFILL_CONTROL_TABLE": "BackfillControl",
    "BACKFILL_EXECUTION_LOG_TABLE": "BackfillExecutionLog",
    "SYNAPSE_WORKSPACE": "<SynapseWorkspaceName>",
    "SYNAPSE_PIPELINE": "<PipelineName>",
    "AUTH_MODE": "az_login",
    "AZURE_CLIENT_ID": "",
    "DEFAULT_DATASET_NAME": "Customers",
    "DEFAULT_LAYER": "Bronze",
    "MAX_DATES_DEFAULT": "5"
  }
}
For local development, AUTH_MODE is set to az_login.
Before deploying to Azure Functions, change AUTH_MODE to MANAGED_IDENTITY in the Function App application settings.
- The function_app.py file contains the main implementation of the MCP server.
It:
- Exposes MCP tools (find_backfill_candidates, trigger_backfill, run_backfill_agent, get_backfill_execution_log)
- Reads configuration from environment variables
- Authenticates using Azure CLI (local) or Managed Identity (Azure)
- Queries BackfillControl in Kusto to identify missing partitions
- Triggers Synapse pipelines for backfill
- Writes execution results to BackfillExecutionLog
- Enforces idempotency by checking if a partition was already triggered
Code:

import os
import uuid
import json
import logging
from datetime import datetime, timezone
from urllib.parse import quote

import requests
import azure.functions as func
from azure.identity import ManagedIdentityCredential, AzureCliCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
logging.basicConfig(level=logging.INFO)

AUTH_MODE = os.getenv("AUTH_MODE", "MANAGED_IDENTITY").lower()
KUSTO_CLUSTER = os.getenv("KUSTO_CLUSTER")
KUSTO_DATABASE = os.getenv("KUSTO_DATABASE")
CONTROL_TABLE = os.getenv("BACKFILL_CONTROL_TABLE", "BackfillControl")
EXECUTION_LOG_TABLE = os.getenv("BACKFILL_EXECUTION_LOG_TABLE", "BackfillExecutionLog")
SYNAPSE_WORKSPACE = os.getenv("SYNAPSE_WORKSPACE")
SYNAPSE_PIPELINE = os.getenv("SYNAPSE_PIPELINE", "Customer Dataset")
DEFAULT_DATASET_NAME = os.getenv("DEFAULT_DATASET_NAME", "Customers")
DEFAULT_LAYER = os.getenv("DEFAULT_LAYER", "Bronze")


def utc_now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def log_event(event: str, **properties):
    logging.info(
        "MCP_BACKFILL %s",
        json.dumps(
            {"event": event, "timestamp_utc": utc_now(), **properties},
            default=str,
        ),
    )


def require_setting(name: str, value: str | None):
    if not value:
        raise ValueError(f"Missing required app setting: {name}")


def escape_kusto_string(value: str | None) -> str:
    if value is None:
        return ""
    return str(value).replace("\\", "\\\\").replace('"', '\\"')


def get_credential():
    if AUTH_MODE == "az_login":
        return AzureCliCredential()
    managed_identity_client_id = os.getenv("AZURE_CLIENT_ID")
    if managed_identity_client_id:
        log_event(
            "using_user_assigned_managed_identity",
            client_id=managed_identity_client_id,
        )
        return ManagedIdentityCredential(client_id=managed_identity_client_id)
    log_event("using_system_assigned_managed_identity")
    return ManagedIdentityCredential()


def get_kusto_client() -> KustoClient:
    require_setting("KUSTO_CLUSTER", KUSTO_CLUSTER)
    if AUTH_MODE == "az_login":
        kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER)
    else:
        managed_identity_client_id = os.getenv("AZURE_CLIENT_ID")
        if managed_identity_client_id:
            kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
                KUSTO_CLUSTER,
                client_id=managed_identity_client_id,
            )
        else:
            kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
                KUSTO_CLUSTER
            )
    return KustoClient(kcsb)


def execute_kusto_query(query: str):
    require_setting("KUSTO_DATABASE", KUSTO_DATABASE)
    client = get_kusto_client()
    response = client.execute(KUSTO_DATABASE, query)
    return response.primary_results[0]


def execute_kusto_command(command: str):
    require_setting("KUSTO_DATABASE", KUSTO_DATABASE)
    client = get_kusto_client()
    return client.execute_mgmt(KUSTO_DATABASE, command)


def find_backfill_candidates_core(dataset_name: str, layer: str, max_dates: int) -> list[dict]:
    dataset = escape_kusto_string(dataset_name)
    layer_name = escape_kusto_string(layer)
    query = f"""
    {CONTROL_TABLE}
    | where DatasetName == "{dataset}"
    | where Layer == "{layer_name}"
    | where BackfillRequired == true
    | where RetryCount < MaxRetryCount
    | where Status in ("Missing", "Failed") or DQStatus == "Failed"
    | top {int(max_dates)} by PartitionDate asc
    | project DatasetName, Layer, PartitionDate, Status, DQStatus, RetryCount, MaxRetryCount, Reason
    """
    rows = execute_kusto_query(query)
    return [
        {
            "DatasetName": row["DatasetName"],
            "Layer": row["Layer"],
            "PartitionDate": str(row["PartitionDate"])[:10],
            "Status": row["Status"],
            "DQStatus": row["DQStatus"],
            "RetryCount": row["RetryCount"],
            "MaxRetryCount": row["MaxRetryCount"],
            "Reason": row["Reason"],
        }
        for row in rows
    ]


def was_backfill_already_triggered(dataset_name: str, layer: str, partition_date: str) -> bool:
    dataset = escape_kusto_string(dataset_name)
    layer_name = escape_kusto_string(layer)
    query = f"""
    {EXECUTION_LOG_TABLE}
    | where DatasetName == "{dataset}"
    | where Layer == "{layer_name}"
    | where PartitionDate == datetime({partition_date})
    | where ExecutionStatus == "Triggered"
    | summarize Count = count()
    """
    rows = list(execute_kusto_query(query))
    return bool(rows and rows[0]["Count"] > 0)


def write_execution_log(
    execution_id: str,
    dataset_name: str,
    layer: str,
    partition_date: str,
    pipeline_name: str,
    pipeline_run_id: str,
    execution_status: str,
    reason: str,
):
    command = f"""
    .set-or-append {EXECUTION_LOG_TABLE} <|
    print ExecutionId = "{escape_kusto_string(execution_id)}",
          DatasetName = "{escape_kusto_string(dataset_name)}",
          Layer = "{escape_kusto_string(layer)}",
          PartitionDate = datetime({partition_date}),
          PipelineName = "{escape_kusto_string(pipeline_name)}",
          PipelineRunId = "{escape_kusto_string(pipeline_run_id)}",
          TriggeredAt = datetime({utc_now()}),
          TriggeredBy = "FoundryMCPBackfillAgent",
          ExecutionStatus = "{escape_kusto_string(execution_status)}",
          Reason = "{escape_kusto_string(reason)}"
    """
    execute_kusto_command(command)


def trigger_synapse_pipeline(dataset_name: str, layer: str, partition_date: str) -> str:
    require_setting("SYNAPSE_WORKSPACE", SYNAPSE_WORKSPACE)
    require_setting("SYNAPSE_PIPELINE", SYNAPSE_PIPELINE)
    credential = get_credential()
    token = credential.get_token("https://dev.azuresynapse.net/.default").token
    encoded_pipeline_name = quote(SYNAPSE_PIPELINE, safe="")
    url = (
        f"https://{SYNAPSE_WORKSPACE}.dev.azuresynapse.net"
        f"/pipelines/{encoded_pipeline_name}/createRun"
        f"?api-version=2020-12-01"
    )
    payload = {
        "DatasetName": dataset_name,
        "Layer": layer,
        "PartitionDate": partition_date,
        "IsBackfill": True,
    }
    response = requests.post(
        url,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=30,
    )
    log_event(
        "synapse_create_run_response",
        status_code=response.status_code,
        body=response.text[:2000],
    )
    response_json = {}
    try:
        response_json = response.json()
    except Exception:
        pass
    if "runId" in response_json:
        return response_json["runId"]
    raise Exception(
        f"Synapse trigger failed. "
        f"StatusCode={response.status_code}. "
        f"Body={response.text}"
    )


def trigger_backfill_core(dataset_name: str, layer: str, partition_date: str) -> dict:
    execution_id = str(uuid.uuid4())
    log_event(
        "trigger_backfill_started",
        execution_id=execution_id,
        dataset_name=dataset_name,
        layer=layer,
        partition_date=partition_date,
    )
    try:
        if was_backfill_already_triggered(dataset_name, layer, partition_date):
            return {
                "ExecutionId": execution_id,
                "DatasetName": dataset_name,
                "Layer": layer,
                "PartitionDate": partition_date,
                "Status": "Skipped",
                "Reason": "Backfill was already triggered for this partition.",
            }
        pipeline_run_id = trigger_synapse_pipeline(
            dataset_name=dataset_name,
            layer=layer,
            partition_date=partition_date,
        )
        write_execution_log(
            execution_id=execution_id,
            dataset_name=dataset_name,
            layer=layer,
            partition_date=partition_date,
            pipeline_name=SYNAPSE_PIPELINE,
            pipeline_run_id=pipeline_run_id,
            execution_status="Triggered",
            reason="Triggered by Foundry MCP backfill agent",
        )
        return {
            "ExecutionId": execution_id,
            "DatasetName": dataset_name,
            "Layer": layer,
            "PartitionDate": partition_date,
            "PipelineName": SYNAPSE_PIPELINE,
            "PipelineRunId": pipeline_run_id,
            "Status": "Triggered",
        }
    except Exception as ex:
        error_message = str(ex)
        log_event(
            "trigger_backfill_failed",
            execution_id=execution_id,
            dataset_name=dataset_name,
            layer=layer,
            partition_date=partition_date,
            error=error_message,
        )
        try:
            write_execution_log(
                execution_id=execution_id,
                dataset_name=dataset_name,
                layer=layer,
                partition_date=partition_date,
                pipeline_name=SYNAPSE_PIPELINE or "",
                pipeline_run_id="",
                execution_status="FailedToTrigger",
                reason=error_message,
            )
        except Exception as log_ex:
            log_event(
                "failed_to_write_execution_log",
                execution_id=execution_id,
                original_error=error_message,
                log_error=str(log_ex),
            )
        return {
            "ExecutionId": execution_id,
            "DatasetName": dataset_name,
            "Layer": layer,
            "PartitionDate": partition_date,
            "Status": "FailedToTrigger",
            "Error": error_message,
        }


def run_backfill_agent_core(dataset_name: str, layer: str, max_dates: int) -> list[dict]:
    log_event(
        "run_backfill_agent_started",
        dataset_name=dataset_name,
        layer=layer,
        max_dates=max_dates,
    )
    candidates = find_backfill_candidates_core(
        dataset_name=dataset_name,
        layer=layer,
        max_dates=max_dates,
    )
    results = []
    for candidate in candidates:
        result = trigger_backfill_core(
            dataset_name=candidate["DatasetName"],
            layer=candidate["Layer"],
            partition_date=candidate["PartitionDate"],
        )
        results.append(result)
    log_event("run_backfill_agent_completed", dataset_name=dataset_name, layer=layer)
    return results


def get_execution_log_core(dataset_name: str, limit: int) -> list[dict]:
    dataset = escape_kusto_string(dataset_name)
    query = f"""
    {EXECUTION_LOG_TABLE}
    | where DatasetName == "{dataset}"
    | top {int(limit)} by TriggeredAt desc
    | project ExecutionId, DatasetName, Layer, PartitionDate, PipelineName, PipelineRunId, TriggeredAt, TriggeredBy, ExecutionStatus, Reason
    """
    rows = execute_kusto_query(query)
    return [
        {
            "ExecutionId": row["ExecutionId"],
            "DatasetName": row["DatasetName"],
            "Layer": row["Layer"],
            "PartitionDate": str(row["PartitionDate"])[:10],
            "PipelineName": row["PipelineName"],
            "PipelineRunId": row["PipelineRunId"],
            "TriggeredAt": str(row["TriggeredAt"]),
            "TriggeredBy": row["TriggeredBy"],
            "ExecutionStatus": row["ExecutionStatus"],
            "Reason": row["Reason"],
        }
        for row in rows
    ]


@app.mcp_tool()
@app.mcp_tool_property(arg_name="dataset_name", description="Dataset name, for example Customers.")
@app.mcp_tool_property(arg_name="layer", description="Layer name, for example Bronze.")
@app.mcp_tool_property(arg_name="max_dates", description="Maximum number of dates to return.")
def find_backfill_candidates(
    dataset_name: str = DEFAULT_DATASET_NAME,
    layer: str = DEFAULT_LAYER,
    max_dates: int = 5,
) -> list[dict]:
    return find_backfill_candidates_core(dataset_name, layer, max_dates)


@app.mcp_tool()
@app.mcp_tool_property(arg_name="dataset_name", description="Dataset name, for example Customers.")
@app.mcp_tool_property(arg_name="layer", description="Layer name, for example Bronze.")
@app.mcp_tool_property(arg_name="partition_date", description="Partition date in yyyy-MM-dd format.")
def trigger_backfill(dataset_name: str, layer: str, partition_date: str) -> dict:
    return trigger_backfill_core(dataset_name, layer, partition_date)


@app.mcp_tool()
@app.mcp_tool_property(arg_name="dataset_name", description="Dataset name, for example Customers.")
@app.mcp_tool_property(arg_name="layer", description="Layer name, for example Bronze.")
@app.mcp_tool_property(arg_name="max_dates", description="Maximum number of dates to trigger.")
def run_backfill_agent(
    dataset_name: str = DEFAULT_DATASET_NAME,
    layer: str = DEFAULT_LAYER,
    max_dates: int = 5,
) -> list[dict]:
    return run_backfill_agent_core(dataset_name, layer, max_dates)


@app.mcp_tool()
@app.mcp_tool_property(arg_name="dataset_name", description="Dataset name, for example Customers.")
@app.mcp_tool_property(arg_name="limit", description="Maximum number of execution log rows to return.")
def get_backfill_execution_log(
    dataset_name: str = DEFAULT_DATASET_NAME,
    limit: int = 10,
) -> list[dict]:
    return get_execution_log_core(dataset_name, limit)
Step3: Run locally
1. Create and activate a virtual environment:
python -m venv .sally-env
.\.sally-env\Scripts\activate
2. Install dependencies:
pip install -r requirements.txt
npm install -g azurite
3. Open two terminals. In the first terminal, run:
azurite
4. In the second terminal, log in to Azure:
az login
Then start the Function App:
func start
P.S. Make sure AUTH_MODE is set to az_login in the local.settings.json file:
"AUTH_MODE": "az_login"
Step4: Create Azure resources and deploy
# LOGIN
az login
# VARIABLES
$RG="rg-backfill-kusto-mcp-demo"
$LOCATION="westeurope"
$STORAGE="stbackfillmcp$((Get-Random -Minimum 10000 -Maximum 99999))"
$FUNCAPP="func-backfill-kusto-mcp-$((Get-Random -Minimum 10000 -Maximum 99999))"
# CREATE RESOURCE GROUP
az group create --name $RG --location $LOCATION
# CREATE STORAGE ACCOUNT
az storage account create `
--name $STORAGE `
--resource-group $RG `
--location $LOCATION `
--sku Standard_LRS
# CREATE FUNCTION APP
az functionapp create `
--resource-group $RG `
--consumption-plan-location $LOCATION `
--runtime python `
--runtime-version 3.11 `
--functions-version 4 `
--name $FUNCAPP `
--storage-account $STORAGE `
--os-type Linux
# ENABLE MANAGED IDENTITY
az functionapp identity assign `
--resource-group $RG `
--name $FUNCAPP
# GET PRINCIPAL ID
$FUNC_PRINCIPAL_ID = az functionapp identity show `
--resource-group $RG `
--name $FUNCAPP `
--query principalId `
--output tsv
Write-Host "Function App Principal ID: $FUNC_PRINCIPAL_ID"
# CONFIGURE APP SETTINGS
az functionapp config appsettings set `
--resource-group $RG `
--name $FUNCAPP `
--settings `
AUTH_MODE=MANAGED_IDENTITY `
KUSTO_CLUSTER="https://<ClusterName>.<Region>.kusto.windows.net" `
KUSTO_DATABASE="<DatabaseName>" `
BACKFILL_CONTROL_TABLE="BackfillControl" `
BACKFILL_EXECUTION_LOG_TABLE="BackfillExecutionLog" `
SYNAPSE_WORKSPACE="<SynapseWorkspaceName>" `
SYNAPSE_PIPELINE="<PipelineName>" `
DEFAULT_DATASET_NAME="Customers" `
DEFAULT_LAYER="Bronze" `
MAX_DATES_DEFAULT="5"
# DEPLOY FUNCTION APP (RUN FROM PROJECT FOLDER)
func azure functionapp publish $FUNCAPP
# GET MCP ENDPOINT
$MCP_ENDPOINT="https://$FUNCAPP.azurewebsites.net/runtime/webhooks/mcp"
Write-Host "MCP Endpoint: $MCP_ENDPOINT"
# GET MCP KEY
$MCP_KEY = az functionapp keys list `
--resource-group $RG `
--name $FUNCAPP `
--query "systemKeys.mcp_extension" `
--output tsv
Write-Host "MCP Key: $MCP_KEY"
# TEST MCP TOOL
$body = @{
jsonrpc = "2.0"
id = "1"
method = "tools/call"
params = @{
name = "find_backfill_candidates"
arguments = @{
dataset_name = "Customers"
layer = "Bronze"
max_dates = 1
}
}
} | ConvertTo-Json -Depth 10
Invoke-RestMethod `
-Uri $MCP_ENDPOINT `
-Method POST `
-Headers @{
Accept = "application/json, text/event-stream"
"x-functions-key" = $MCP_KEY
} `
-ContentType "application/json" `
-Body $body
After deployment, the Function App's managed identity (the principal ID retrieved above) must be granted the appropriate permissions in both Kusto and Synapse. This allows the Function App to query Kusto tables, write to the execution log, and trigger Synapse pipelines.
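On the Kusto side, the grants can be applied with management commands along the following lines. This is a sketch only: replace <DatabaseName>, <clientId>, and <tenantId> with your database name, the managed identity's client (application) ID, and your tenant ID, and align the role choices with your own security requirements (viewer rights cover the queries, while the .set-or-append writes additionally need ingest permissions). On the Synapse side, assign the identity a workspace RBAC role that is allowed to run pipelines (for example, Synapse Contributor).
.add database <DatabaseName> viewers ('aadapp=<clientId>;<tenantId>') 'Backfill MCP Function App'
.add database <DatabaseName> ingestors ('aadapp=<clientId>;<tenantId>') 'Backfill MCP Function App'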
Step5: Connect the MCP server to Azure AI Foundry
- Go to Azure AI Foundry portal
- Navigate to your Project
- Open your Agent
- Add MCP as a tool:
- Go to Tools
- Click Add Tool
- Select:
Custom → Model Context Protocol (MCP)
Configure the custom MCP connection and click Save.
MCP endpoint:
https://<function-app-name>.azurewebsites.net/runtime/webhooks/mcp
Step6: Define Agent instructions
You are a Backfill Reliability Agent.
You MUST use the backfill_agent MCP tool.
Do NOT ask the user for candidate dates.
When asked to run backfill:
Find the dataset name.
1. Call find_backfill_candidates with dataset_name, layer, and max_dates.
2. Then call run_backfill_agent with dataset_name, layer, and max_dates.
Return the PipelineRunId.
Note:
The instructions above are intentionally generic; modify them based on your business scenario.
Step7: Test prompt
Ask the agent to run a backfill, for example: "Run backfill for the Customers dataset in the Bronze layer."
Then, in Synapse Monitor, search for the returned PipelineRunId (for example, df1b1920-09dd-415b-bbe9-d810d8505f58) to confirm that the backfill pipeline run was triggered.
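The same run can also be verified from the execution log in ADX:
BackfillExecutionLog
| where PipelineRunId == "df1b1920-09dd-415b-bbe9-d810d8505f58"
| project DatasetName, Layer, PartitionDate, PipelineName, TriggeredAt, TriggeredBy, ExecutionStatus, Reason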
Future Enhancements:
The backfill agent automates recovery by detecting missing or failed data and triggering controlled reprocessing via MCP.
It can scale across all datasets and medallion layers (Bronze, Silver, Gold) with layer-specific rules.
The design can evolve into a multi-agent workflow; for example, if a backfill fails multiple times, a notification agent can automatically send emails or create incidents for upstream teams.
Overall, this shifts backfilling from a manual, reactive task to an automated, governed, and intelligent data operations process.
Links:
Tutorial: Host an MCP server on Azure Functions | Microsoft Learn
Quickstart: Set up Microsoft Foundry resources - Microsoft Foundry | Microsoft Learn
Would love to hear your Feedback: Sally Dabbah | LinkedIn