Using external data sources to enrich network logs with Azure Storage and KQL
Published Jun 15 2020 06:26 AM

Network data sources are among the highest-volume data sources, so threat hunting on them often becomes challenging unless we enrich the datasets. A common first step is matching against threat intelligence feeds. Beyond that, public IP addresses are difficult to investigate at scale unless we add context around them or correlate them with other data sources.

A common use-case request we have received from customers is tagging Azure data center IP addresses in network telemetry so they can filter that traffic out and focus on the remaining telemetry to find suspicious activity.

In this blog, we will show a custom solution that creates reference lookup tables from such data sources in blob storage and calls them within KQL queries using native operators. Ofer has written a blog post on Approximate, partial and combined lookups which you can reference if you are new to lookup tables. Thanks also to Dennis Pike from the Cybersecurity Solutions Group, who contributed the single data source KQL query.

 

Note:

Microsoft does not recommend filtering on these ranges or using them as an allowed list for detections. Any such filtering needs to be reviewed and implemented with care so that it does not introduce blind spots into your security monitoring. However, these datasets can be used to enrich existing data sources, provide more context, and segregate or categorize large portions of network traffic for analysts and threat hunters who are hunting through or investigating large data sets.

 

Identification of reliable data sources:

Microsoft and other vendors publish various network datasets which are updated regularly (sometimes weekly). These datasets can be used to enrich existing network data sources. They can be downloaded via public links and, if the download links are static, called directly via the externaldata operator, as sketched below. The same approach can be extended to threat intelligence feeds such as COVID-19 indicators.
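For example, a feed published at a truly static URL can be queried directly with externaldata, without any supporting infrastructure. Here is a minimal sketch; the URL and schema are placeholders rather than a real feed:

// Reading a hypothetical static, publicly downloadable CSV feed directly.
externaldata(Indicator:string, IndicatorType:string)
[@"https://example.com/feeds/indicators.csv"]
with (format="csv", ignoreFirstRecord=true)
| take 10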

 

Download and ingest datasets:

The datasets may not always be available at static links, depending on how they are published. Also, for certain feeds the download URL has a date appended, which makes it dynamic and impossible to call from scheduled queries unless it is normalized on external storage.

 

For example, consider the Azure Data Center IP Ranges – Cloud dataset.

Download page: https://www.microsoft.com/en-us/download/details.aspx?id=56519, which has a download button linking to https://www.microsoft.com/en-us/download/confirmation.aspx?id=56519. When this link is visited from a browser, it automatically redirects behind the scenes to the relevant JSON URL and auto-downloads the file.

However, this direct download link is not static; its path and date may change depending on when the dataset is updated and made available for download.

 https://download.microsoft.com/download/7/1/D/71D86715-5596-4529-9B13-DA13A5DE5B63/ServiceTags_Publi...

 

To find the actual download link programmatically, you can request the HTML response of the download page and find href links ending with json. Below is an example PowerShell console session from Windows Terminal that parses the HTML response of the original download link and retrieves the direct download link to the JSON.

 

[Screenshot terminal.png: PowerShell console retrieving the direct JSON download link]
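A minimal sketch of that approach (hypothetical, not the exact script from the screenshot; the href filter is an assumption based on the current page layout):

# Request the HTML response of the confirmation page and extract the
# direct .json download link from its href attributes.
$confirmationUrl = "https://www.microsoft.com/en-us/download/confirmation.aspx?id=56519"
$response = Invoke-WebRequest -Uri $confirmationUrl -UseBasicParsing
$jsonUrl = ($response.Links |
    Where-Object { $_.href -like "https://download.microsoft.com/*.json" } |
    Select-Object -First 1).href
$jsonUrl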

 

For working with static URLs, you can also use Azure playbooks, which provide a friendly UI and a step-by-step process to ingest the data into Azure Sentinel. You can check the recent blog post Using Azure Playbooks to import text-based threat indicators to Azure Sentinel for more details. For complex use cases where the download link is not static, we can automate this via a serverless Azure Function written in PowerShell that stores the data directly in Azure blob storage. Once these files are stored in blob storage, you can generate read-only shared access links to use within KQL queries. You could also ingest these files directly into Sentinel as a custom log table; however, the dataset is very small, so we will cover the blob storage method in this article. For example ingestion instructions, check the JSON ingestion tools dotnet_loganalytics_json_import and Azure Log Analytics API Clients.
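For illustration, the core storage step of the function might look like the following PowerShell sketch. This is a simplified stand-in for the actual function code on GitHub; the Az.Storage cmdlets are standard, but the URL placeholder, account, and container names are assumptions matching the SAS URLs used later in this post.

# Download the resolved file and upload it to blob storage under a stable,
# normalized name so scheduled queries can always reference the same URL.
# Assumes Az.Storage is installed and you are signed in via Connect-AzAccount.
$jsonUrl   = "https://download.microsoft.com/<current-path>/ServiceTags_Public.json"  # placeholder: resolved link from the previous step
$localPath = Join-Path $env:TEMP "ServiceTags_Public.json"
Invoke-WebRequest -Uri $jsonUrl -OutFile $localPath

$ctx = New-AzStorageContext -StorageAccountName "uploadtobloblookuptables" -UseConnectedAccount
Set-AzStorageBlobContent -File $localPath `
    -Container "lookuptables" `
    -Blob "ServiceTags_Public.json" `
    -Context $ctx -Force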

 

Architecture Diagram:

The diagram below explains the data workflow for our use case. We feed in multiple URLs containing the datasets we want to use as reference tables; the example below uses four such datasets. These are processed by our Azure Function, which parses the URLs to find direct download links where required and uploads the files to blob storage. Once the files are uploaded to blob storage, you can generate read-only shared access links to access them via native KQL operators such as externaldata. We will also see how to use these operators in queries later in the article.

 

[Diagram function-architecture-01.png: data workflow from source URLs through the Azure Function to blob storage and KQL queries]

 

When to use this architecture:

Below are some scenarios in which you can consider deploying this in your environment.

  • Centralized lookup tables with normalized filenames to call from scheduled KQL queries in Sentinel.
  • When a direct URL to download the file is not available and the link needs normalization before use within queries.
  • When you cannot use the externaldata operator from KQL queries to access public sites directly, but can reach the Azure storage domain (blob.core.windows.net).
  • When you need to store and access current or archived versions reliably, even when the original source is unavailable.

 

Deploy the function:

You can deploy the function in one of two ways: through VS Code or directly via an ARM template.

Detailed instructions for both methods can be found on GitHub:

https://github.com/Azure/Azure-Sentinel/blob/master/Tools/UploadToBlobLookupTables/readme.md

 

Generating Shared Access Links from Azure Blob Storage:

To access files in blob storage, you can generate pre-approved shared access links with read-only permissions.

You can follow the tutorial Get SAS for a blob Container to generate links for each blob file.

Step 1: Get a Shared Access Signature for the respective file in blob storage.

 

[Screenshot Storageexplorer-SAS-01.png: Get Shared Access Signature option in Azure Storage Explorer]

Step 2: Set the desired expiry period and set Permissions to Read.

[Screenshot Storageexplorer-SAS-02.png: setting expiry period and Read permission]

 

Step 3: Copy the link from the last screen.

[Screenshot Storageexplorer-SAS-03.png: copying the generated SAS URL]
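If you prefer to script this step instead, a minimal PowerShell sketch using the Az.Storage module could generate the same read-only SAS URL; the resource group placeholder, account, and container names are assumptions matching the examples in this post.

# Build a storage context from the account key, then issue a read-only SAS
# URL for the blob with a 30-day expiry.
$storageKey = (Get-AzStorageAccountKey -ResourceGroupName "<resource-group>" `
    -Name "uploadtobloblookuptables")[0].Value
$ctx = New-AzStorageContext -StorageAccountName "uploadtobloblookuptables" `
    -StorageAccountKey $storageKey

New-AzStorageBlobSASToken -Container "lookuptables" `
    -Blob "ServiceTags_Public.json" `
    -Permission r `
    -ExpiryTime (Get-Date).AddDays(30) `
    -FullUri `
    -Context $ctx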

 

 

KQL query with externaldata for Azure IP Ranges:

This query returns around 5K subnet ranges.

KQL:

let AzureIPRangesPublicCloud = (externaldata(changeNumber:string, cloud:string, values: dynamic)
[@"https://uploadtobloblookuptables.blob.core.windows.net/lookuptables/ServiceTags_Public.json?sv=2019-02-02&st=2020-06-08T17%3A21%3A23Z&se=2020-06-09T17%3A21%3A23Z&sr=b&sp=r&sig=zTATanBOqaDbi2NAQirIMWRJmees2z0CQexk4XQiTb0%3D"]
with (format="multijson"));
let AzureSubnetRangelist = AzureIPRangesPublicCloud
| mv-expand values
| extend platform = parse_json(parse_json(values).properties).platform, systemservice = parse_json(parse_json(values).properties).systemService, region = parse_json(parse_json(values).properties).region, addressPrefixes = parse_json(parse_json(values).properties).addressPrefixes
| mv-expand addressPrefixes
| project platform, systemservice, region, addressPrefixes;
AzureSubnetRangelist

 

Sample Output:

The above query results in around 5K subnets, with context such as platform, region, and systemservice associated with each. You can selectively pick entries from this lookup table depending on how your Azure infrastructure is set up. For this blog, we are going to use the table as it is.

 

[Screenshot LookupTable.PNG: sample output of the Azure IP ranges lookup table]

 

KQL query for filtering using the lookup for a single data source:

KQL query using a single data source to do subnet matching and filter on target/source IP ranges:

 

Data source:

  • AzureNetworkAnalytics_CL

KQL Query:
let AzureIPRangesPublicCloud = (externaldata(changeNumber:string,cloud:string, values: dynamic) 
[@"https://uploadtobloblookuptables.blob.core.windows.net/lookuptables/ServiceTags_Public.json?sv=2019-02-02&st=2020-06-08T17%3A21%3A23Z&se=2020-06-09T17%3A21%3A23Z&sr=b&sp=r&sig=zTATanBOqaDbi2NAQirIMWRJmees2z0CQexk4XQiTb0%3D"] with (format="multijson"));
let AzureSubnetRangelist = AzureIPRangesPublicCloud
| mv-expand values
| extend platform = parse_json(parse_json(values).properties).platform, systemservice = parse_json(parse_json(values).properties).systemService, region = parse_json(parse_json(values).properties).region, addressPrefixes = parse_json(parse_json(values).properties).addressPrefixes
| mv-expand addressPrefixes
| project platform, systemservice, region, addressPrefixes;
let lookup = toscalar(AzureSubnetRangelist | summarize l=make_list(addressPrefixes));
let starttime = 1d;
let endtime = 1h;
let PrivateIPregex = @'^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\.';
let AllNSGTrafficLogs = AzureNetworkAnalytics_CL
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where SubType_s == "FlowLog"
| distinct DestIP_s;
let AzureSubnetMatchedIPs = materialize(
AllNSGTrafficLogs
| mv-apply l=lookup to typeof(string) on
(
 where ipv4_is_match(DestIP_s, l)
)
| project-away l
);
AzureNetworkAnalytics_CL
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where SubType_s == "FlowLog"
| where isnotempty(DestIP_s)
| extend DestinationIpType = iff(DestIP_s matches regex PrivateIPregex, "private", "public")
| where DestinationIpType == "public"
| where DestIP_s !in ((AzureSubnetMatchedIPs))
| project-reorder TimeGenerated, Type, SrcIP_s, DestIP_s, *

 

Sample Results :

The results shown below are from NSG traffic logs (AzureNetworkAnalytics_CL), with filtering applied on the destination IP field via the Azure IP ranges lookup table.

 

[Screenshot singledatasource.PNG: sample results for the single data source query]

 

KQL query for filtering using the lookup for multiple data sources:

KQL query using multiple network data sources, unioned together, to do subnet matching and allow listing on target IP ranges; the same can be done for source IPs.

DataSources:

  • CommonSecurityLog
  • VMConnection
  • WireData

 

KQL Query:

let AzureIPRangesPublicCloud = (externaldata(changeNumber:string, cloud:string, values: dynamic)
[@"https://uploadtobloblookuptables.blob.core.windows.net/lookuptables/ServiceTags_Public.json?sv=2019-02-02&st=2020-06-08T17%3A21%3A23Z&se=2020-06-09T17%3A21%3A23Z&sr=b&sp=r&sig=zTATanBOqaDbi2NAQirIMWRJmees2z0CQexk4XQiTb0%3D"]
with (format="multijson"));
let AzureSubnetRangelist = AzureIPRangesPublicCloud
| mv-expand values
| extend addressPrefixes = parse_json(parse_json(values).properties).addressPrefixes
| project addressPrefixes; 
let lookup = toscalar(AzureSubnetRangelist| summarize l=make_set(addressPrefixes));
let starttime = 2h;
let endtime = 1h;
let PrivateIPregex = @'^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\.';
let AllDestIPs = materialize(union isfuzzy=true
(
VMConnection
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(DestinationIp) and isnotempty(SourceIp)
| extend DestinationIpType = iff(DestinationIp matches regex PrivateIPregex,"private" ,"public" )
| where DestinationIpType == "public" | extend DeviceVendor = "VMConnection"
| extend DestinationIP = DestinationIp, SourceIP = SourceIp
| distinct DestinationIP
),
(
CommonSecurityLog 
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| extend DestinationIpType = iff(DestinationIP matches regex PrivateIPregex,"private" ,"public" )
| where DestinationIpType == "public"
| distinct DestinationIP
),
(
WireData 
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(RemoteIP) and isnotempty(LocalIP)
| extend DestinationIpType = iff(RemoteIP matches regex PrivateIPregex,"private" ,"public" )
| where DestinationIpType == "public" 
| extend DestinationIP = RemoteIP , SourceIP = LocalIP | extend DeviceVendor = "WireData"
| distinct DestinationIP
)
);
let AzureSubnetMatchedIPs=materialize(
AllDestIPs
| mv-apply l=lookup to typeof(string) on
(
 where ipv4_is_match (DestinationIP, l)
)
| project-away l
);
let TrafficLogsNonAzure = (union isfuzzy=true
(
VMConnection
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(DestinationIp) and isnotempty(SourceIp)
| extend DestinationIP = DestinationIp, SourceIP = SourceIp
| where DestinationIP !in ((AzureSubnetMatchedIPs))
),
(
CommonSecurityLog 
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| where DestinationIP !in ((AzureSubnetMatchedIPs))
),
(
WireData 
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| where isnotempty(RemoteIP) and isnotempty(LocalIP)
| extend DestinationIP = RemoteIP , SourceIP = LocalIP | extend DeviceVendor = "WireData"
| where DestinationIP !in ((AzureSubnetMatchedIPs))
)
);
TrafficLogsNonAzure 
| project-away Type, RemoteIP, LocalIP, DestinationIp, SourceIp
| project-reorder TimeGenerated, DeviceVendor, SourceIP, DestinationIP, *

 

Note: The above query reads multiple high-volume tables, and the lookup matching is CPU and memory intensive. It is recommended to run it over smaller intervals (hourly) or split it per data source to review the results, as sketched below.
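For example, to review one data source at a time, keep only that branch in place of the final union. Here is a minimal sketch for CommonSecurityLog over a one-hour window; it assumes the let statements above, including AzureSubnetMatchedIPs, remain in scope within the same query:

// Evaluate a single data source over a smaller interval.
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| where DestinationIP !in ((AzureSubnetMatchedIPs))
| project-reorder TimeGenerated, DeviceVendor, SourceIP, DestinationIP, *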

 

Sample results:

[Screenshot lookuptable-multidatasource.PNG: sample results for the multiple data source query]

 

 

Happy Hunting!!

 

References:

  • Approximate, partial and combined lookups in Azure Sentinel

https://techcommunity.microsoft.com/t5/azure-sentinel/approximate-partial-and-combined-lookups-in-az...

 

  • Centralized Lookup Tables for enrichment in KQL with Azure Function to upload to blob storage

https://github.com/Azure/Azure-Sentinel/pull/720

 

  • lookup operator

https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/lookupoperator?pivot=azuremonitor

 

  • Externaldata Operator

https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/externaldata-operator?pivot=azuremo...

 

  • materialize function 

https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/materializefunction

5 Comments
Copper Contributor

Thanks for the great information and examples! Do you know if there is a way, and the syntax, to use a variable for the StorageConnectionString rather than a static string ("http://....")?

Microsoft

Thanks @Secuerskydev. Generally you can try to define a variable with a let statement, but that doesn't seem to be supported with externaldata.

 

Could you tell us about your use case for needing a variable with externaldata, and we can check whether there are alternate options to achieve it.

 

Here are some examples of using the let statement, if you want to check:

https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/letstatement

Copper Contributor

@Ashwin_Patil thank you for the quick response. I haven't been able to get a let variable to work for externaldata's StorageConnectionString.

 

We "deployed" external data table to enrich IPaddress by joining external tables (via externaldata) for more static data such as country, isp, domain, proxy type,  domain/proxy usage/reputation, etc.  Our analyst/clients are looking for a similar use case to enrich more dynamic data such as IP and URL reputation data within kql queries for dashboards/workbooks and hunting queries.   Our reputation and intel sources (VirusTotal, etc.) and tools (ThreatQ, etc.) are API based and usually require parameters such as IPs such as http://ip-api.com/csv/24.48.0.1fields=status,country,countryCode,regionName,city,zip,lat,lon,isp,org...

 

We do use Jupyter notebooks, but the team would like the ability to do "inline" queries as well.

 

Thanks! 

Microsoft

Hi @Secuerskydev, I have checked internally with the KQL team and it seems this is not supported yet. The externaldata() operator expects its argument to be a scalar string constant, so we can't parameterize it via regular functions. You can post this as an idea on the Log Analytics feedback forum if you wish for further consideration.
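To illustrate the limitation, the pattern below is the one that does not work today, since externaldata requires the connection string to appear as a constant literal in the query text (the URL here is a placeholder):

// NOT supported: externaldata cannot take its connection string from a let variable.
let url = @"https://example.blob.core.windows.net/container/file.json?sastoken";
externaldata(values: dynamic) [url] with (format="multijson")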

 

I think the possible workaround for this use case would be to store entire offline databases in a storage blob when available and filter or join with them to enrich data via inline KQL queries, or use a logic app or notebooks to do on-demand enrichment by querying the API.

Copper Contributor

Thank you for checking with the KQL team! Unfortunately these API sources are metered third-party sources that change frequently. I will add it as an idea.

@Ashwin_Patil Thanks again! 
