
Microsoft Defender for Endpoint Blog

Automating Security Operations Using Windows Defender ATP APIs with Python and Jupyter Notebooks

John_Lambert_MSTIC
Dec 04, 2018

Introduction

Windows Defender ATP provides a great hunting experience out of the box. Analysts can search over process events, network events, logons, registry activity, and more.  In this post, I will show how analysts can go even further by using the WDATP APIs. If any of the following scenarios are interesting to your security operations team, then this post is for you:

  • Automate common investigative steps such as gathering additional host and network details, triggering host actions like collecting an investigative package, or running anti-virus scans.
  • Query WDATP collected data about machines, users, domains, IPs, and files, enrich it with 3rd party services such as VirusTotal, and visualize the results.
  • Perform hunting tasks by searching across the rich data collected by WDATP to find new malicious activity.

Normally when one hears the word “APIs”, one thinks of a developer feature that requires compilers and SDKs. This post will show two features that make these APIs approachable to SOC analysts—Python support and Jupyter Notebooks.  Even though this post focuses on Python, you can call the APIs from PowerShell, C#, or any language that can call REST endpoints.

Getting Started

The first step is deciding how you want to access the APIs. WDATP uses standard OAuth 2.0 authentication. To get started, perform the following steps:

  • Register an application with Azure Active Directory. This allows you to assign the desired permissions to the app. You can create apps that only read alerts, run advanced queries, collect forensics, or many other combinations of permissions.
  • Decide on your access model: without a user (like a background service) or on behalf of a user. Following the steps in those links will provide you with an access token to call the APIs.
  • Use the token to call Windows Defender ATP APIs.

API Functionality

A list of the exposed APIs can be found here. In this post, we’ll focus on running some advanced hunting queries, and then calling the API to get further information on machines and users.

Advanced Hunting

Run queries from API.

Alerts

Run API calls such as get alerts, alert information by ID, alert related actor information, alert related IP information, and alert related machine information.

Domain

Run API calls such as get domain related alerts, domain related machines, domain statistics, and check if a domain is seen in your organization.

File

Run API calls such as get file information, file related alerts, file related machines, and file statistics.

IP

Run API calls such as get IP related alerts, IP related machines, IP statistics, and check if an IP is seen in your organization.

Machines

Run API calls such as find machine information by IP, get machines, get machines by ID, information about logged on users, and alerts related to a given machine ID.

User

Run API calls such as get alert related user information, user information, user related alerts, and user related machines.
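The entity groups above map naturally onto REST paths. The following is a minimal sketch, assuming the endpoints follow the pattern /api/<entity>/<id>/<relation> (for example /api/ips/<ip>/stats). The helper name entity_url and the exact path segments are illustrative assumptions, so check them against the API list before relying on them.

```python
from urllib.parse import quote

# Base URL taken from the code later in this post.
API_ROOT = "https://api.securitycenter.windows.com/api"


def entity_url(entity, entity_id, relation=None):
    """Build a URL for one entity, or for a collection related to it.

    entity    -- collection name such as "machines", "ips", "files" (assumed)
    entity_id -- identifier of the entity (machine ID, IP address, SHA1, ...)
    relation  -- optional related collection such as "alerts" or "stats" (assumed)
    """
    # Percent-encode the identifier so characters like "/" cannot alter the path.
    url = "%s/%s/%s" % (API_ROOT, entity, quote(entity_id, safe=""))
    if relation:
        url += "/" + relation
    return url


print(entity_url("ips", "52.239.151.138", "stats"))
print(entity_url("machines", "abc123"))
```

Keeping URL construction in one place makes it easier to adjust if a path turns out to differ from this assumed pattern.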

 

Authenticate to WDATP

After registering your app, you have all you need to call the APIs: your WDATP Tenant ID, an App Id and App Secret created during registration.  Here is the code to authenticate to the WDATP endpoint:

import json
import urllib.request
import urllib.parse

def wdatp_get_AAD_token(tenantId, appId, appSecret):
    url = "https://login.windows.net/%s/oauth2/token" % (tenantId)

    resourceAppIdUri = 'https://api.securitycenter.windows.com'

    body = {
        'resource' : resourceAppIdUri,
        'client_id' : appId,
        'client_secret' : appSecret,
        'grant_type' : 'client_credentials'
    }

    data = urllib.parse.urlencode(body).encode("utf-8")
    req = urllib.request.Request(url, data)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    aadToken = jsonResponse["access_token"]
    return aadToken
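Access tokens expire, so a long-running notebook or service may want to reuse a token only while it is still valid. Here is a minimal sketch, assuming you keep the whole JSON token response rather than only the access_token string, and assuming the response carries an expires_on field holding epoch seconds as a string. The helper name token_is_fresh and the five minute safety margin are illustrative choices.

```python
import time


def token_is_fresh(token_response, now=None, skew_seconds=300):
    """Return True if a cached token response can still be used.

    token_response -- dict parsed from the token endpoint's JSON reply
    now            -- current epoch time; defaults to time.time()
    skew_seconds   -- safety margin so the token is not used right at expiry
    """
    if not token_response or "access_token" not in token_response:
        return False
    if now is None:
        now = time.time()
    # Assumed field: "expires_on" holds the expiry as epoch seconds (string).
    expires_on = int(token_response.get("expires_on", 0))
    return now + skew_seconds < expires_on


cached = {"access_token": "example-token", "expires_on": "2000"}
print(token_is_fresh(cached, now=1000))
print(token_is_fresh(cached, now=1800))
```

When the check fails, request a new token with the authentication function and replace the cached response.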

Get Alerts

This code shows calling the API to get recent alerts.

def wdatp_get_alerts(aadToken):
    url = "https://api.securitycenter.windows.com/api/alerts"
    headers = {
        'Content-Type' : 'application/json',
        'Accept' : 'application/json',
        'Authorization' : "Bearer " + aadToken
    }

    req = urllib.request.Request(url, headers=headers)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    return jsonResponse["value"]
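A single call returns one page of results. The sketch below shows one way to gather several pages, assuming the service follows the common OData convention of returning an @odata.nextLink URL when more data is available. The fetch_json callable is hypothetical: it stands for any function that performs an authenticated GET and returns the parsed JSON.

```python
def collect_pages(fetch_json, first_url, max_pages=10):
    """Follow '@odata.nextLink' links and return all items found in 'value'.

    fetch_json -- hypothetical callable: URL -> parsed JSON dict
    first_url  -- URL of the first page
    max_pages  -- upper bound on requests, to avoid unbounded loops
    """
    items = []
    url = first_url
    pages = 0
    while url and pages < max_pages:
        page = fetch_json(url)
        items.extend(page.get("value", []))
        # Assumed convention: the next page URL, absent on the last page.
        url = page.get("@odata.nextLink")
        pages += 1
    return items


# Stand-in for the network so the sketch can be run offline.
fake_pages = {
    "page1": {"value": [{"id": "a"}, {"id": "b"}], "@odata.nextLink": "page2"},
    "page2": {"value": [{"id": "c"}]},
}
print(collect_pages(fake_pages.get, "page1"))
```

Passing the fetch function in as a parameter keeps the paging logic independent of how authentication is handled.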

Now that you know the basics of getting up and going, let’s walk through a few different scenarios. To make it easy to repeat these scenarios, we will use a Jupyter notebook to encapsulate them. There is nothing about the APIs that requires Jupyter, but you may find it to be a handy tool when working with Python and the APIs.

Introducing Jupyter Notebooks

If you’re familiar with Jupyter Notebooks, you can skip this paragraph.  If not, Jupyter Notebooks are an open source project designed to make interactive computing and sharing re-usable analysis easier. There are over 3 million notebooks shared on GitHub. There is an annual conference and support for both local and cloud versions by every major cloud provider. While many use cases for notebooks surround data science and machine learning, they are an excellent infosec investigative tool for exploration, visualization, and analysis. Let’s jump right in!  If you want to follow along, download the notebook from the WDATP GitHub: WDATP API Jupyter Notebook.

Jupyter Basics

Place the notebook you downloaded from GitHub in the notebook directory. You can find the default notebook location by looking at the output from launching Jupyter:

> C:\Anaconda3\Scripts\jupyter.exe notebook --notebook-dir c:\home\jupyter
[I 09:29:34.126 NotebookApp] Serving notebooks from local directory: c:\home\jupyter
[I 09:29:34.126 NotebookApp] 0 active kernels
[C 09:29:34.126 NotebookApp]

    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://localhost:8888/?token=<access token>

To orient first-time users, here are the basics. A notebook is composed of input and output cells. In the example below:

  • Input line #2 shows printing “hello world”. To execute a cell, put the cursor in the cell and hit Shift+Enter. This will run the cell and move the input focus to the next input cell.
  • Line #3 shows performing a calculation. This is a Python 3 notebook, so it is the “Python kernel” that is doing the work.
  • Line #4 shows a feature of interactive Python. Python stores the output of the previous command in an automatic variable named underscore (‘_’)
  • You can execute shell commands by prefixing them with a bang (‘!’)
  • Jupyter has a way to provide convenience commands through a feature called “magics”. You can invoke a magic by prefixing it with % or %%. The timeit magic runs the command in the cell several times and provides an average time.

Jupyter basics

Installing the sample from GitHub

To install the sample code, run the following cell by hitting Shift+Enter.

Running the WDATP wrapper sample

Calling the APIs

Initialize the WDATP endpoint by passing in your tenant ID and app information. Then query for the top 5 alerts. The notebook will automatically format the results in an HTML table.

wdatp_api = WDATP(tenantId, appId, appSecret, vt_api_key)

wdatp_api.alerts(filterstr = "$top=5")

 

Instantiating the WDATP API endpoint

Getting Alerts

Let’s start by getting some alerts.   You can use the standard OData syntax when querying some of the WDATP entities.
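An OData query string combines options such as $select, $filter, and $top, joined with an ampersand. The sketch below assembles one from its parts. The helper name odata_query is illustrative, and the post does not show whether the sample wrapper's filterstr parameter expects a raw or a percent-encoded string, so treat the encoding step as an assumption suited to building a raw URL yourself.

```python
from urllib.parse import quote


def odata_query(select=None, filter_expr=None, top=None):
    """Assemble an OData query string from optional parts.

    select      -- list of column names for $select
    filter_expr -- OData filter expression, e.g. "severity eq 'High'"
    top         -- maximum number of rows for $top
    """
    parts = []
    if select:
        parts.append("$select=" + ",".join(select))
    if filter_expr:
        # Percent-encode spaces and quotes so the expression is URL-safe.
        parts.append("$filter=" + quote(filter_expr, safe=""))
    if top is not None:
        parts.append("$top=%d" % top)
    return "&".join(parts)


print(odata_query(select=["id", "severity"],
                  filter_expr="severity eq 'High'",
                  top=5))
```

The result can be appended to an endpoint URL after a question mark.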

By default, this sample saves the results from API calls in a convenient tabular data structure called a Pandas DataFrame. Pandas is an open source package tailored for working with structured data.

# Let's select just the columns we want to view using OData
wdatp_api.alerts(filterstr = "$select=alertCreationTime,category,title,description,id,severity&$filter=Severity eq 'High'&$top=5")

# let's assign the API results to a variable so we can process them without making an API call each time
df = wdatp_api.alerts(filterstr = "$top=100")
print("number of rows returned = %d" % len(df))

# show all the columns in the alert
df.columns

# Let's select just the columns we want to view. This uses the Pandas syntax for selecting columns
df[['alertCreationTime','category','description','id','severity']].head(5)

Alerts to a data frame

Interacting with a data frame

To show the power of using Python and open source packages like Pandas, we’ll show how to do a simple count of alert frequency and render the results in a pie chart. This also shows that the notebook can include pictures and not just text output.

# let's group them by title and count their frequency
df['title'] = df['title'].apply(lambda t: 'Windows Defender AV detection' if t.startswith('Windows Defender AV detected ') else t)
df1 = df[['id','title']].groupby('title').count().rename(index=str, columns={"id":"Alert Count"})
df1 = df1.sort_values(['Alert Count'], ascending=False)
df1

# show a quick pie chart using matplotlib
%matplotlib inline
plt = df1.plot.pie(y=0, figsize=(5, 5), legend=False, autopct='%1.0f%%')

Dataframe manipulation
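The grouping step can also be expressed with the standard library alone, which may help when Pandas is not installed. This is a rough equivalent, assuming each alert is a dict with a title key as in the raw JSON returned by the alerts API. The function names are illustrative.

```python
from collections import Counter

AV_PREFIX = "Windows Defender AV detected "


def normalize_title(title):
    """Collapse the many per-threat AV titles into a single bucket."""
    if title.startswith(AV_PREFIX):
        return "Windows Defender AV detection"
    return title


def alert_counts(alerts):
    """Count alerts per normalized title, most frequent first."""
    counter = Counter(normalize_title(a["title"]) for a in alerts)
    return counter.most_common()


sample = [
    {"title": "Windows Defender AV detected 'ThreatA'"},
    {"title": "Windows Defender AV detected 'ThreatB'"},
    {"title": "Suspicious PowerShell command line"},
]
print(alert_counts(sample))
```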

 

The Role of Magics

There is a time and place for code, but sometimes you want a simpler way to access data. The API wrapper in this sample has several Jupyter magics created for just this purpose. Magics are just shortcuts to invoking code on your data.  Use the %wdatp_alert magic with a valid Alert ID from your WDATP data. The below command also shows assigning the alert data to a variable (alert_df) and then rendering it in a vertical view by invoking the Pandas transpose function on it (alert_df.T).

%wdatp_alert 636740134254933385_-1113968221

# output from the previous command is stored in an automatic variable, the underscore _
alert_df = _
alert_df.T  # Transpose rows and columns for an easy way to view the alert details

Jupyter WDATP Magics

The example below shows invoking the wdatp_ip magic. There are two kinds of Jupyter magics. Line magics start with a single % and operate on the input right after the magic name. The %wdatp_alert command we just invoked is an example of that. A magic prefixed with two % signs is invoked as a cell magic and operates on the entire contents of the cell. One feature of %%wdatp_ip is that it scans the cell text and uses a regular expression to find any data that resembles an IPv4 address. This is handy when you have an email or webpage with the IPs you need to query. While you can copy and paste them individually, it is sometimes faster to grab the entire paragraph containing them and paste it into a cell; the magic will do the work of extracting them properly.

%%wdatp_ip
Let's check for any communication to some IPs. I can include any text here I want because the Jupyter magic for ip will regex extract IPv4 addresses and look them up
52.239.151.138
65.52.108.90
end of list of IPs

WDATP IP Magic
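To illustrate the extraction idea behind the IP magic, here is a minimal sketch of pulling IPv4 addresses out of free text. It is not the sample's actual implementation: the pattern, the function name extract_ipv4, and the validation with the standard ipaddress module are assumptions chosen for clarity.

```python
import ipaddress
import re

# Four groups of 1-3 digits separated by dots; validated more strictly below.
IPV4_CANDIDATE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def extract_ipv4(text):
    """Return the unique, valid IPv4 addresses in text, in order of appearance."""
    found = []
    for candidate in IPV4_CANDIDATE.findall(text):
        try:
            # Rejects out-of-range octets such as 999.1.1.1.
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue
        if candidate not in found:
            found.append(candidate)
    return found


cell_text = """Let's check for any communication to some IPs.
52.239.151.138
65.52.108.90
not an address: 999.1.1.1
end of list of IPs"""
print(extract_ipv4(cell_text))
```

Validating each match avoids sending strings that only look like addresses to the API.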

Using the Advanced Hunting API

WDATP collects a rich set of data from endpoints and makes it available for hunting in the portal. You can also perform hunting queries with the API. This is a powerful feature that allows you to build automation around hunting scenarios. 

In the example below, we get the most recent alert and then construct a query to get relevant records around the time of the alert from process creation events, network activity, and file creation activity. One could build on this to create triage scripts that automate common steps for responding to alerts.

# get the most recent alert
alert_df = wdatp_api.alerts(filterstr= '$top=1')

print("title: %s\ncategory: %s\nalert id: %s" % (alert_df.title.values[0], alert_df.category.values[0], alert_df.id.values[0]))

# build a hunting query that gets data around the time of the alert
get_records_qry = '''
let alertId = "%s";
let alert = AlertEvents | where AlertId == alertId | summarize AlertFirstEventTime=min(EventTime) by MachineId;
let machineId = toscalar(alert | project MachineId);
let timestamp = toscalar(alert | project AlertFirstEventTime);
let lookupPeriod = 10m;
find in (ProcessCreationEvents, NetworkCommunicationEvents, FileCreationEvents)
where EventTime between ((timestamp - lookupPeriod) .. lookupPeriod)
        and MachineId == machineId
| take 1000
''' % (alert_df.id.values[0])

related_data_df = wdatp_api.query(query=get_records_qry)
print ("Number of records found %d" % len(related_data_df))
related_data_df

Advanced Hunting API 
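If you prefer to call the hunting API without the sample wrapper, the request can be built with the same standard library modules used for authentication. This is a sketch under stated assumptions: the path /api/advancedqueries/run and the JSON body with a single Query field are taken from my reading of the public API reference rather than from this post, and the function name is illustrative.

```python
import json
import urllib.request

# Assumed endpoint path; confirm it against the API reference.
HUNTING_URL = "https://api.securitycenter.windows.com/api/advancedqueries/run"


def build_hunting_request(aad_token, query):
    """Build (but do not send) a POST request that runs a hunting query."""
    # Assumed body shape: a JSON object with one "Query" field.
    body = json.dumps({"Query": query}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": "Bearer " + aad_token,
    }
    return urllib.request.Request(HUNTING_URL, data=body,
                                  headers=headers, method="POST")


req = build_hunting_request("example-token", "AlertEvents | take 5")
print(req.get_method(), req.full_url)
```

Sending the request with urllib.request.urlopen(req) and parsing the reply with json.loads follows the same pattern as the alerts example. The layout of the reply is not shown in this post, so inspect it before building automation on top of it.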

 

Visualizing Data

In the example below we have a query that searches for machines and their associated outbound communication patterns. It then renders the machines and the IP ranges they communicate with in a Chord diagram.  This kind of analysis might be handy to take a population of machines that should be communicating similarly because they are all the same role (for example Domain Controllers) and spot outliers.

The code uses the holoviews library to do the visualization. 

# this query gets a handful of machines named desktop
query = '''
let machine_list = MachineInfo | where EventTime > ago(1d) | where ComputerName startswith 'desktop-' | summarize by ComputerName, MachineId | take 5;
NetworkCommunicationEvents
| where EventTime > ago(5d) | where isnotempty(MachineId)
| where RemoteIPType == 'Public' and RemoteIP contains ('.') and RemotePort in ('80')
| extend MaskedIP = strcat(split(RemoteIP,'.')[0],'.', split(RemoteIP,'.')[1], '.*.*')
| summarize by MachineId, MaskedIP
| join kind=inner (machine_list) on MachineId
| summarize by ComputerName, MaskedIP
'''
df = wdatp_api.query(query)
print ("rows returned = %d" % len(df))

 

## this does some footwork for holoviews to allow us to format the colors, nodes, and edges.

import pandas as pd
import holoviews as hv
hv.extension('bokeh')

def holoview_charting_objects(df):
    # unique values of the first two columns become the two node groups
    g1 = list(df[df.columns[0]].astype(str).unique())
    g2 = list(df[df.columns[1]].astype(str).unique())
    g1d = list(map(lambda x: {'name': x, 'group':1}, g1))
    g2d = list(map(lambda x: {'name': x, 'group':2}, g2))
    nodes = hv.Dataset(pd.DataFrame(g1d + g2d), 'index')

    # translate each row into a (source, dest) pair of node indexes
    src = list(map(lambda x: nodes.data[nodes.data['name'] == x]['index'].values[0], list(df[df.columns[0]].values)))
    dst = list(map(lambda x: nodes.data[nodes.data['name'] == x]['index'].values[0], list(df[df.columns[1]].values)))
    df2 = pd.DataFrame(list(zip(src,dst)), columns=['source','dest'])
    return (df2, nodes)

%opts Chord [width=800 height=800]
%opts Chord [label_index='name' color_index='index' edge_color_index='source']
%opts Chord (cmap='Category20' edge_cmap='Category20')

hv.Chord(holoview_charting_objects(df))

Chord diagram

 

 

Holoviews has a wide variety of visualization options. We can render the same data in a Scatter plot. This allows one to quickly identify netblocks that are commonly communicated to by many machines and also see IP blocks that have only one or two machines communicating with them.

# we can also render the same data in a Scatter plot

%opts Scatter [width=800 height=800] (size=10)
import holoviews as hv
hv.extension('bokeh')
hv.Scatter(df).options(show_grid=True)

 Scatterplot
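The outlier idea can also be checked numerically, without a plot. The sketch below masks each IP to its first two octets, as the hunting query does, and lists the netblocks contacted by only a few machines. It assumes the rows are simple (computer name, remote IP) pairs. The function names and the threshold are illustrative.

```python
from collections import defaultdict


def mask_ip(ip):
    """Keep the first two octets, mirroring the MaskedIP column in the query."""
    first, second = ip.split(".")[:2]
    return "%s.%s.*.*" % (first, second)


def machines_per_netblock(rows):
    """Map each masked netblock to the sorted list of machines that contacted it."""
    blocks = defaultdict(set)
    for computer_name, remote_ip in rows:
        blocks[mask_ip(remote_ip)].add(computer_name)
    return {block: sorted(names) for block, names in blocks.items()}


def rare_netblocks(rows, max_machines=1):
    """Return netblocks contacted by at most max_machines distinct machines."""
    per_block = machines_per_netblock(rows)
    return sorted(b for b, names in per_block.items() if len(names) <= max_machines)


rows = [
    ("desktop-a", "52.239.151.138"),
    ("desktop-b", "52.239.1.1"),
    ("desktop-a", "65.52.108.90"),
]
print(rare_netblocks(rows))
```

Netblocks reached by a single machine in a group that should behave alike are a reasonable starting point for a closer look.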

 

Enriching WDATP data by calling other APIs and services

In this section we will show how you can call other APIs to enrich WDATP data, in this case VirusTotal (VT). If you don’t have a VT API key, you can sign up for one for free here. You pass your VT API key to the sample when creating the WDATP endpoint, as follows:

wdatp_api = WDATP(tenantId, appId, appSecret, vt_api_key= '<YOUR VT_API_KEY>')

 

In this example, we query WDATP for hashes of files to gain additional context.  The query collects SHA1 file hashes from WDATP Alerts. By querying what VT knows about the hashes, we may gain additional insight.

Next, we’ll take any file hashes where VT had results and query WDATP data to enrich it. The below shows querying the files API to get global prevalence information and combining the results with a link to the VirusTotal webpage for the file.

## query WDATP alert data for SHA1 file hashes seen in the last day. Choose 50 of them at random

df_results = wdatp_api.query(query = '''
AlertEvents
| where EventTime > ago(1d)
| summarize by SHA1
| extend randsortorder=rand() | sort by randsortorder
| take 50
'''
)
print("Number of hashes = %d" % len(df_results))

## query VirusTotal to see if anyone has submitted them for scanning before.
## List any results with at least one AV engine positive result

vtdf = wdatp_api.vtresults(list(df_results.SHA1)).fillna('')
vt_pos = vtdf[(vtdf['response_code'] == 1) & (vtdf['positives'] != 0)]
vt_pos

## look up each positive hit in WDATP data and enrich results with WDATP information

wdatp_file_results = []
df_all = pd.DataFrame()
for sha1 in list(vt_pos['sha1']):
    df_file = wdatp_api.files(filehash = sha1)
    if len(df_file) > 0:
        wdatp_file_results.append(df_file)

# merge only when a file lookup returned data; merging an empty frame on 'sha1' would fail
if len(wdatp_file_results) > 0:
    df_all = pd.concat(wdatp_file_results)
    df_all = df_all.merge(vt_pos, right_on = 'sha1', left_on = 'sha1')
df_all

 VirusTotal API example
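The enrichment step is essentially a join on the file hash. Here is a minimal standard library sketch of that join, assuming both sides are lists of dicts keyed by sha1 and that the VT records carry response_code and positives fields as in the code above. The globalPrevalence field in the sample data and the VirusTotal link pattern are assumptions for illustration.

```python
# Assumed link pattern for a file report page on the VirusTotal website.
VT_FILE_URL = "https://www.virustotal.com/gui/file/%s"


def enrich_with_vt(file_records, vt_results):
    """Join file records with VT results that have at least one positive."""
    vt_by_hash = {
        r["sha1"].lower(): r
        for r in vt_results
        if r.get("response_code") == 1 and r.get("positives", 0) > 0
    }
    enriched = []
    for record in file_records:
        sha1 = record["sha1"].lower()
        vt = vt_by_hash.get(sha1)
        if vt is None:
            continue  # no VT hit with positives for this file
        merged = dict(record)
        merged["vt_positives"] = vt["positives"]
        merged["vt_link"] = VT_FILE_URL % sha1
        enriched.append(merged)
    return enriched


files = [{"sha1": "AAA111", "globalPrevalence": 12},
         {"sha1": "bbb222", "globalPrevalence": 90000}]
vt = [{"sha1": "aaa111", "response_code": 1, "positives": 7},
      {"sha1": "bbb222", "response_code": 1, "positives": 0}]
print(enrich_with_vt(files, vt))
```

Normalizing the hash to lower case before joining avoids missed matches when the two services report hashes in different cases.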

 

In closing

In this post we walked through how to get started with the WDATP APIs. Because the APIs are exposed as REST endpoints they are simple to call using Python or other languages. The Jupyter Notebook is a useful open source package that makes it easy to interact with data and save the results in a reusable notebook that is easy to share with others. We look forward to what you’ll do with the APIs and the notebooks you’ll create!

The author would like to thank Eric Hutchins (@killchain) for introducing him to Jupyter notebooks.

Updated Nov 14, 2019
Version 2.0