Scaling Up Syslog CEF Collection
Published Feb 20 2020 04:13 PM

This blog post is authored by Nicholas DiCola

 

In the last few months working on Microsoft Sentinel, many customers have asked me about scaling up syslog CEF collection to get data into Microsoft Sentinel. I have created two sample architectures with deployment code for this purpose. The samples are available at:

 

CEF-VMSS is for deploying native Microsoft Sentinel CEF collection: syslog CEF messages are sent to rsyslog, which then forwards the messages to the Log Analytics Agent.

 

Logstash-VMSS is for deploying Logstash on the VMs to do message manipulation; Logstash then sends the messages to the Log Analytics Agent. You may also want to use this architecture and change the input to a source like Kafka.

 

I will not deep dive into every topic of this architecture. You can research each on your own; I will focus on an overview of the architecture.

 

Virtual Machine Scale Set

The architecture starts with a Virtual Machine Scale Set (VMSS), which lets you create and manage a group of virtual machines. The scale set can automatically add or remove instances based on a schedule or on demand. The sample uses autoscale settings that scale the VMSS up and down based on CPU usage, which reflects the volume of messages being sent.
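To make the CPU-based scaling idea concrete, here is a minimal Python sketch of one autoscale evaluation. The thresholds and instance limits are hypothetical values chosen for illustration; the real settings are defined in the sample's ARM templates and may differ.

```python
from dataclasses import dataclass


@dataclass
class AutoscaleRule:
    # All values are hypothetical; the sample's real settings live in its ARM template.
    scale_out_cpu: float = 75.0  # add an instance above this average CPU %
    scale_in_cpu: float = 25.0   # remove an instance below this average CPU %
    min_instances: int = 1
    max_instances: int = 10


def desired_instances(current: int, avg_cpu: float, rule: AutoscaleRule) -> int:
    """Return the instance count after applying one autoscale evaluation."""
    if avg_cpu > rule.scale_out_cpu:
        current += 1
    elif avg_cpu < rule.scale_in_cpu:
        current -= 1
    # Clamp the result to the configured bounds.
    return max(rule.min_instances, min(rule.max_instances, current))
```

With these assumed defaults, `desired_instances(2, 90.0, AutoscaleRule())` adds an instance, while a reading between the two thresholds leaves the count unchanged.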

 

I have included a Load Balancer in front of the VMSS, which lets you configure one destination IP address (the public IP address) on your sources and spreads the incoming messages across the running instances.

 

There are two ARM templates, one for Red Hat and one for Ubuntu. The templates deploy everything needed for the architecture. One key part of the ARM templates is using cloud-init to configure the VMSS instances as they are created. Below are the Ubuntu cloud-init files.

 

Cloud Init for CEF-VMSS:

#cloud-config
package_upgrade: true
runcmd:
  - sudo apt-get update
  - sudo wget https://raw.githubusercontent.com/Azure/Azure-Sentinel/master/DataConnectors/CEF/cef_installer.py && sudo python cef_installer.py

As you can see, the cloud init installs updates and then installs the Log Analytics Agent using the Microsoft Sentinel CEF script. The ARM template appends the workspace ID and workspace key to the last line so that the agent gets connected to the right workspace.
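The append step can be pictured as plain string concatenation. The Python sketch below is only an illustration of that idea, not the ARM template itself; the function name and the placeholder values are hypothetical.

```python
# Last runcmd line of the CEF-VMSS cloud init, before the template appends anything.
LAST_LINE = (
    "  - sudo wget https://raw.githubusercontent.com/Azure/Azure-Sentinel/"
    "master/DataConnectors/CEF/cef_installer.py && sudo python cef_installer.py"
)


def append_workspace(last_line: str, workspace_id: str, workspace_key: str) -> str:
    """Append the workspace ID and key as arguments to the installer command.

    Hypothetical helper: it mimics the effect of the template's concatenation.
    """
    return f"{last_line} {workspace_id} {workspace_key}"


# Placeholder values; real ones come from the Log Analytics workspace.
print(append_workspace(LAST_LINE, "<workspace-id>", "<workspace-key>"))
```

The installer script then receives the two values as command-line arguments when the instance boots.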

 

Cloud Init for Logstash-VMSS:

#cloud-config
package_upgrade: true
packages:
  - default-jre
runcmd:
  - wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  - sudo apt-get install -y apt-transport-https
  - echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
  - sudo apt-get update
  - sudo apt-get install -y logstash
  - sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-syslog
  - sudo /usr/share/logstash/bin/logstash-plugin update
  - wget -q https://raw.githubusercontent.com/Azure/Azure-Sentinel/master/DataConnectors/Logstash-VMSS/logstash.config -O /etc/logstash/config.d/logstash.config
  - echo "update this line with wget -q https://sourceURL -O /etc/logstash/pipelines.yml if you have a custom pipelines file"
  - sudo systemctl start logstash.service
  - sudo wget https://raw.githubusercontent.com/Azure/Azure-Sentinel/master/DataConnectors/CEF/cef_installer.py&&sudo python cef_installer.py

It installs Java, Logstash, Logstash Syslog Output plugin and the Log Analytics Agent using the Microsoft Sentinel CEF script.  The ARM template appends the workspace id and workspace key to the last line so that the agent gets connected to the right workspace.

CEF

CEF is our default way to collect logs from external solutions like firewalls and proxies. The CEF install script installs the Log Analytics agent, configures rsyslog, and configures the agent for CEF collection.
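For readers who have not seen a CEF message, the following Python sketch builds one. It assumes the standard CEF layout (a pipe-delimited header followed by key=value extensions); the vendor, product and field values are made up for illustration.

```python
def _escape_header(value: str) -> str:
    # In CEF header fields, backslash and pipe must be escaped.
    return value.replace("\\", "\\\\").replace("|", "\\|")


def _escape_extension(value: str) -> str:
    # In CEF extension values, backslash and equals sign must be escaped.
    return value.replace("\\", "\\\\").replace("=", "\\=")


def build_cef(vendor, product, version, class_id, name, severity, extensions):
    """Return a CEF:0 message string from header values and an extension dict."""
    header_values = (vendor, product, version, class_id, name, severity)
    header = "|".join(["CEF:0"] + [_escape_header(str(v)) for v in header_values])
    ext = " ".join(f"{k}={_escape_extension(str(v))}" for k, v in extensions.items())
    return f"{header}|{ext}"


# Made-up example event from a fictional firewall.
print(build_cef("Contoso", "Firewall", "1.0", "100", "Blocked connection", 5,
                {"src": "10.0.0.1", "dst": "10.0.0.2", "dpt": 443}))
```

A device would normally wrap this string in a syslog message before sending it to the collector.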

Logstash

Logstash dynamically ingests, transforms, and ships your data regardless of format or complexity.  It has many input, filter and output plugins.  This can allow you to get data from many sources, manipulate the event data and output to the Log Analytics Agent locally on the machine.  There are many input plugins so this makes it easy to connect to other sources like Kafka.

 

Here is the sample logstash.conf file that is used in the sample architecture:

input {
  tcp {
    port => 5514
    type => syslog
    codec => cef
  }
  udp {
    port => 5514
    type => syslog
    codec => cef
  }
}

filter {
  geoip {
    source => "src"
    target => "srcGeoIP"
    add_field => { "sourceLongitude" => "%{[srcGeoIP][longitude]}" }
    add_field => { "sourceLatitude" => "%{[srcGeoIP][latitude]}" }
  }
  geoip{
    source => "dst"
    target => "dstGeoIP"
    add_field => { "destinationLongitude" => "%{[dstGeoIP][longitude]}" }
    add_field => { "destinationLatitude" => "%{[dstGeoIP][latitude]}" }
  }
  mutate{
    add_field => { "agentReceiptTime" => "%{@timestamp}" }
  }
}

output {
  syslog {
  host => "127.0.0.1"
  port => 25226
  protocol => "tcp"
  codec => cef {
    reverse_mapping => true
    delimiter => "\r\n"
    vendor      => "%{deviceVendor}"
    product     => "%{deviceProduct}"
    version     => "%{deviceVersion}"
    signature   => "%{deviceEventClassId}"
    name        => "%{name}"
    severity    => "%{severity}"
    fields => [
      "deviceAction",
      "applicationProtocol",
      "deviceCustomIPv6Address1",
      "deviceCustomIPv6Address1Label",
      "deviceCustomIPv6Address2",
      "deviceCustomIPv6Address2Label",
      "deviceCustomIPv6Address3",
      "deviceCustomIPv6Address3Label",
      "deviceCustomIPv6Address4",
      "deviceCustomIPv6Address4Label",
      "deviceEventCategory",
      "deviceCustomFloatingPoint1",
      "deviceCustomFloatingPoint1Label",
      "deviceCustomFloatingPoint2",
      "deviceCustomFloatingPoint2Label",
      "deviceCustomFloatingPoint3",
      "deviceCustomFloatingPoint3Label",
      "deviceCustomFloatingPoint4",
      "deviceCustomFloatingPoint4Label",
      "deviceCustomNumber1",
      "deviceCustomNumber1Label",
      "deviceCustomNumber2",
      "deviceCustomNumber2Label",
      "deviceCustomNumber3",
      "deviceCustomNumber3Label",
      "baseEventCount",
      "deviceCustomString1",
      "deviceCustomString1Label",
      "deviceCustomString2",
      "deviceCustomString2Label",
      "deviceCustomString3",
      "deviceCustomString3Label",
      "deviceCustomString4",
      "deviceCustomString4Label",
      "deviceCustomString5",
      "deviceCustomString5Label",
      "deviceCustomString6",
      "deviceCustomString6Label",
      "destinationHostName",
      "destinationMacAddress",
      "destinationNtDomain",
      "destinationProcessId",
      "destinationUserPrivileges",
      "destinationProcessName",
      "destinationPort",
      "destinationAddress",
      "destinationUserId",
      "destinationUserName",
      "deviceAddress",
      "deviceHostName",
      "deviceProcessId",
      "endTime",
      "fileName",
      "fileSize",
      "bytesIn",
      "bytesOut",
      "eventOutcome",
      "transportProtocol",
      "requestUrl",
      "deviceReceiptTime",
      "sourceHostName",
      "sourceMacAddress",
      "sourceNtDomain",
      "sourceProcessId",
      "sourceUserPrivileges",
      "sourceProcessName",
     "sourcePort",
      "sourceAddress",
      "startTime",
      "sourceUserId",
      "sourceUserName",
      "agentHostName",
      "agentReceiptTime",
      "agentType",
      "agentId",
      "cefVersion",
      "agentAddress",
      "agentVersion",
      "agentTimeZone",
      "destinationTimeZone",
      "sourceLongitude",
      "sourceLatitude",
      "destinationLongitude",
      "destinationLatitude",
      "categoryDeviceType",
      "managerReceiptTime",
      "agentMacAddress"
      ]
    }
  }
}

The inputs accept both TCP and UDP on port 5514. I used 5514 because Logstash runs as non-root and requires special configuration to use port 514, so I decided to keep it simple. On input, it’s expecting CEF format using “codec => cef” and tags the event with the type syslog.
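One way to check that the inputs are reachable is to send a test message to port 5514. The Python sketch below is an assumption-laden helper, not part of the sample: the function name is hypothetical, and it sends exactly one message per datagram or connection without adding any framing.

```python
import socket


def send_cef(message: str, host: str, port: int = 5514, use_tcp: bool = False) -> None:
    """Send a single CEF message to a Logstash input over UDP (default) or TCP."""
    payload = message.encode("utf-8")
    if use_tcp:
        # One message per connection; the connection is closed after sending.
        with socket.create_connection((host, port), timeout=5) as conn:
            conn.sendall(payload)
    else:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(payload, (host, port))


# Example call (host is a placeholder for the load balancer's IP address):
# send_cef("CEF:0|Contoso|Firewall|1.0|100|Test|5|src=10.0.0.1", "<load-balancer-ip>")
```

If the message arrives, it should show up in CommonSecurityLog after the usual ingestion delay.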

 

Once the event is accepted, I have added a few filters. The first two use the GeoIP plugin, which uses the local GeoLite2 database to look up the source and destination IP addresses. The results are added to a custom field and, to align with RFC compliance, I then use add_field to bring the latitude and longitude into the proper fields. I also use mutate to copy the time the message was received into agentReceiptTime. This is important because Logstash will send the message to Log Analytics using its own time, which will end up as TimeGenerated in Log Analytics. Doing this allows you to see both the time Logstash received the message and the time Logstash sent it. A simple comparison will show you how long it takes to process.
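The comparison of the two timestamps is simple arithmetic. This Python sketch assumes both values are available as ISO 8601 strings with a trailing "Z"; the function names and the sample timestamps are hypothetical.

```python
from datetime import datetime


def parse_iso(ts: str) -> datetime:
    # Older Python versions of datetime.fromisoformat reject a trailing "Z",
    # so it is rewritten as an explicit UTC offset first.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def processing_delay_seconds(agent_receipt_time: str, time_generated: str) -> float:
    """Seconds between Logstash receiving an event and sending it on."""
    delta = parse_iso(time_generated) - parse_iso(agent_receipt_time)
    return delta.total_seconds()


# Made-up timestamps for illustration.
print(processing_delay_seconds("2020-02-20T16:13:00.250Z", "2020-02-20T16:13:01.750Z"))
```

A consistently growing delay would suggest that the instance is falling behind the incoming message rate.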

 

In the output section, I use the syslog plugin to output the message to the agent. The agent listens on TCP 127.0.0.1:25226. I set the output plugin to the CEF codec again, and there are a couple of important settings. “reverse_mapping => true” ensures that the message is sent using the short names (src vs sourceAddress), which Log Analytics requires. The fields option must list every field you want to send; I have included all fields the CEF codec supports. If a field doesn’t exist on an event, it won’t be sent.
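The effect of reverse mapping and of the fields list can be sketched in a few lines of Python. This is an illustration of the behavior described above, not the codec's implementation; the dictionary is only a small subset of the CEF long-name to short-key pairs.

```python
# Small subset of CEF long names and their short keys; the codec's table is larger.
CEF_SHORT_KEYS = {
    "sourceAddress": "src",
    "destinationAddress": "dst",
    "sourcePort": "spt",
    "destinationPort": "dpt",
    "deviceAction": "act",
    "applicationProtocol": "app",
    "transportProtocol": "proto",
    "sourceUserName": "suser",
    "destinationUserName": "duser",
    "requestUrl": "request",
}


def to_extension(event: dict, fields: list) -> str:
    """Build the CEF extension string from the listed fields that exist on the event."""
    parts = []
    for name in fields:
        if name not in event:
            continue  # a listed field that is missing from the event is not sent
        key = CEF_SHORT_KEYS.get(name, name)  # use the short key when one is known
        parts.append(f"{key}={event[name]}")
    return " ".join(parts)


print(to_extension({"sourceAddress": "10.0.0.1", "destinationPort": 443},
                   ["sourceAddress", "destinationAddress", "destinationPort"]))
```

Here destinationAddress is listed but absent from the event, so only the src and dpt pairs are emitted.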

Microsoft Sentinel

Once the data is sent to the agent, it follows the normal CEF collection and ingestion process and ends up in CommonSecurityLog. You can monitor the events per second (EPS) of the VMSS using the following query:

CommonSecurityLog
| where _TimeReceived > ago(20m)
| summarize count() by bin(_TimeReceived, 1m), _ResourceId
| extend count = count_ / 60
| sort by _TimeReceived desc

This will get all logs from the last 20 minutes and summarize them by _TimeReceived (in one-minute bins) and _ResourceId. This gives you the number of events per minute, so you need to create a count column equal to count_ divided by 60 seconds. Now you can see the EPS per VMSS instance.
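The per-minute to per-second conversion is shown below as a small Python sketch. The row layout mirrors the query's summarize step, and the instance names and counts are invented for illustration.

```python
def eps_by_instance(rows, bin_seconds: int = 60):
    """Convert (minute_bin, resource_id, count_) rows into events per second.

    rows is assumed to look like the output of the summarize step:
    one row per one-minute bin and VMSS instance.
    """
    return [(minute, rid, count / bin_seconds) for minute, rid, count in rows]


# Made-up counts for two hypothetical instances.
sample = [
    ("2020-02-20T16:13:00Z", "vmss_0", 90000),
    ("2020-02-20T16:13:00Z", "vmss_1", 45000),
]
for row in eps_by_instance(sample):
    print(row)
```

This sketch uses floating-point division; the query as written divides two integers, so its result is truncated to a whole number.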

If you have performance issues, I recommend you look into rsyslog or Logstash performance tuning.

 

Some future improvements I might add:

  • Implement impstats for rsyslog and send data to Log Analytics.  This would allow performance monitoring of rsyslog (dashboarding, queries)
  • Implement GeoIP in rsyslog
  • Implement Logstash monitoring APIs and send data to Log Analytics.  This would allow performance monitoring of Logstash (dashboarding, queries)
  • Create additional sample using fluentd
  • Create additional sample using syslog-ng

Thanks for reading!

Last update: Nov 02 2021 05:51 PM