kusto language
Harnessing the Power of Left-Anti Joins in the Kusto Query Language
The Kusto query language supports a variety of joins. Left-anti might not be among the most commonly used, but it can be one of the most powerful. The docs state that a left-anti join "returns all records from the left side that do not match any record from the right side." Let's walk through two ways that this can be used in your processing pipeline.

Late-Arriving Data

Let's say that we have an incoming stream of time-series data that we want to process. We have a function called ProcessData(startTime:datetime, endTime:datetime) that periodically gets executed, with its results written to a table called OutputTable via .set-or-append commands. The function processes data between the two timestamps in its parameters. Since we don't want to end up with duplicate rows, we can't rerun it with the same time window. We can, however, catch the late-arriving data for that time window by implementing ProcessData in such a way that it reprocesses all the data from the previous day and then does a left-anti join against the OutputTable table to return only the results that haven't been recorded yet. Anything new gets written to OutputTable by the .set-or-append command (sketched at the end of this post) and the duplicates get thrown away.

```kusto
.create-or-alter function with (folder = "demo", skipvalidation = "true")
ProcessData(startTime:datetime, endTime:datetime)
{
    let lookback = 1d;
    let allData = SourceData
        | where Timestamp >= startTime - lookback and Timestamp < endTime;
    // Keep only the reprocessed rows that are not already in OutputTable.
    allData
    | join kind=leftanti (OutputTable) on DeviceId, Timestamp
}
```

Note the join direction: the freshly processed rows are on the left and the already-written rows are on the right, so the function returns only what is new.

[Update 2019-02-21] The Kusto docs have a good document on dealing with late-arriving data.

Changelog

Left-anti joins can also be used to create a changelog. Let's say there is a process that dumps 500,000 rows of data into a table. Those rows contain information about a set of devices, and the table gets dropped and replaced every day. We can make a CreateChangelog() function that gets its results written to the Changelog table via .set-or-append commands. We can do a left-anti join with the data we already have in Output and only write the rows that have changed. So the CreateChangelog function body would look something like this:

```kusto
DeviceData
| where PreciseTimeStamp >= startTime and PreciseTimeStamp < endTime
| project DeviceId, DimensionA
// Join on both columns so that a changed DimensionA counts as a new row.
| join kind=leftanti (
      Output
      | project DeviceId, DimensionA
  ) on DeviceId, DimensionA
| project DeviceId, DimensionA, ProcessedTime=now()
```

Now the Changelog table has a record of every time that a device was added or modified.
Finding Data Gaps Using Window Functions

My team focuses on processing streaming data. When there are gaps in streams for even a minute, it can cause inaccuracies in our output. Over the years, we've written a variety of queries to find gaps in datasets, but I'll describe one of our favorites below. If you deal with time-series data too, be sure to check out the official docs for a lot more information about time-series analysis.

For a quick visual inspection, I can use the make-series operator and a timechart:

```kusto
T
| make-series count() on EventTime in range(ago(7d), now(), 1m)
| render timechart
```

With the chart, getting the exact start and end time of a gap means carefully moving my mouse around on the screen, and that's prone to errors. The query below shows how I can get an ordered list of all the minutes that have data and then find any gaps bigger than five minutes.

```kusto
let newSessionLimit = 5m;
let dataFrequency = 1m;
T
| make-series count() on EventTime in range(ago(7d), now(), dataFrequency)
| mv-expand EventTime to typeof(datetime), count_ to typeof(long)
| where count_ != 0
| order by EventTime asc
| extend prevTime = prev(EventTime)
| where EventTime - prevTime > newSessionLimit
| project GapStart = prevTime + dataFrequency, GapEnd = EventTime
```

| GapStart | GapEnd |
| --- | --- |
| 2019-02-06 01:55:00.0000000 | 2019-02-06 10:16:00.0000000 |
| 2019-02-06 16:55:00.0000000 | 2019-02-06 18:01:00.0000000 |
| 2019-02-06 18:55:00.0000000 | 2019-02-07 00:16:00.0000000 |
| 2019-02-07 00:15:00.0000000 | 2019-02-07 03:01:00.0000000 |
| 2019-02-07 04:55:00.0000000 | 2019-02-08 14:01:00.0000000 |

The query makes use of one of the window functions: prev(). The window functions operate on serialized (ordered) data to help you do operations involving nearby rows. The prev() function references a field in the previous row. This lets us compare the EventTime in our current row with the one right before it and determine whether there was a gap big enough to interest us.

The window functions also give you the ability to add row numbers and do cumulative sums (see the sketch below), but be warned that serializing your data can incur a high performance penalty. Employ them with caution; when used wisely, they can open up a whole world of analysis that isn't otherwise available.
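As a point of reference for those other window functions, a minimal sketch might look like the following. The table T is reused from above; the Value column is an assumption added for illustration:

```kusto
T
| order by EventTime asc            // serializes the data, enabling window functions
| extend rowNum = row_number(),     // 1-based position within the serialized order
         runningTotal = row_cumsum(Value)
```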
Calculating Data Latency

When using Azure Data Explorer to process near-real-time data, it's often important to understand how quickly or slowly the data arrives in the source table. For this post, we'll assume that our source data has an EventTime field which denotes when the event actually happened on the source entity.

The quickest way to determine latency is to look for the latest EventTime and compare it to the current time. If you do this repeatedly, you'll get a rough idea of how often the table is getting updated and how fresh the data is.

```kusto
MyEventData
| summarize max(EventTime)
```

We can do a lot better than that, though. In the background, Kusto keeps track of the time that every row was ready to be queried. That information is available through the ingestion_time() scalar function. Comparing the ingestion time to the EventTime will show the lag for every row:

```kusto
MyEventData
| project lag = ingestion_time() - EventTime
```

At this point I can run some basic aggregations like min, avg, and max, but let's do more and build a cumulative distribution function for the latency. This will tell me how much of the data arrives within X minutes of the event time. I'll start by creating a function which calculates the cumulative distribution for a table of two values. This function uses the invoke operator, which receives the source of the invoke as a tabular parameter argument.

```kusto
.create-or-alter function CumulativePercentage(T:(x:real, y:real))
{
    let total = toreal(toscalar(T | summarize sum(y)));
    T
    | order by x asc
    | summarize x = make_list(x), y = make_list(y / total * 100)
    // series_iir with these coefficients computes a running (cumulative) sum.
    | project x = x, y = series_iir(y, dynamic([1]), dynamic([1, -1]))
    | mv-expand x to typeof(real), y to typeof(real)
}
```

Now we need to get our ingestion data into the format that the CumulativePercentage function requires, invoke that function, and render a linechart with the results.

```kusto
MyEventData
| project lag = round((ingestion_time() - EventTime) / 1m, 1)
| summarize count() by lag
| project x = toreal(lag), y = toreal(count_)
| invoke CumulativePercentage()
| render linechart
```

Now I can see that if I wait 2.6 minutes, about 48% of the data will have arrived in Kusto. That information is handy if I'm doing manual debugging on logs, setting up a scheduled job to process the data, or monitoring the latency of various data sources (see the sketch below for the monitoring case).

[Update 3/12/2019] Replaced mvexpand and makelist with the newer/preferred versions: mv-expand and make_list.
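For the monitoring case mentioned above, a small variation could chart how the lag trends over time. This is a minimal sketch, not from the original post; the hourly bin and the 90th percentile are assumptions you would tune for your own data:

```kusto
// Chart the 90th-percentile ingestion lag (in minutes) per hour.
MyEventData
| extend lagMinutes = (ingestion_time() - EventTime) / 1m
| summarize percentile(lagMinutes, 90) by bin(ingestion_time(), 1h)
| render timechart
```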
"Query execution has resulted in error (0x80DA0007)"

This message is thrown without any additional information:

```
Query execution has resulted in error (0x80DA0007): . clientRequestId: KustoWebV2;6d9104b6-4a79-44ff-9370-2797851001c9
```

I tried to run .show queries to check the ClientActivityId, and it just has the same Id as above without any further information. What does that error mean? I followed the query best practices as described on the Microsoft docs site.

Thanks,
Sundar
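For readers following along, the kind of lookup described above would go something like this. The filter value is the clientRequestId from the error message; the projected columns are an assumption and may vary by cluster version:

```kusto
.show queries
| where ClientActivityId == "KustoWebV2;6d9104b6-4a79-44ff-9370-2797851001c9"
| project State, FailureReason, Text
```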