Define AI pipelines in SQL and let HorizonDB handle chunking, embeddings, retries, checkpoints, and fresh results next to your data.
By Abe Omorogbe, Navya Teja Gajula, Binnur Gorer, B Harsha Kashyap, Krishnakumar Ravi (KK) from Microsoft PostgreSQL AI team
If you’ve ever shipped a RAG app, this will feel familiar. Your data lives in Postgres. But the pipeline that turns that data into vectors lives somewhere else, spread across external services, queues, and retry logic. And when the embedding API hiccups mid-batch? That’s a 2 a.m. production incident. You didn’t set out to build your own embedding service. You just wanted to search your documents.
And RAG is only the beginning. The moment AI works on your data (extraction, summarization, reranking, keeping embeddings fresh, or powering agents), you're back to stitching together more services, queues, and glue code, all outside the database.
AI pipelines in Azure HorizonDB (Preview) remove that entire stack. Define your workflow steps, like chunking, embedding, extracting, and generating, in SQL, and HorizonDB runs them as AI pipelines next to your data. No orchestrator. No glue code. Just Postgres.
In this post we'll cover:
- The external-orchestrator issue that every AI-on-Postgres team eventually hits
- What AI pipelines are, and the four-part anatomy that makes them click
- Use cases worth trying: semantic search, knowledge extraction, content generation, smarter reranking, and always-fresh embeddings
- How to watch your pipelines run as live graphs in VS Code
- How to spin up HorizonDB and run your first pipeline today
🚀 Try it on Azure HorizonDB. AI pipelines are built into Microsoft's new PostgreSQL cloud service, no extra infrastructure to stand up. Write ai.create_pipeline(...), call ai.run(...), and it runs. Get started in HorizonDB →
AI preprocessing runs outside the database, far from your data
The standard way to get data into a vector store looks reasonable on a whiteboard: a service reads source rows, calls an embedding API, and writes chunks back to Postgres. In production, though, it tends to break in ways like these:
- The embedding API fails mid-batch, and there's no shared checkpoint showing which rows were completed. You rerun the job, and the extra API calls increase cost.
- A worker crashes after writing chunks but before flipping the parent row's processed flag. Now your embeddings are quietly inconsistent, and nobody knows.
Every one of these is the same missing primitive: durable, checkpointed execution that lives where your data lives. External orchestrators can do it, but now you're operating a second service just to feed the first one.
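To make that missing primitive concrete, here is a minimal Python sketch of checkpointed batch execution. It is an illustration of the general idea, not how HorizonDB or pg_durable implements it: `run_batches`, `flaky_embed`, and the in-memory `checkpoint` dict are hypothetical names, and a real system would persist the checkpoint durably rather than in a dict.

```python
def run_batches(rows, batch_size, process, checkpoint):
    """Process rows in batches, recording progress after each batch.

    `checkpoint` is a dict standing in for durable storage (an assumption
    of this sketch); "next" holds the index of the first unprocessed row.
    """
    start = checkpoint.get("next", 0)
    for i in range(start, len(rows), batch_size):
        batch = rows[i:i + batch_size]
        process(batch)                       # may raise mid-run
        checkpoint["next"] = i + len(batch)  # record progress only on success


calls = []        # every row sent to the (fake) embedding API
failed_once = []  # tracks whether the simulated outage already happened


def flaky_embed(batch):
    """Hypothetical embedding call that fails once, on the second batch."""
    if batch[0] == "r2" and not failed_once:
        failed_once.append(True)
        raise RuntimeError("embedding API unavailable")
    calls.extend(batch)


rows = ["r0", "r1", "r2", "r3", "r4"]
checkpoint = {}

try:
    run_batches(rows, 2, flaky_embed, checkpoint)
except RuntimeError:
    pass  # the first attempt dies after batch one has been recorded

# The retry resumes at the checkpoint instead of row zero.
run_batches(rows, 2, flaky_embed, checkpoint)
```

After the retry, each row has been sent to the fake API exactly once, which is the property the failure cases above lack.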
AI pipelines move that logic into HorizonDB itself. The source, the steps, the sink, and the full run history are all defined in SQL and protected by the same transactions, backups, and point-in-time restore your data already has. The database is already where your data commits. It's a natural place for the pipeline to live too.
Anatomy of an AI pipeline in HorizonDB
Figure: An AI pipeline in HorizonDB. The steps are optional and can be adjusted as needed.
A pipeline has four parts:
- Source: where rows come from. A table_source(...) over a HorizonDB table, optionally with an incremental_column so the pipeline skips rows it already processed.
- Steps: the AI operations that transform each row, in order. Each step appends columns to the in-flight batch.
- Sink: where results land, ready for use by your AI apps or agents.
- Trigger: 'on_change' (run automatically when source rows change) or 'manual' (run only when you call ai.run()).
Those four parts give the pipeline its shape. The steps are where you define the AI work itself, using composable building blocks:
| Step | What it does |
| --- | --- |
| ai.chunk() | Split long text into overlapping chunks |
| ai.embed() | Generate vector embeddings |
| ai.extract() | Pull structured fields out of text with an LLM |
| ai.generate() | Generate text from a prompt (e.g., content generation, classification, summarization, and more) |
| ai.rank() | Score documents against a query |
How the pieces fit together. The ai.* API gives you the AI pipeline shape: sources define where data comes from, steps define the AI work to perform, sinks define where results land, and triggers define when the pipeline runs. Under the covers, HorizonDB turns that definition into a durable execution graph, where each step can be checkpointed, retried, and resumed if something fails.
Built on open source. That durability isn't magic: every AI pipeline compiles down to a graph that runs on pg_durable, Microsoft's open-source durable-execution engine for PostgreSQL (built on the duroxide Rust runtime). The ai.* API is the AI-shaped surface (sources, steps, sinks, triggers), and pg_durable is the general-purpose engine underneath that handles checkpointing, retries, and crash recovery. Your pipelines stand on a transparent, inspectable foundation that you can read and run on any Postgres 17 or 18 instance. No black box, no lock-in.
Use case 1: Semantic search over your data
This is one of the most popular use cases. Turn a table of documents into searchable vectors, durably, and keep them fresh as the data changes. That last part matters: in production, documents are edited, added, and deleted constantly, and every change needs the right chunks and embeddings updated without reprocessing the entire corpus or leaving stale vectors behind. With AI pipelines, HorizonDB can track those incremental updates for you. Chunk the body, embed each chunk, and land the result in a DiskANN-indexed table.
-- Define the pipeline: source -> chunk -> embed -> sink.
SELECT ai.create_pipeline(
name => 'rag_pipeline',
source => ai.table_source(table_name => 'documents'),
steps => ARRAY[
ai.chunk(input => 'content', chunk_size => 512, overlap => 64),
ai.embed(model => 'default-embedding', input => 'chunk_text', dimensions => 1536)
],
trigger => 'on_change', -- re-embed automatically as rows change
sink => ai.table_sink('rag_pipeline_output')
);
-- Run it
SELECT ai.run('rag_pipeline');
-- Search your data
SELECT chunk_text, embedding <=> azure_openai.create_embeddings('text-embedding-3-small', 'how does vector search work?')::vector AS distance
FROM rag_pipeline_output
ORDER BY distance
LIMIT 3;
📘 Read more details in the AI Pipelines documentation
That's the entire ingestion layer: chunking, embedding, checkpointing, retries, and sink writes in one definition. Because trigger => 'on_change', the pipeline updates embeddings whenever source rows change, processing only what is new or modified instead of redoing the whole corpus. Your vectors stay in sync with your data, and your ingestion work stays efficient as the dataset grows. Point a query at the DiskANN index and you've got production semantic search without a single line of application glue.
That's the whole loop: define, run, inspect. The embedding service you were about to build (the queue, the workers, the retry logic, the checkpoint table) is no longer needed, and the 2 a.m. production incident doesn't happen.
Why it's better than an external service: a failure in ai.embed() never re-runs ai.chunk(), because each step is a durable node. If the database restarts mid-run, it resumes from the last checkpointed batch, not row zero.
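For intuition about what chunk_size => 512 and overlap => 64 mean, here is a minimal Python sketch of overlapping chunking. It assumes sizes are counted in characters and that each chunk starts chunk_size - overlap after the previous one. How ai.chunk() actually measures size (characters, tokens, or something else) is not stated in this post, and `chunk_text` is a hypothetical helper, not part of the ai.* API.

```python
def chunk_text(text, chunk_size=512, overlap=64):
    """Split text into windows of chunk_size that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap  # distance between consecutive chunk starts
    chunks = []
    for start in range(0, len(text), stride):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


# A 1000-character document, chunked with the settings from the pipeline above.
document = "".join(str(i % 10) for i in range(1000))
chunks = chunk_text(document, chunk_size=512, overlap=64)
```

The overlap means the tail of one chunk is repeated at the head of the next, so a sentence that straddles a boundary still appears whole in at least one chunk when it is shorter than the overlap.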
Use case 2: Turn unstructured text into structured metadata
Support tickets, contracts, product reviews, and research papers are full of structure that's locked inside unstructured text. ai.extract() pulls named fields out of text and merges them into the metadata JSONB column, so you can filter and aggregate on things an LLM read for you.
SELECT ai.create_pipeline(
name => 'extraction_pipeline',
source => ai.table_source(table_name => 'documents'),
steps => ARRAY[
ai.chunk(input => 'content'),
ai.extract(
input => 'chunk_text',
data => ARRAY['topics: string - the main topics discussed',
'entities: string - named people, products, or places'],
model => 'my-gpt' -- optional; the default model is used when AI model management is enabled
)
],
sink => ai.table_sink('extraction_pipeline_output')
);
SELECT ai.run('extraction_pipeline');
-- Now query the structured fields the LLM extracted:
SELECT doc_id, metadata->'topics' AS topics, metadata->'entities' AS entities
FROM extraction_pipeline_output;
📘 Read more details in the AI Pipelines documentation
You describe each field as a label in the ai.extract() step: either a bare name like product, or the detailed form name: type - description (for example, `sentiment: number - sentiment score from 1 to 5`). HorizonDB does the rest durably, in bulk, with the same retry-and-resume guarantees.
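The two label forms can be told apart mechanically. The Python sketch below splits a label into its name, type, and description. It only illustrates the string format described above: `parse_field` is hypothetical, and returning None for a missing type or description is a choice made for this sketch, not documented HorizonDB behavior.

```python
def parse_field(spec):
    """Parse 'name' or 'name: type - description' into a dict."""
    name, sep, rest = spec.partition(":")
    if not sep:
        # Bare form: only a field name was given.
        return {"name": spec.strip(), "type": None, "description": None}
    type_part, dash, description = rest.partition(" - ")
    return {
        "name": name.strip(),
        "type": type_part.strip(),
        "description": description.strip() if dash else None,
    }


fields = [
    parse_field("product"),
    parse_field("sentiment: number - sentiment score from 1 to 5"),
]
```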
Use case 3: Summarize and rewrite content at scale
ai.generate() runs an LLM prompt against every row, perfect for bulk summarization, classification, tone normalization, or generating titles. Because it's a pipeline, "summarize 4 million documents" becomes a job that survives restarts instead of a script you have to monitor overnight.
SELECT ai.create_pipeline(
name => 'summary_pipeline',
source => ai.table_source(table_name => 'documents'),
steps => ARRAY[
ai.chunk(input => 'content'),
ai.generate(
input => 'chunk_text',
system_prompt => 'Create a concise summary in 50 words or fewer.',
model => 'my-gpt' -- optional; the default model is used when AI model management is enabled
)
],
sink => ai.table_sink('generation_pipeline_output')
);
SELECT ai.run('summary_pipeline');
-- Now query the generated text:
SELECT doc_id, left(generated_text, 100) AS summary_preview
FROM generation_pipeline_output
WHERE generated_text IS NOT NULL
LIMIT 5;
📘 Read more details in the AI Pipelines documentation
Swap the system_prompt and the same shape becomes a classifier ("Label this ticket as billing, bug, or feature request"), a translator, or a headline generator. The instruction goes in system_prompt; the result lands in generated_text.
Use case 4: Keep embeddings fresh, and re-embed cleanly when the model changes
This is where AI pipelines become especially useful. In a real AI app, two things change constantly: your data and your model. AI pipelines are designed to handle both changes directly.
Your data changes. Set incremental_column and an on_change trigger, and the pipeline only embeds new or changed rows, automatically, forever, until you pause or drop it.
SELECT ai.create_pipeline(
name => 'rag_pipeline',
source => ai.table_source(
table_name => 'documents',
incremental_column => 'updated_at' -- only process what changed
),
steps => ARRAY[
ai.chunk(input => 'content'),
ai.embed(model => 'default-embedding', input => 'chunk_text', dimensions => 1536)
],
trigger => 'on_change',
sink => ai.table_sink('rag_pipeline_output')
);
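One common way to implement this kind of incremental processing is a high-water mark on the incremental column. The Python sketch below illustrates that general technique only. It is an assumption about how such a filter can work, not a description of HorizonDB internals, and `select_changed` is a hypothetical helper.

```python
from datetime import datetime


def select_changed(rows, watermark):
    """Return rows with updated_at after the watermark, plus the new watermark."""
    changed = [r for r in rows if watermark is None or r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in changed), default=watermark)
    return changed, new_watermark


documents = [
    {"id": 1, "updated_at": datetime(2025, 1, 1)},
    {"id": 2, "updated_at": datetime(2025, 1, 2)},
]

# First run: no watermark yet, so every row is processed.
first, watermark = select_changed(documents, None)
first_ids = [r["id"] for r in first]

# One row is edited and one is added; only those two are picked up next time.
documents[0]["updated_at"] = datetime(2025, 1, 3)
documents.append({"id": 3, "updated_at": datetime(2025, 1, 4)})
second, watermark = select_changed(documents, watermark)
second_ids = [r["id"] for r in second]
```

Note that a watermark on its own does not notice deleted rows, since a deleted row has no updated_at left to compare.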
Your model changes. Bump the model or the dimensions, then run a single, resumable backfill, no migration script, no babysitting:
TRUNCATE rag_pipeline_output;
SELECT ai.backfill('rag_pipeline');
📘 Read more details in the AI Pipelines documentation
The backfill runs as one durable instance. If the database restarts mid-backfill, it picks up from the last checkpointed batch instead of starting over. The painful "re-embed everything" migration becomes a one-liner you can actually trust.
Watch your pipelines run as live graphs in VS Code
A pipeline you can see is a pipeline you can trust. Install the PostgreSQL extension for VS Code, connect to HorizonDB, then right-click your database and open Pipelines & Workflows → AI Pipelines.
Figure: AI Pipelines in HorizonDB
Select any run and the center pane renders the execution as a color-coded graph:
- Blue 🔵 : source and sink (where data enters and exits)
- Green 🟢 : processing steps (chunk, embed, extract, generate, rank)
- Pink 🟣 : external model and service calls
For each run you can read the status (completed, running, failed), the run ID for traceability, start time and duration for performance, and a link back to the pipeline definition. When a run fails, open the graph and jump straight to the step where execution stopped, no log spelunking.
Get Started: Try It Now
We have a few demos of AI pipelines in action:
| Resource | Link |
| --- | --- |
| Microsoft Build AI Pipeline Demo | Simplify app dev with cloud-native PostgreSQL in Azure HorizonDB \| DEM364 |
| Microsoft Build AI Pipeline GitHub | |
| Microsoft Mechanic Demo | |
| Documentation | |
Enabling AI pipelines takes minutes: enable the azure_ai, pg_durable, vector, and pg_diskann extensions and you can get started.
-- On Azure HorizonDB — the extensions are built in.
CREATE EXTENSION IF NOT EXISTS pg_durable;
CREATE EXTENSION IF NOT EXISTS azure_ai;
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_diskann;
That's it: your PostgreSQL database can now run AI pipelines.