Automating Security Operations Using Windows Defender ATP APIs with Python and Jupyter Notebooks
Introduction
Windows Defender ATP provides a great hunting experience out of the box. Analysts can search over process events, network events, logons, registry activity, and more. In this post, I will show how analysts can go even further by using the WDATP APIs. If any of the following scenarios are interesting to your security operations team, then this post is for you:
- Automate common investigative steps such as gathering additional host and network details, triggering host actions like collecting an investigative package, or running anti-virus scans.
- Query WDATP collected data about machines, users, domains, IPs, and files, enrich it with 3rd party services such as VirusTotal, and visualize the results.
- Perform hunting tasks by searching across the rich data collected by WDATP to find new malicious activity.
Normally when one hears the word “APIs”, one thinks of a developer feature that requires compilers and SDKs. This post will show two features that make these APIs approachable to SOC analysts—Python support and Jupyter Notebooks. Even though this post focuses on Python, you can call the APIs from PowerShell, C#, or any language that can call REST endpoints.
Getting Started
The first step is deciding how you want to access the APIs. WDATP uses standard OAuth 2.0 authentication. To get started, perform the following steps:
- Register an application with Azure Active Directory. This allows you to assign the desired permissions to the app. You can create apps that only read alerts, run advanced queries, collect forensics, or many other combinations of permissions.
- Decide on your access model: without a user (like a background service) or on behalf of a user. Following the steps in those links will provide you with an access token to call the APIs.
- Use the token to call Windows Defender ATP APIs.
API Functionality
A list of the exposed APIs can be found here. In this post, we’ll focus on running some advanced hunting queries, and then calling the API to get further information on machines and users.
| API | Supported actions |
| --- | --- |
| Advanced Hunting | Run advanced hunting queries from the API. |
| Alerts | Run API calls such as get alerts, alert information by ID, alert related actor information, alert related IP information, and alert related machine information. |
| Domain | Run API calls such as get domain related machines, domain statistics, and check if a domain is seen in your organization. |
| File | Run API calls such as get file information, file related alerts, file related machines, and file statistics. |
| IP | Run API calls such as get IP related alerts, IP related machines, IP statistics, and check if an IP is seen in your organization. |
| Machines | Run API calls such as find machine information by IP, get machines, get machines by ID, information about logged on users, and alerts related to a given machine ID. |
| User | Run API calls such as get alert related user information, user information, user related alerts, and user related machines. |
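To give a feel for how these entity APIs map onto REST URLs, here is a minimal sketch of a URL-building helper. The base URL matches the one used in the authentication code later in this post; the specific routes (`alerts`, `machines`, `files/{hash}`) are assumptions based on the documented API surface, so check the API reference before relying on them.

```python
# Sketch: build WDATP REST URLs for a few of the entity APIs listed above.
BASE_URL = "https://api.securitycenter.windows.com/api"

def wdatp_url(entity, entity_id=None, odata=None):
    """Build a WDATP API URL for an entity collection or a single entity."""
    url = "%s/%s" % (BASE_URL, entity)
    if entity_id is not None:
        url += "/%s" % entity_id
    if odata is not None:
        url += "?%s" % odata
    return url

print(wdatp_url("alerts", odata="$top=5"))
print(wdatp_url("files", "275a021bbfb6489e54d471899f7db9d1663fc695"))
```

You would then issue an authenticated GET against the resulting URL, as shown in the alerts example below.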
Authenticate to WDATP
After registering your app, you have everything you need to call the APIs: your WDATP tenant ID, and the app ID and app secret created during registration. Here is the code to authenticate to the WDATP endpoint:
import json
import urllib.request
import urllib.parse

def wdatp_get_AAD_token(tenantId, appId, appSecret):
    url = "https://login.windows.net/%s/oauth2/token" % (tenantId)
    resourceAppIdUri = 'https://api.securitycenter.windows.com'
    body = {
        'resource' : resourceAppIdUri,
        'client_id' : appId,
        'client_secret' : appSecret,
        'grant_type' : 'client_credentials'
    }
    data = urllib.parse.urlencode(body).encode("utf-8")
    req = urllib.request.Request(url, data)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    aadToken = jsonResponse["access_token"]
    return aadToken
Get Alerts
This code shows calling the API to get recent alerts.
def wdatp_get_alerts(aadToken):
    url = "https://api.securitycenter.windows.com/api/alerts"
    headers = {
        'Content-Type' : 'application/json',
        'Accept' : 'application/json',
        'Authorization' : "Bearer " + aadToken
    }
    req = urllib.request.Request(url, headers=headers)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    return jsonResponse["value"]
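Once you have the JSON list of alerts, loading it into a pandas DataFrame (the structure used throughout the rest of this post) takes one line. A minimal sketch with a made-up payload shaped like the API's `value` array; in a live session you would instead pass the result of `wdatp_get_alerts(aadToken)`:

```python
import pandas as pd

# Hypothetical payload shaped like the "value" array returned by the alerts API
sample_alerts = [
    {"id": "alert-1", "severity": "High",   "title": "Suspicious PowerShell"},
    {"id": "alert-2", "severity": "Medium", "title": "Anomalous logon"},
]

# Load into a DataFrame; each dict key becomes a column
alerts_df = pd.DataFrame(sample_alerts)
print(alerts_df[["id", "severity"]])
```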
Now that you know the basics of getting up and going, let’s walk through a few different scenarios. To make it easy to repeat these scenarios, we will use a Jupyter notebook to encapsulate them. There is nothing about the APIs that requires Jupyter, but you may find it to be a handy tool when working with Python and the APIs.
Introducing Jupyter Notebooks
If you’re familiar with Jupyter Notebooks, you can skip this paragraph. If not: Jupyter Notebooks are an open source project designed to make interactive computing and sharing of reusable analyses easier. There are over 3 million notebooks shared on GitHub, an annual conference, and hosted offerings from every major cloud provider. While many notebook use cases center on data science and machine learning, they are also an excellent infosec investigative tool for exploration, visualization, and analysis. Let’s jump right in! If you want to follow along, download the notebook from the WDATP GitHub: WDATP API Jupyter Notebook.
Jupyter Basics
Place the notebook you downloaded from GitHub in the notebook directory. You can find the default notebook location by looking at the output from launching Jupyter:
> C:\Anaconda3\Scripts\jupyter.exe notebook --notebook-dir c:\home\jupyter
[I 09:29:34.126 NotebookApp] Serving notebooks from local directory: c:\home\jupyter
[I 09:29:34.126 NotebookApp] 0 active kernels
[C 09:29:34.126 NotebookApp]
    Copy/paste this URL into your browser when you connect for the first time, to login with a token:
To orient first time users, here are the basics. A notebook is composed of input and output cells. In the example below:
- Input line #2 shows printing “hello world”. To execute a cell, put the cursor in the cell and hit Shift+Enter. This will run the cell and move the input focus to the next input cell.
- Line #3 shows performing a calculation. This is a Python 3 notebook, so it is the “Python kernel” that is doing the work.
- Line #4 shows a feature of interactive Python: Python stores the output of the previous command in an automatic variable named underscore (‘_’).
- You can execute shell commands by prefixing them with a bang (‘!’)
- Jupyter has a way to provide convenience commands through a feature called “magics”. You can invoke a magic by prefixing it with % or %%. The timeit magic runs the command in the cell several times and provides an average time.
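For reference, the %timeit magic is built on Python's standard timeit module, so you can get a similar measurement outside a notebook:

```python
import timeit

# Time a simple expression 1000 times, similar to what the %timeit magic
# does for the contents of a cell
elapsed = timeit.timeit("sum(range(1000))", number=1000)
print("total seconds for 1000 runs: %f" % elapsed)
```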
Installing the sample from GitHub
To install the sample code, run the following cell by hitting Shift+Enter.
Calling the APIs
Initialize the WDATP endpoint by passing in your tenant ID and app information, then query for the top 5 alerts. The notebook will automatically format the results in an HTML table.
wdatp_api = WDATP(tenantId, appId, appSecret, vt_api_key)
wdatp_api.alerts(filterstr = "$top=5")
Getting Alerts
Let’s start by getting some alerts. You can use the standard OData syntax when querying some of the WDATP entities.
By default, this sample saves the results from API calls in a convenient tabular data structure called a Pandas DataFrame. Pandas is an open source package tailored for working with structured data.
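To illustrate the DataFrame operations used below on toy data (the column names mirror the alert fields, but the values are made up):

```python
import pandas as pd

# Toy frame mimicking a few alert columns
df = pd.DataFrame({
    "id":       ["a1", "a2", "a3"],
    "severity": ["High", "Low", "High"],
    "title":    ["t1", "t2", "t3"],
})

# Column selection, as used with the real alert results below
subset = df[["id", "severity"]]

# Group-and-count, the same pattern used later for the pie chart
counts = df[["id", "severity"]].groupby("severity").count()
print(counts)
```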
# Let's select just the columns we want to view using OData
wdatp_api.alerts(filterstr = "$select=alertCreationTime,category,title,description,id,severity&$filter=Severity eq 'High'&$top=5")
# let's assign the API results to a variable so we can process them without making an API call each time
df = wdatp_api.alerts(filterstr = "$top=100")
print("number of rows returned = %d" % len(df))
# show all the columns in the alert
df.columns
# Let's select just the columns we want to view. This uses the Pandas syntax for selecting columns
df[['alertCreationTime','category','description','id','severity']].head(5)
To show the power of using Python and open source packages like Pandas, we’ll show how to do a simple count of alert frequency and render the results in a pie chart. This also shows that the notebook can include pictures and not just text output.
# let's group them by title and count their frequency
df['title'] = df['title'].apply(lambda t: 'Windows Defender AV detection' if t.startswith('Windows Defender AV detected ') else t)
df1 = df[['id','title']].groupby('title').count().rename(index=str, columns={"id":"Alert Count"})
df1 = df1.sort_values(['Alert Count'], ascending=False)
df1
# show a quick pie chart using matplotlib
%matplotlib inline
plt = df1.plot.pie(y=0, figsize=(5, 5), legend=False, autopct='%1.0f%%')
The Role of Magics
There is a time and place for code, but sometimes you want a simpler way to access data. The API wrapper in this sample has several Jupyter magics created for just this purpose. Magics are just shortcuts to invoking code on your data. Use the %wdatp_alert magic with a valid Alert ID from your WDATP data. The below command also shows assigning the alert data to a variable (alert_df) and then rendering it in a vertical view by invoking the Pandas transpose function on it (alert_df.T).
%wdatp_alert 636740134254933385_-1113968221
# output from the previous command is stored in an automatic variable, the underscore _
alert_df = _
alert_df.T  # Transpose rows and columns for an easy way to view the alert details
The below example shows invoking the wdatp_ip magic. There are two kinds of Jupyter magics. Line magics start with a single % and operate on the input right after the magic name; the %wdatp_alert command we just invoked is an example. By prefixing a magic with two % signs, it is invoked as a cell magic and operates on the entire contents of the cell. One feature of %%wdatp_ip is that it scans the cell text with a regular expression to find any data that resembles an IPv4 address. This is handy when you have an email or webpage with the IPs you need to query: rather than copy and paste them individually, it is often faster to grab the entire paragraph containing them, paste it into a cell, and let the magic extract them for you.
%%wdatp_ip
Let's check for any communication to some IPs. I can include any text here I want
because the Jupyter magic for ip will regex extract IPv4 addresses and look them up
52.239.151.138
65.52.108.90
end of list of IPs
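Under the hood, pulling the IPs out of the cell above comes down to a regular expression. A minimal sketch of the idea follows; the wrapper's actual pattern may be stricter (for example, rejecting octets above 255):

```python
import re

# Simple IPv4-shaped pattern: four dot-separated groups of 1-3 digits
IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

def extract_ipv4(text):
    """Return all IPv4-looking strings found in a block of free text."""
    return IPV4_RE.findall(text)

cell_text = """Let's check for any communication to some IPs.
52.239.151.138 65.52.108.90 end of list of IPs"""
print(extract_ipv4(cell_text))
```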
Using the Advanced Hunting API
WDATP collects a rich set of data from endpoints and makes it available for hunting in the portal. You can also perform hunting queries with the API. This is a powerful feature that allows you to build automation around hunting scenarios.
In the example below, we get the most recent alert and then construct a query to get relevant records around the time of the alert from process creation events, network activity, and file creation activity. One could build on this to create triage scripts that automate common steps for responding to alerts.
# get the most recent alert
alert_df = wdatp_api.alerts(filterstr= '$top=1')
print("title: %s\ncategory: %s\nalert id: %s" % (alert_df.title.values[0], alert_df.category.values[0], alert_df.id.values[0]))
# build a hunting query that gets data around the time of the alert
get_records_qry = '''
let alertId = "%s";
let alert = AlertEvents
| where AlertId == alertId
| summarize AlertFirstEventTime=min(EventTime) by MachineId;
let machineId = toscalar(alert | project MachineId);
let timestamp = toscalar(alert | project AlertFirstEventTime);
let lookupPeriod = 10m;
find in (ProcessCreationEvents, NetworkCommunicationEvents, FileCreationEvents)
where EventTime between ((timestamp - lookupPeriod) .. lookupPeriod)
    and MachineId == machineId
| take 1000
''' % (alert_df.id.values[0])
related_data_df = wdatp_api.query(query=get_records_qry)
print("Number of records found %d" % len(related_data_df))
related_data_df
Visualizing Data
In the example below we have a query that searches for machines and their associated outbound communication patterns, then renders the machines and the IP ranges they communicate with in a chord diagram. This kind of analysis can be handy for taking a population of machines that should communicate similarly because they all serve the same role (for example, Domain Controllers) and spotting outliers.
The code uses the holoviews library to do the visualization.
# this query gets a handful of machines named desktop
query = '''
let machine_list = MachineInfo
| where EventTime > ago(1d)
| where ComputerName startswith 'desktop-'
| summarize by ComputerName, MachineId
| take 5;
NetworkCommunicationEvents
| where EventTime > ago(5d)
| where isnotempty(MachineId)
| where RemoteIPType == 'Public' and RemoteIP contains ('.') and RemotePort in ('80')
| extend MaskedIP = strcat(split(RemoteIP,'.')[0],'.', split(RemoteIP,'.')[1], '.*.*')
| summarize by MachineId, MaskedIP
| join kind=inner (machine_list) on MachineId
| summarize by ComputerName, MaskedIP
'''
df = wdatp_api.query(query)
print("rows returned = %d" % len(df))
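The MaskedIP expression in the query keeps only the first two octets of each remote address, bucketing addresses into coarse netblocks. The same transformation in Python looks like this (a sketch for illustration, not part of the sample wrapper):

```python
def mask_ip(ip):
    """Collapse an IPv4 address to its first two octets, matching the
    query's MaskedIP strcat expression."""
    parts = ip.split(".")
    return "%s.%s.*.*" % (parts[0], parts[1])

print(mask_ip("52.239.151.138"))
```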
## this does some footwork for holoviews to allow us to format the colors, nodes, and edges
import pandas as pd
import holoviews as hv
hv.extension('bokeh')

def holoview_charting_objects(df):
    g1 = list(df[df.columns[0]].astype(str).unique())
    g2 = list(df[df.columns[1]].astype(str).unique())
    g1d = list(map(lambda x: {'name': x, 'group': 1}, g1))
    g2d = list(map(lambda x: {'name': x, 'group': 2}, g2))
    nodes = hv.Dataset(pd.DataFrame(g1d + g2d), 'index')
    src = list(map(lambda x: nodes.data[nodes.data['name'] == x]['index'].values[0], list(df[df.columns[0]].values)))
    dst = list(map(lambda x: nodes.data[nodes.data['name'] == x]['index'].values[0], list(df[df.columns[1]].values)))
    df2 = pd.DataFrame(list(zip(src, dst)), columns=['source', 'dest'])
    return (df2, nodes)
%opts Chord [width=800 height=800]
%opts Chord [label_index='name' color_index='index' edge_color_index='source']
%opts Chord (cmap='Category20' edge_cmap='Category20')
import holoviews as hv
hv.Chord(holoview_charting_objects(df))
Holoviews has a wide variety of visualization options. We can render the same data in a scatter plot, which makes it easy to spot netblocks that many machines communicate with, as well as IP blocks contacted by only one or two machines.
# we can also render the same data in a Scatter plot
%opts Scatter [width=800 height=800] (size=10)
import holoviews as hv
hv.extension('bokeh')
hv.Scatter(df).options(show_grid=True)
Enriching WDATP data by calling other APIs and services
In this section we will show how you can call other APIs to enrich WDATP data, in this case VirusTotal (VT). If you don’t have a VT API key, you can sign up for one for free here. You pass your VT API key to the sample by creating the WDATP endpoint as follows:
wdatp_api = WDATP(tenantId, appId, appSecret, vt_api_key= '<YOUR VT_API_KEY>')
In this example, we query WDATP for hashes of files to gain additional context. The query collects SHA1 file hashes from WDATP Alerts. By querying what VT knows about the hashes, we may gain additional insight.
Next, we’ll take any file hashes where VT had results and query WDATP data to enrich it. The below shows querying the files API to get global prevalence information and combining the results with a link to the VirusTotal webpage for the file.
## query WDATP data for SHA1 hashes seen in recent alerts. Choose 50 of them at random
df_results = wdatp_api.query(query = '''
AlertEvents
| where EventTime > ago(1d)
| summarize by SHA1
| extend randsortorder=rand()
| sort by randsortorder
| take 50
''')
print("Number of hashes = %d" % len(df_results))
## query VirusTotal to see if anyone has submitted them for scanning before.
## List any results with at least one AV engine positive result
vtdf = wdatp_api.vtresults(list(df_results.SHA1)).fillna('')
vt_pos = vtdf.loc[vtdf['response_code'] == 1].loc[vtdf['positives'] != 0]
vt_pos
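The vtresults call above presumably wraps VirusTotal's public v2 file-report endpoint. As a sketch of building such a request yourself, the endpoint URL and parameter names below are assumptions based on the v2 API (which accepts multiple resources separated by commas), so verify them against the VT documentation:

```python
import urllib.parse

# Assumed VT v2 file-report endpoint; verify against the VT API docs
VT_FILE_REPORT_URL = "https://www.virustotal.com/vtapi/v2/file/report"

def build_vt_report_request(api_key, hashes):
    """Build the URL and form body for a VT v2 file-report lookup."""
    params = {"apikey": api_key, "resource": ",".join(hashes)}
    return VT_FILE_REPORT_URL, urllib.parse.urlencode(params).encode("utf-8")

url, body = build_vt_report_request("<YOUR VT_API_KEY>", ["abc123", "def456"])
print(url)
```

You would then POST that body with urllib.request, just as the authentication code earlier in this post does.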
## look up each positive hit in WDATP data and enrich results with WDATP information
wdatp_file_results = []
df_all = pd.DataFrame()
for sha1 in list(vt_pos['sha1']):
    df_file = wdatp_api.files(filehash = sha1)
    if len(df_file) > 0:
        wdatp_file_results.append(df_file)
if len(wdatp_file_results) > 0:
    df_all = pd.concat(wdatp_file_results)
    df_all = df_all.merge(vt_pos, right_on = 'sha1', left_on = 'sha1')
df_all
In closing
In this post we walked through how to get started with the WDATP APIs. Because the APIs are exposed as REST endpoints, they are simple to call from Python or any other language. The Jupyter Notebook is a useful open source tool that makes it easy to interact with data and save the results in a reusable notebook that is easy to share with others. We look forward to what you’ll do with the APIs and the notebooks you’ll create!
The author would like to thank Eric Hutchins (@killchain) for introducing him to Jupyter notebooks.