Introduction
My name is Win Yan, a management associate with NCS Pte Ltd, a long-term partner of Microsoft. NCS is a leading technology services firm, operating across Asia Pacific, providing services and solutions in consulting, digital, technology, cybersecurity and more.
In this blog post, I would like to share the work we have done to implement an end-to-end integration of our machine learning model with Microsoft Sentinel using its Bring Your Own Machine Learning (BYO-ML) platform. Specifically, our team from the NUS-NCS Joint Laboratory for Cyber Security has developed an algorithm aimed at mitigating the challenge of alert fatigue. We have successfully incorporated this solution into Microsoft Sentinel, facilitating the delivery of our solution to our clients while harnessing the power and flexibility of cloud-based capabilities.
With the increase in cybersecurity alerts and threats, Security Operations Centre (SOC) teams in many organisations are experiencing ‘alert fatigue’. Analysts are faced with an overwhelming number of alerts to contend with, leading to potential desensitisation, slower response times, and an increased risk of overlooked alerts. To mitigate this problem, our team has developed a graph-based machine learning algorithm. This solution prioritises alerts and detects cyber-attacks, such as lateral movement, by forming network communication subgraphs. This significantly reduces the number of alerts that a SOC team needs to analyse, effectively alleviating the problem of alert fatigue.
While this algorithm can be used in a broad variety of use cases to prioritise security alerts, we have designed this algorithm for SOC tasks focusing on a use case driven by the convergence of Information Technology (IT) and Operational Technology (OT) networks. Our goal is to enhance visibility of the security posture of an organisation by analysing both IT and OT security alerts.
With the objective of bringing our custom-designed ML model to our clients, we explored the possibilities offered by integrated Azure services. With the assistance of the Microsoft Sentinel notebook team, we were able to tap into various platforms to make this possible. The integration process is described below.
Video Demo
Integration Process
Our end-to-end integration process is broken down into three main phases: collecting text logs into the Log Analytics workspace (LAW), loading these alerts from storage into Azure Machine Learning (AML), and finally publishing the results back to Microsoft Sentinel.
Our data pipeline and architecture are laid out as follows:
The entire process is run on the AML platform and can be launched directly from Sentinel notebooks.
1. Collecting text logs with Azure Monitor Agent
As our algorithm takes fused IT-OT alerts as input, these alerts are first collected as text logs with the Azure Monitor Agent (AMA). We achieve this by creating a custom table, a data collection rule, and a data collection endpoint. Once the custom logs are received in LAW, they are exported from LAW to Azure Data Lake Storage (ADLS) via data export.
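Before relying on the data export, it can be useful to confirm that the custom text logs are actually arriving in LAW. The snippet below is a minimal sketch using the azure-monitor-query and azure-identity packages; the table name CustomTextLogs_CL and the one-hour lookback are placeholders rather than the names used in our deployment.
from datetime import timedelta
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

# Query the Log Analytics workspace to confirm the custom logs are flowing in
client = LogsQueryClient(DefaultAzureCredential())
response = client.query_workspace(
    workspace_id="<workspace-guid>",
    query="CustomTextLogs_CL | take 10",  # placeholder custom table name
    timespan=timedelta(hours=1),
)
for table in response.tables:
    for row in table.rows:
        print(row)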
2. Loading data from ADLS to AML
To integrate our ML model with the Microsoft Sentinel platform, we utilised the BYO-ML package. Due to limitations in loading our ML model in Synapse Analytics, we rely on both Synapse Spark compute (for data wrangling and results publishing) and an AML compute instance (for model loading). All of these operations were performed from an AML notebook.
Our pipeline construction involved these steps:
a. Data Wrangling with Apache Spark in Azure Machine Learning
Attach your Synapse Spark compute for data wrangling with Apache Spark in the AML notebook; this gives you access to a Spark cluster with the required BYO-ML packages uploaded.
OR Configure Azure ML and Azure Synapse Analytics [deprecated]
If you created an Azure Synapse workspace before October 2022, you can continue to configure the Synapse workspace Spark pool as a compute target for Azure ML notebooks. Creating a linked service between AML and Synapse allows us to run the notebook entirely from the AML platform. To hop in and out of the Synapse session, the ‘%%synapse’ magic command is required.
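For illustration, a minimal sketch of this deprecated workflow is shown below, assuming the Synapse Spark pool has already been attached as a compute target with the alias synapse-spark (a placeholder name); the exact magic options may vary across versions of the azureml-synapse package.
# Start a Synapse Spark session against the attached compute target (run in its own cell)
%synapse start -c synapse-spark

# Cells prefixed with %%synapse run on the Synapse Spark pool (separate cell)
%%synapse
df = spark.read.json("abfss://<container>@<account>.dfs.core.windows.net/<path>")
df.show(5)

# Stop the session; subsequent cells run back on the AML compute instance (separate cell)
%synapse stop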
b. Loading specified dataset with Spark pool in AML
As the data exported to ADLS is stored in 5-minute buckets, we use the azure_sentinel_utilities package to load the desired dataset between a given start time and end time, save it back to ADLS as a single file, and finally download it into the AML workspace for analysis.
I. Loading selected data from ADLS on Synapse compute
The following code is adapted from the Azure Sentinel Notebook: Masquerading Process Name Anomaly Algorithm.
# Imports required by this cell (mssparkutils is available in the Synapse Spark session,
# and storage_blob_manager is provided by the BYO-ML azure_sentinel_utilities package)
import re
import datetime as dt
import pyspark.sql.types as T

# Log Analytics workspace (Sentinel) to write the results to
workspaceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsId') # wks_guid
workspaceSharedKey = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsSharedKey')
workspaceResourceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsResourceId') # eg: /subscriptions/<sub_guid>/resourcegroups/<rg_name>/providers/microsoft.operationalinsights/workspaces/<workspace_name>

# Extract storage account and key from the connection string
connectionString = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'saConnectionString')
print("Connection String to your storage account is : ", connectionString)
keyPattern = r'DefaultEndpointsProtocol=(\w+);AccountName=(\w+);AccountKey=([^;]+);'
match = re.match(keyPattern, connectionString)
storageAccount = match.group(2)
storageKey = match.group(3)
print("Storage Account is : ", storageAccount)
print("Storage Key is : ", storageKey)

containerName = "CONTAINER_NAME"
basePath = "WorkspaceResourceId={workspaceResourceId}".format(workspaceResourceId=workspaceResourceId)
print("BasePath is : ", basePath)

# Time window for the algorithm run (here, the last 24 hours)
startTime = dt.datetime.now() - dt.timedelta(days=1)
endTime = dt.datetime.now()
startTimeStr = startTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("Start Time of Algo run is : ", startTime)
endTimeStr = endTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("End Time of Algo run is : ", endTime)

# Schema of the exported custom log records
def logs_schema():
    return T.StructType([
        T.StructField(name = "TimeGenerated", dataType = T.StringType(), nullable = True),
        T.StructField(name = "RawData", dataType = T.StringType(), nullable = True),
        T.StructField(name = "Type", dataType = T.StringType(), nullable = True),
        T.StructField(name = "TenantId", dataType = T.StringType(), nullable = True),
    ])

# Load the exported blobs for the chosen time window into a Spark DataFrame
blobManager = storage_blob_manager(connectionString)
raw_df = blobManager.get_raw_df(startTime, endTime, containerName, basePath, logs_schema(), blobManager.get_blob_service_client(connectionString))
raw_df.show()
print("There are ", raw_df.count(), " logs to process.")
II. Upload compiled dataset to storage
Documentation on the relevant library can be found here.
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__

storage_connection_string = "CONNECTION_STRING"
container_name = "CONTAINER_NAME"
filename = "FILE_NAME"

# Serialise the Spark DataFrame as newline-delimited JSON before uploading,
# since upload_blob() expects bytes or text rather than a DataFrame
json_lines = "\n".join(raw_df.toJSON().collect())

blob_client = BlobClient.from_connection_string(storage_connection_string, container_name, filename + ".json")
blob_client.upload_blob(json_lines, overwrite = True)
III. Load dataset from ADLS outside of the Spark pool
The compiled data is loaded from ADLS and run on AML compute, outside of the Synapse session. Similar work has been documented here, and this is also demonstrated in the Azure Sentinel Notebook: Guided Hunting - Detect Low and Slow Password Sprays Using Machine Learning.
# Imports required by this cell
import json
from azure.storage.filedatalake import DataLakeServiceClient

def initialize_storage_account(storage_account_name, storage_account_key):
    try:
        global service_client
        service_client = DataLakeServiceClient(
            account_url="{}://{}.dfs.core.windows.net".format(
                "https", storage_account_name
            ),
            credential=storage_account_key,
        )
    except Exception as e:
        print(e)

def list_directory_contents(container_name, input_path, file_type):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        paths = file_system_client.get_paths(path=input_path)
        pathlist = []
        for path in paths:
            # Keep only the files with the requested extension
            if path.name.endswith(file_type):
                pathlist.append(path.name)
        return pathlist
    except Exception as e:
        print(e)

def download_file_from_directory(container_name, input_path, input_file):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        directory_client = file_system_client.get_directory_client(input_path)
        local_file = open("output.json", "wb")
        file_client = directory_client.get_file_client(input_file)
        download = file_client.download_file()
        downloaded_bytes = download.readall()
        local_file.write(downloaded_bytes)
        local_file.close()
    except Exception as e:
        print(e)

def json_normalize(input_file, output_file):
    # Convert newline-delimited JSON records into a single JSON array
    nwbeaconList = []
    with open(input_file) as f:
        for jsonObj in f:
            nwbeaconDict = json.loads(jsonObj)
            nwbeaconList.append(nwbeaconDict)
    with open(output_file, "w") as write_file:
        json.dump(nwbeaconList, write_file)

# Primary storage info
account_name = '<storage account name>' # fill in your primary account name
container_name = '<container name>' # fill in your container name
subscription_id = '<subscription id>' # fill in your subscription id
resource_group = '<resource group>' # fill in your resource group for ADLS
workspace_name = '<Microsoft Sentinel/Log Analytics workspace name>' # fill in your workspace name

input_path = f"WorkspaceResourceId=/subscriptions/{subscription_id}/resourcegroups/{resource_group}/providers/microsoft.operationalinsights/workspaces/{workspace_name}"
adls_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{input_path}/{workspace_name}"
dir_name = "<dir-name>/" # Replace with the directory name previously specified to store results from Spark
account_key = "<storage-account-key>" # Replace with your storage account key
new_path = input_path + "/" + dir_name

initialize_storage_account(account_name, account_key)
pathlist = list_directory_contents(container_name, new_path, "json")
input_file = pathlist[0].split("/")[-1]
download_file_from_directory(container_name, new_path, input_file)
json_normalize("output.json", "out_normalized.json")
IV. Graph-based model analysis
With the dataset successfully loaded, we can analyse this large volume of alerts with our model, which prioritises them and outputs tabular subgraphs that are later visualised.
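As a simplified, hypothetical illustration of the general idea (not our actual model), the normalised alerts can be assembled into a communication graph, with each connected component treated as a candidate subgraph for review. The column names src_ip and dst_ip below are placeholders for whatever endpoint fields the fused IT-OT alerts carry.
import json
import pandas as pd
import networkx as nx

# Load the normalised alerts produced in the previous step
with open("out_normalized.json") as f:
    alerts = pd.DataFrame(json.load(f))

# Build a network communication graph from (placeholder) source/destination fields
G = nx.from_pandas_edgelist(alerts, source="src_ip", target="dst_ip")

# Each connected component becomes a candidate subgraph to prioritise and visualise
subgraphs = [G.subgraph(c).copy() for c in nx.connected_components(G)]
print(f"{len(subgraphs)} candidate subgraphs derived from {len(alerts)} alerts")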
3. Publishing results back to MS Sentinel
The output from our model is sent back to MS Sentinel to allow for a closed-loop, end-to-end integration. This allows analysts to leverage both the MS Sentinel platform and our graph analytics model when conducting their SOC alert analysis. These results are published back to MS Sentinel using MSTICPy, where tabular output data is uploaded from our notebook as a custom table using the native MSTICPy data uploader. The documentation for installing MSTICPy on managed Spark compute in AML notebooks can be found here.
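A minimal sketch of this route is shown below, based on MSTICPy's Log Analytics data uploader; class and parameter names may differ between MSTICPy versions, and the table name prioritised_alerts is a placeholder (it will appear in Sentinel as prioritised_alerts_CL).
import pandas as pd
from msticpy.data.uploaders.loganalytics_uploader import LAUploader

# Placeholder results table; in practice this is the model's tabular subgraph output
results_df = pd.DataFrame({"subgraph_id": [1], "priority": ["high"]})

# Upload the results to the workspace as a custom table
la_uploader = LAUploader(workspace="<workspace-guid>", workspace_secret="<workspace-shared-key>", debug=True)
la_uploader.upload_df(data=results_df, table_name="prioritised_alerts")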
Alternatively, results can be published using a custom function as shown (adapted from Azure Sentinel Notebook: Masquerading Process Name Anomaly Algorithm).
# This cell runs on the Synapse Spark compute; log_analytics_client comes from the
# BYO-ML azure_sentinel_utilities package, and workspaceId / workspaceSharedKey are
# the secrets retrieved earlier from the key vault
import pyspark.sql.functions as F

def send_results_to_log_analytics(df_to_la):
    # The log type is the name of the event that is being submitted. This will show up under "Custom Logs" as log_type + '_CL'
    log_type = 'LOG_NAME'

    # Concatenate columns to form one json record per row
    # (repeat the COLUMN_NAME pattern for each column, adding a comma after all but the last)
    json_records = df_to_la.withColumn('json_field', F.concat(F.lit('{'),
                                       F.lit(' \"COLUMN_NAME\": \"'), F.col('COLUMN_NAME'), F.lit('\"'),
                                       F.lit('}')
                                       )
                                       )

    # Combine the json record column to create the array
    json_body = json_records.agg(F.concat_ws(", ", F.collect_list('json_field')).alias('body'))

    if len(json_body.first()) > 0:
        json_payload = json_body.first()['body']
        json_payload = '[' + json_payload + ']'
        payload = json_payload.encode('utf-8')
        return log_analytics_client(workspaceId, workspaceSharedKey).post_data(payload, log_type)
    else:
        return "No json data to send to LA"

# df is the Spark DataFrame of prioritised results produced by the model
print("Sending results to LogAnalytics")
print("Sending ", df.count(), " results to Log Analytics")
send_results_to_log_analytics(df)
print("Done")
Azure Workbook Graph Visualisation
Finally, the network communication graphs are visualised in an Azure Workbook by querying the logs ingested back into Sentinel.
The overall network graph above is generated from sample logs, where blue and orange nodes represent the IT and OT domains respectively.
Even after taking the further step of visualising the logs as a network graph, it can be extremely difficult to highlight or draw any inference from it directly. Furthermore, this network graph is only a simplified representation of the converged IT and OT domains; in reality, an IT/OT converged network would be far larger and more complex.
By running these logs through our custom graph analytics model, the input logs are prioritised and visualised as subgraphs. These subgraphs highlight potential cyberattacks, allowing analysts to work through numerous logs quickly and addressing the problem of alert fatigue.
Three examples of these subgraphs are displayed below.
Summary
In this post, we have given an overview of how NCS integrated our customised ML model with MS Sentinel to better serve our clients’ business needs. This was made possible by Sentinel’s BYO-ML package and other Azure platforms, enabling us to implement our solution to address the problem of alert fatigue among SOC teams.
We hope this post has provided useful insights, and we look forward to leveraging new capabilities within Azure to better serve our clients.
Special thanks to Dr Nay Oo (@drnay2430) and Dr Lim Hoon Wei (@hoonwei) from NCS for their invaluable support and guidance. I would also like to express my appreciation to @Chi_Nguyen from Microsoft Sentinel for initiating, supporting, and reviewing this blog. My thanks extend to @Ashwin_Patil , also from Microsoft Sentinel for his insightful feedback, and @JulianGonzalez for his significant contribution in bringing this blog post to fruition.