In the article "Azure Sentinel correlation rules: Active Lists out; make_list() in," I presented a rule that detects several sign-in failures to Azure AD, alongside a sign-in success to AWS from the same IP address. An inherent challenge with the rule was that the time window is defined by rule execution: if the rule is executed every hour, and an attack is split across two rule executions, no alert will be triggered.
In this blog post, we will see how to modify this rule to analyze a sliding time window and work across rule execution boundaries. While enhancing the rule time window support, we will also add support for delayed events.
This post is part of a series of blog posts on writing rules in Azure Sentinel.
Let's start by defining the time variables for the detection:
let rule_frequency = 1h; // how often is the rule executed
let max_session = 24h; // the maximum length of a detected session
let aad_delay = 1h; // the maximum delay we anticipate for events for each source
let aws_delay = 1h;
The rule attempts to detect a sequence of events (1), let's call them a session, that took less than "max_session" and of which the last occurred within the current rule run (2). When events are expected to be delayed, we will still look for such a session (3) but require that the last event was ingested (4) within the current rule run.
The rest is just the mechanics of how to do the above.
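As a quick sanity check of the arithmetic, here is a minimal sketch that prints how far back the Azure AD part of the rule reads, using the values defined above. The column names session_lookback and last_event_lookback are illustrative only and are not part of the rule.

```kusto
let rule_frequency = 1h; // how often is the rule executed
let max_session = 24h;   // the maximum length of a detected session
let aad_delay = 1h;      // the maximum delay we anticipate for Azure AD events
print
    // how far back events are read so that a full session can be rebuilt
    session_lookback = max_session + rule_frequency + aad_delay,
    // how recent the last event of a session must be to count for this run
    last_event_lookback = rule_frequency + aad_delay
```

With these values, the rule reads 26 hours of events and requires the last event of a session to fall within the most recent 2 hours.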
The rule logic itself starts, as did the simpler version, by filtering Azure AD sign-in failures. However, whereas the original version did not explicitly filter on time, this one adds a time condition (green). The time condition looks back far enough to capture sessions that ended at the beginning of the current run window, even if their events were delayed.
let signin_threshold = 5;
let suspicious_signins =
SigninLogs
| where TimeGenerated > ago (max_session + rule_frequency + aad_delay)
| where ResultType !in ("0", "50125", "50140")
| where IPAddress != "127.0.0.1"
Next comes the KQL magic that makes sliding-window detection easy:
| sort by IPAddress, TimeGenerated asc
| extend aad_first_time = row_window_session (TimeGenerated, max_session, max_session, IPAddress != prev(IPAddress))
| summarize session_count = count(), aad_last_time = max(TimeGenerated), aad_last_ingest=max(ingestion_time()) by aad_first_time , IPAddress
Once we have sessions, we need to check if they indeed indicate a detection:
The last line, project-rename, is purely cosmetic. It renames IPAddress to make it easy to identify as the IP address of the AAD failure session. This comes in handy after the join, when multiple tables add their fields to the result set.
| where session_count >= signin_threshold
| where aad_last_time > ago (rule_frequency + aad_delay)
| project-rename aad_ip_address = IPAddress;
Note that unlike the simpler version, the result of this query part is a table and not a list, and we will use the join operator to correlate it to the AWS events. Why? We need to keep the timestamps and test them after the "join." It would also help us provide more data to the analyst.
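To see what row_window_session does in isolation, the following sketch runs the same sort, extend, and summarize steps over a small in-memory table instead of SigninLogs. The sample timestamps and IP addresses are made up for illustration, and session_start and session_end are stand-in names for aad_first_time and aad_last_time.

```kusto
let max_session = 24h;
// Hypothetical sample data: four failures from one address, one from another
datatable (TimeGenerated: datetime, IPAddress: string) [
    datetime(2021-01-01 00:00), "10.0.0.1",
    datetime(2021-01-01 10:00), "10.0.0.1",
    datetime(2021-01-01 23:00), "10.0.0.1",
    datetime(2021-01-02 06:00), "10.0.0.1",
    datetime(2021-01-01 05:00), "10.0.0.2"
]
// Sorting serializes the rows, which row_window_session requires
| sort by IPAddress asc, TimeGenerated asc
// Each row gets the start time of its session; a change of IP address restarts the session
| extend session_start = row_window_session(TimeGenerated, max_session, max_session, IPAddress != prev(IPAddress))
| summarize session_count = count(), session_end = max(TimeGenerated) by session_start, IPAddress
```

For 10.0.0.1, the first three events should fall into one session starting at 00:00 on January 1, while the event at 06:00 on January 2 is more than max_session after that start and therefore opens a new session. The single event from 10.0.0.2 forms its own session because the IP address changed.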
While the rule we explore correlates Azure sign-in failures with a successful sign-in on AWS, the more common form is simple aggregation, i.e., detecting more than X events in a time window. The section we already covered addresses this general use case, with the following small modifications:
After finding successful AWS logins (red), we match them on IP addresses to the sessions we identified earlier using join (blue). For matches, implying we have a session of Azure failures and a successful AWS login for the same address, we test the following:
The "project-rename" and "extend" operators (orange) are again cosmetic only and make the AWS fields easy to use and identify in the joined table.
AWSCloudTrail
| where TimeGenerated > ago(max_session + rule_frequency + aws_delay)
| where EventName == "ConsoleLogin"
| extend LoginResult = tostring(parse_json(ResponseElements).ConsoleLogin)
| where LoginResult == "Success"
| project-rename aws_time = TimeGenerated, aws_ip_address = SourceIpAddress
| extend aws_ingest_time = ingestion_time()
| join suspicious_signins on $left.aws_ip_address == $right.aad_ip_address
| where max_of(aws_ingest_time, aad_last_ingest) > ago(rule_frequency)
| where aws_time between (aad_first_time .. aad_last_time)
That’s it. The rest would be just result and entity preparation.
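To make the effect of the two final where clauses concrete, here is a small sketch that applies them to three hand-made rows shaped like the output of the join. It assumes a fixed, hypothetical run_time in place of the current time (so run_time - rule_frequency stands in for ago(rule_frequency)), and the case_name column and all timestamps are invented for the example.

```kusto
let rule_frequency = 1h;
let run_time = datetime(2021-01-02 12:00); // hypothetical fixed "now" so the sample is repeatable
datatable (
    case_name: string,
    aws_time: datetime, aws_ingest_time: datetime,
    aad_first_time: datetime, aad_last_time: datetime, aad_last_ingest: datetime
) [
    "new, in session",     datetime(2021-01-02 11:20), datetime(2021-01-02 11:30), datetime(2021-01-02 09:00), datetime(2021-01-02 11:40), datetime(2021-01-02 11:45),
    "already evaluated",   datetime(2021-01-02 08:20), datetime(2021-01-02 08:30), datetime(2021-01-02 07:00), datetime(2021-01-02 08:40), datetime(2021-01-02 08:45),
    "outside the session", datetime(2021-01-02 11:50), datetime(2021-01-02 11:55), datetime(2021-01-02 09:00), datetime(2021-01-02 11:40), datetime(2021-01-02 11:45)
]
// Keep only matches whose latest event was ingested during the current run window
| where max_of(aws_ingest_time, aad_last_ingest) > run_time - rule_frequency
// Keep only AWS logins that happened while the failure session was in progress
| where aws_time between (aad_first_time .. aad_last_time)
| project case_name
```

Only the "new, in session" row should remain. The second row is dropped because nothing in it was ingested within the last rule_frequency, so an earlier run would already have evaluated it, and the third is dropped because the AWS login happened after the failure session ended.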
An analytics rule is not just a query. There are several parameters that you need to set to make it work. The two critical ones are:
Also, you should configure alert aggregation based on the source IP address. A sliding window-based rule alerts again whenever new events extend an existing session. While it is possible to suppress such alerts, they add information, so the best practice is to generate them and have them grouped into the same incident. Aggregation should be by IP address and aad_first_time, to which you should allocate one of the unused entities.
Aggregation rules and more advanced relations-based rules require careful consideration of time window management to avoid missing alerts or duplicate alerting. This article provides a recipe that should help you implement such rules correctly using a sliding window analysis.
I hope you found it useful!