Microsoft Defender ATP advanced hunting is a query-based threat-hunting tool that lets you explore up to 30 days of raw data. You can proactively inspect events in your network to locate interesting indicators and entities. The flexible access to data facilitates unconstrained hunting for both known and potential threats. Advanced hunting is based on the Kusto query language. You can use Kusto syntax and operators to construct queries that locate information in the schema specifically structured for advanced hunting.
In some scenarios, customers want to centralize their Microsoft Defender ATP logs with their other logs in Azure Data Explorer, keep the logs accessible for a longer period, or build custom solutions and visualizations around this data. In this article, I am going to provide step-by-step instructions on how to stream Microsoft Defender ATP advanced hunting events to Azure Data Explorer using Event Hub.
Before I begin, a few words about the platform. Azure Data Explorer (ADX) is a lightning-fast service optimized for data exploration. It gives users near real-time visibility into very large raw datasets to analyze performance, identify trends and anomalies, and diagnose problems. In addition to these capabilities, customers can choose their own data retention period.
Let’s get started with this integration.
First, you are going to set up streaming of Microsoft Defender ATP hunting events to either a Storage Account (Blob) or to Event Hub.
For this article, I am going to demonstrate how to integrate with Event Hub. Integration with a Storage Account is very similar and uses Event Grid integration.
Let’s focus on the Event Hub message schema to understand the format in which you will receive the data and how to use it to design the table schema in the next step. The schema of the events in Event Hub looks like this –
{
  "records": [
    {
      "time": "<The time WDATP received the event>",
      "tenantId": "<The Id of the tenant that the event belongs to>",
      "category": "<The Advanced Hunting table name with 'AdvancedHunting-' prefix>",
      "properties": { <WDATP Advanced Hunting event as Json> }
    },
    ...
  ]
}
Now that hunting events are being streamed to Event Hub, you are going to set up an Azure Data Explorer data connection to build the pipeline that ingests these messages into a table.
Before you create the data connection, let’s review the schema you are going to create to set up the ingestion. Since each message contains a JSON array, and each array can hold data for different events, you will first land the data in a Staging table (you can give this table a different name) and then fork it into individual tables during ingestion by using an update policy. I would suggest creating the staging table with the following schema and ingestion mapping –
// The Raw column will receive the full JSON array in each row
.create table Staging (Raw: dynamic)
// This mapping maps the root element of the JSON document and ingests it into the Raw column
.create table Staging ingestion json mapping 'StagingMapping' '[{"column":"Raw","path":"$","datatype":"dynamic","transform":null}]'
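If you want to double-check that the mapping was created as expected, you can list the JSON ingestion mappings on the table. This is an optional verification step, not part of the original setup –
// Lists the JSON ingestion mappings defined on the Staging table
.show table Staging ingestion json mappings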
Once the table is created, you can follow the documentation here to set up the data connection in your Azure Data Explorer cluster.
Once the data connection is created, you should start receiving data in the Staging table. You can run the following query to review all the different events you are receiving data for (ideally, this list contains every table you selected while setting up the streaming configuration from Microsoft Defender ATP to Event Hub) –
Staging
| mv-expand Raw.records
| project Properties=Raw_records.properties, Category=Raw_records.category
| summarize by tostring(Category)
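If you also want to eyeball a full raw message before building anything on top of it, a quick sample query like the one below (just an illustrative check) is enough –
// Returns a single raw row so you can inspect the JSON envelope
Staging
| take 1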
Now that you have data available in the Staging table, you need to fork this data into its own individual tables. This exercise is divided into three parts: creating a function that filters the Staging data for a specific event, creating the target table and backfilling it with the data already ingested, and attaching an update policy so that future ingestions are forked automatically.
I am going to provide instructions for DeviceAlertEvents, using the Microsoft Defender ATP table “AdvancedHunting-DeviceAlertEvents” as the example; you can follow a similar approach for the other events.
First, let’s filter the records from the Staging table that belong to this specific event –
Staging
| mv-expand Raw.records
| project Properties=Raw_records.properties, Category=Raw_records.category
| where Category == "AdvancedHunting-DeviceAlertEvents"
| project AlertId=tostring(Properties.AlertId),
Timestamp=todatetime(Properties.Timestamp),
DeviceId=tostring(Properties.DeviceId),
DeviceName=tostring(Properties.DeviceName),
Severity=tostring(Properties.Severity),
Category=tostring(Properties.Category),
Title=tostring(Properties.Title),
FileName=tostring(Properties.FileName),
SHA1=tostring(Properties.SHA1),
RemoteUrl=tostring(Properties.RemoteUrl),
RemoteIp=tostring(Properties.RemoteIp),
ReportId=tolong(Properties.ReportId),
Table=tostring(Properties.Table)
To find the correct mapping for each column, I used the schema reference documented here; you will find a similar reference for the other tables as well.
Also, learn more about the mv-expand operator in KQL here.
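If mv-expand is new to you, here is a minimal, self-contained illustration (the column name arr is just a throwaway example) showing how a single row containing an array is expanded into one row per element –
// 'arr' holds a dynamic array; mv-expand turns its 3 elements into 3 rows
print arr = dynamic([1, 2, 3])
| mv-expand arr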
Now that you have your query ready to filter records for DeviceAlertEvents, you are going to create a function in your database.
// This function will be created in a folder called UpdatePolicies
.create function with (docstring = "Filters data for Device Alert Events for ingestion", folder = "UpdatePolicies") FilterDeviceAlertEvents()
{
Staging
| mv-expand Raw.records
| project Properties=Raw_records.properties, Category=Raw_records.category
| where Category == "AdvancedHunting-DeviceAlertEvents"
| project AlertId=tostring(Properties.AlertId),
Timestamp=todatetime(Properties.Timestamp),
DeviceId=tostring(Properties.DeviceId),
DeviceName=tostring(Properties.DeviceName),
Severity=tostring(Properties.Severity),
Category=tostring(Properties.Category),
Title=tostring(Properties.Title),
FileName=tostring(Properties.FileName),
SHA1=tostring(Properties.SHA1),
RemoteUrl=tostring(Properties.RemoteUrl),
RemoteIp=tostring(Properties.RemoteIp),
ReportId=tolong(Properties.ReportId),
Table=tostring(Properties.Table)
}
You now have a function that produces the columns required for the DeviceAlertEvents table. The next command creates the table and backfills it with the data that is already in Staging –
// This control command is going to perform two tasks
// 1. Create a table with the same schema as the output of FilterDeviceAlertEvents
// 2. Populate the data that is already available in Staging table (doing data back fill)
.set-or-append DeviceAlertEvents <| FilterDeviceAlertEvents()
Learn more about .set-or-append here.
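To sanity-check the backfill, you can run a quick aggregation against the new table (an optional verification, assuming the backfill command has completed) –
// Confirms rows were ingested and shows the most recent event time
DeviceAlertEvents
| summarize EventCount = count(), LatestEvent = max(Timestamp)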
This is the final step to fork the data into the DeviceAlertEvents table during ingestion. With the update policy in place, any time a new ingestion runs on the Staging table, the function FilterDeviceAlertEvents is executed, and if it returns results, that result set is ingested into the DeviceAlertEvents table.
// This command is setting the update policy and using Staging as a source for ingestion and FilterDeviceAlertEvents as the query to run for each ingestion
.alter table DeviceAlertEvents policy update
@'[{"IsEnabled": true, "Source": "Staging", "Query": "FilterDeviceAlertEvents()", "IsTransactional": true, "PropagateIngestionProperties": true}]'
For the most part, you are done with streaming Microsoft Defender ATP hunting events into Azure Data Explorer. Before I finish this article, I want to provide some more information on how to manage the Staging table and the ingestion latency.
Once you have all the tables and functions created for all hunting events, you practically don’t want to retain any data in the Staging table outside the ingestion cycle. This also saves cost, since nothing is stored in the Staging table long term. To do this, set the retention policy on the Staging table to zero –
// Data will not be retained on this table after ingestion
.alter-merge table Staging policy retention softdelete = 0s
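If you want to confirm the change, you can inspect the retention policy on the table (an optional check) –
// Displays the retention policy now in effect on the Staging table
.show table Staging policy retention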
Ingestion latency (batching) is the time ingestion takes before the data is available for you to query. By default, if no policy is defined, Azure Data Explorer batches ingress data using a maximum delay of 5 minutes, 1000 items, or a total size of 1 GB, whichever comes first.
You can set the batching policy at the database level or set a different policy for each table based on your business scenario. If the policy is not set for a certain entity, the policy of the next higher level in the hierarchy applies; if none is set, the default values are used.
To do this, you can update the Ingestion Batching policy using the following command –
// Set IngestionBatching policy on table `Staging` (in database context) to batch ingress data by 30 seconds, 500 files, or 1GB (whatever comes first)
.alter table Staging policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'
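As mentioned above, the same policy can also be set at the database level so that all tables inherit it. A sketch of that variant is below, where YourDatabase is a placeholder you would replace with your own database name –
// Sets the same batching policy at the database level; YourDatabase is a placeholder
.alter database YourDatabase policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'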
Please follow the ingestion best practices to optimize your throughput.
With this, you have data streaming from Microsoft Defender ATP hunting events into Azure Data Explorer, and you can now specify a different retention period for each table to meet your business requirements.
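For example, if you wanted to keep alert data for a year, you could set a per-table retention policy like the one below (365d is just an illustrative value; pick whatever your requirements dictate) –
// Keeps data in DeviceAlertEvents for 365 days before it becomes eligible for deletion
.alter-merge table DeviceAlertEvents policy retention softdelete = 365d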
To learn more about Azure Data Explorer and its query language, explore the Azure Data Explorer documentation.