This article reviews a common pattern for ingesting streaming data (i.e., real-time messages) into a Synapse SQL dedicated pool. It first discusses the simple, standard way to implement this, along with its challenges and drawbacks. It then presents an alternative solution that enables optimal performance and greatly reduces maintenance tasks when using clustered columnstore indexes. It is aimed at developers, DBAs, architects, and anyone who works with streams of data captured in real time.
The scenario is as follows: messages produced by sensors (simulated by Azure Container Instances) are sent to an Event Hub, then processed by an Azure Stream Analytics (ASA) job whose output is written to a Synapse table named “rawdata”. At this stage there are two options:
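As an illustration, a minimal ASA job query for such a scenario might look like the sketch below. The input and output alias names (`eventhubinput`, `rawdataoutput`) are illustrative assumptions, not part of the sample deployment, and the column list mirrors the columns used later in the stored procedure:

```sql
-- Hypothetical ASA query: reads telemetry from the Event Hub input
-- and writes it to the Synapse output bound to the "rawdata" table.
SELECT
    EventId,
    [Type],
    DeviceId,
    DeviceSequenceNumber,
    CreatedAt,
    [Value],
    ComplexData,
    EnqueuedAt,
    System.Timestamp() AS ProcessedAt
INTO
    [rawdataoutput]
FROM
    [eventhubinput]
```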
This sample deploys everything within a single resource group.
If the ASA job loads data directly into the fact table, and assuming that table has a clustered columnstore index, the following happens:
The illustration below shows this:
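This effect can be observed directly. The query below is a sketch using a documented Synapse dedicated pool DMV: with frequent trickle inserts you would typically see many OPEN (delta store) rowgroups and many small COMPRESSED rowgroups, both of which hurt scan performance. Note that this DMV reports rowgroups for all columnstore tables in the database; filtering to a single table requires additional node-level joins.

```sql
-- Summarize columnstore rowgroup states across the dedicated pool.
SELECT state_desc,
       COUNT(*)        AS rowgroup_count,
       SUM(total_rows) AS rows_in_state
FROM sys.dm_pdw_nodes_db_column_store_row_group_physical_stats
GROUP BY state_desc;
```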
If instead the ASA job loads the telemetry data into a staging table named "rawdata", and the fact table is then upserted from it, the following happens:
The illustration below shows this:
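A staging table suited to this pattern would typically be a round-robin distributed heap, which absorbs high-frequency small inserts cheaply and avoids the columnstore delta store entirely. The DDL below is a sketch; the column types are assumptions inferred from the columns used in the stored procedure later in this article:

```sql
-- Hypothetical staging table: a heap, so trickle inserts are cheap
-- and no columnstore rowgroups are created during ingestion.
CREATE TABLE dbo.rawdata
(
    EventId              uniqueidentifier NOT NULL,
    [Type]               varchar(20)      NOT NULL,
    DeviceId             varchar(100)     NOT NULL,
    DeviceSequenceNumber bigint           NOT NULL,
    CreatedAt            datetime2        NOT NULL,
    [Value]              decimal(18, 5)   NULL,
    ComplexData          nvarchar(max)    NULL,
    EnqueuedAt           datetime2        NULL,
    ProcessedAt          datetime2        NULL,
    StoredAt             datetime2        NULL,
    PartitionId          int              NULL
)
WITH (DISTRIBUTION = ROUND_ROBIN, HEAP);
```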
This task consists of:
{
    "name": "processrawdata",
    "type": "SqlPoolStoredProcedure",
    "dependsOn": [],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": false,
        "secureInput": false
    },
    "userProperties": [],
    "sqlPool": {
        "referenceName": "sqlpool2",
        "type": "SqlPoolReference"
    },
    "typeProperties": {
        "storedProcedureName": "[dbo].[processrawdata]"
    }
}
{
    "name": "tumblingwindow10minutes",
    "properties": {
        "annotations": [],
        "runtimeState": "Stopped",
        "pipeline": {
            "pipelineReference": {
                "referenceName": "processrawdata",
                "type": "PipelineReference"
            }
        },
        "type": "TumblingWindowTrigger",
        "typeProperties": {
            "frequency": "Minute",
            "interval": 10,
            "startTime": "2022-01-14T12:58:00Z",
            "delay": "00:00:00",
            "maxConcurrency": 50,
            "retryPolicy": {
                "intervalInSeconds": 30
            },
            "dependsOn": []
        }
    }
}
ALTER PROC [dbo].[processrawdata] AS
BEGIN
BEGIN TRANSACTION
--Take an exclusive lock on the staging table before its rows are inserted into
--or used to update the fact table, to avoid inconsistencies with concurrent inserts
UPDATE rawdata SET partitionid = 0 WHERE 1 = 0
--Now the fact table can be upserted, and the staging data deleted once processed
MERGE dbo.rawdata_fact AS macible
USING dbo.rawdata AS masource
ON (macible.eventId = masource.eventId
and macible.[Type]=masource.[Type]
and macible.DeviceId=masource.DeviceId
and macible.DeviceSequenceNumber=masource.DeviceSequenceNumber
and macible.CreatedAt=masource.CreatedAt)
WHEN MATCHED
THEN UPDATE SET
macible.EventId=masource.EventId,
macible.[Type]=masource.[Type],
macible.DeviceId=masource.DeviceId,
macible.DeviceSequenceNumber=masource.DeviceSequenceNumber,
macible.CreatedAt=masource.CreatedAt,
macible.[Value]=masource.[Value],
macible.ComplexData=masource.ComplexData,
macible.EnqueuedAt=masource.EnqueuedAt,
macible.ProcessedAt=masource.ProcessedAt,
macible.StoredAt=masource.StoredAt
WHEN NOT MATCHED BY TARGET THEN
INSERT ([EventId]
,[Type]
,[DeviceId]
,[DeviceSequenceNumber]
,[CreatedAt]
,[Value]
,[ComplexData]
,[EnqueuedAt]
,[ProcessedAt]
,[StoredAt]
,[PartitionId])
VALUES
(masource.[EventId]
,masource.[Type]
,masource.[DeviceId]
,masource.[DeviceSequenceNumber]
,masource.[CreatedAt]
,masource.[Value]
,masource.[ComplexData]
,masource.[EnqueuedAt]
,masource.[ProcessedAt]
,masource.[StoredAt]
,masource.[PartitionId]);
DELETE FROM rawdata;
COMMIT TRANSACTION;
END
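For comparison, loading directly into the fact table would typically force periodic columnstore maintenance such as the sketch below; the exact commands and schedule depend on the workload, and the staging-table pattern above is what lets us largely avoid them:

```sql
-- Fully rebuild compressed rowgroups (heavier, best quality):
ALTER INDEX ALL ON dbo.rawdata_fact REBUILD;
-- Or a lighter-weight alternative that compresses delta-store
-- rowgroups and merges small compressed ones:
ALTER INDEX ALL ON dbo.rawdata_fact REORGANIZE;
```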
Then we can create a simple view on top of both tables, so queries see the latest data as well as the already-processed data:
CREATE VIEW [dbo].[v_rawdata]
AS
SELECT * FROM rawdata
UNION ALL
SELECT * FROM rawdata_fact;
And we can monitor the average temperature and CO2 values in near real time:
SELECT [Type], AVG([Value]) AS average
FROM v_rawdata
GROUP BY [Type];
In summary, we can stream data from Stream Analytics directly into a Synapse SQL dedicated pool, and we can avoid clustered columnstore index maintenance by using an intermediate staging table and a scheduled pipeline.