Processing ordered IoT streams in Azure
Published Sep 09 2019

Why are Ordered Events Important?

While many IoT data streams can, and should, be treated as Discrete events, some need to be treated as Series events, where events must be processed in order throughout the solution pipeline.

 

What are Series events? Series events are ordered and interrelated. The consumer needs the sequenced series of events to analyze what happened.

 

An example of Series events is streaming temperature values or GPS coordinates from a moving vehicle. These are time-ordered and often must be processed in order. For example, if the average temperature over the last 5 minutes is trending up, trigger an alert. As another example, if a vehicle's GPS coordinates are streamed to a geofence solution and you process those events out of order, you could have a situation where the solution detects the vehicle leaving the geofence before it has entered it.

 

Other examples include real-time fraud detection and systems that utilize Event Sourcing.

Ways to Handle Ordered Events

Before we delve into the complexities, let's discuss other ways of handling ordered events, such as:

 

  1. Do not order events
    • Sounds obvious but often developers assume all events must be ordered. Not so. Many IoT data streams can be treated as Discrete. 
    • Analyze your streaming IoT data and determine which are Discrete events vs Series Events. Try to make most events discrete, as scale is easier to achieve with Discrete events.
  2. Store events out of order but return queries in ordered form
    • The challenge is that you need to land your data first (e.g. in Cosmos DB, Azure Data Explorer, etc.), then have your microservices query (poll) for this data in ordered form.
    • This introduces latency in the solution as the data needs to land first before processing it.
    • If your solution can tolerate this latency then this is a perfectly viable approach (a minimal query sketch follows this list).
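
As a minimal sketch of option 2, the following example assumes events have been landed in Cosmos DB using the .NET SDK; the container, property names and event shape shown here are illustrative assumptions, not part of the original solution. Events are written as they arrive (possibly out of order) and the ORDER BY clause re-establishes time order when the microservice queries them.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class DeviceEvent
{
    public string id { get; set; }
    public string deviceId { get; set; }
    public DateTime enqueuedTime { get; set; }
    public double value { get; set; }
}

// Query landed events back in time order for a single device.
public static async Task<List<DeviceEvent>> ReadOrderedEventsAsync(Container container, string deviceId)
{
    var query = new QueryDefinition(
            "SELECT * FROM c WHERE c.deviceId = @deviceId ORDER BY c.enqueuedTime ASC")
        .WithParameter("@deviceId", deviceId);

    var results = new List<DeviceEvent>();
    var iterator = container.GetItemQueryIterator<DeviceEvent>(query);
    while (iterator.HasMoreResults)
    {
        results.AddRange(await iterator.ReadNextAsync());
    }

    return results;
}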

Failing these options, you will need to process events in order as they arrive. Herein lies the challenge.

Ordered Events and CAP Theorem

Processing ordered events, at scale, is difficult. Why? Blame CAP Theorem.

[Image: CAP theorem diagram (clipboard_image_0.png)]

  • Consistency: Every read receives the most recent write or an error.
  • Availability: Every request receives a (non-error) response – without the guarantee that it contains the most recent write.
  • Partition tolerance: The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes.

Much has been written about CAP Theorem and databases, with some database systems allowing you to choose between Consistency and Availability. Azure Cosmos DB is a great example of such a system: it supports tunable consistency levels. However, CAP Theorem can also be applied to IoT data streams and the trade-offs necessary to process ordered events.

 

As with databases, event ingestion services such as Azure IoT Hub, Azure Event Hub, and Kafka utilize partitions in order to scale out.

 

CAP theorem implies that in the presence of a network partition, one has to choose between consistency and availability.

 

So, think of Partition as a constant. With Partition as a constant, the choice becomes Consistency or Availability.

 

But what exactly is a partition? According to the Event Hub documentation, a partition is an ordered sequence of events. As newer events arrive, they are added to the end of this sequence. A partition can be thought of as a "commit log."

 

Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The more partitions, the more parallelism.

The Tradeoff

Here's where the tradeoff comes into play. If you require consistency (where streaming events must be processed in order) you need to ensure that all events from a given device arrive in the same partition. In doing so you are trading off availability, as you now have a reliance on individual partitions. You are choosing Consistency over Availability in terms of CAP Theorem.

Achieving Consistency with Event Hub and IoT Hub

In order to guarantee consistency, all events coming from the same device must go to the same partition. Azure IoT Hub already does this for us behind the scenes. All messages from a specific IoT Hub Device Id always end up on the same partition.

 

[Image: IoT Hub partitioning by device Id (IotHub.jpg)]
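
For comparison with the Event Hub examples that follow, here is a minimal device-side sketch (assuming the Microsoft.Azure.Devices.Client SDK; the connection string is a placeholder). Note that nothing partition-related needs to be specified when sending; IoT Hub hashes the device Id behind the scenes.

using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;

// Sketch: device-to-cloud messages. No partition key is needed because
// IoT Hub routes all messages from this device Id to the same partition.
private static async Task SendDeviceEventsAsync(int numEventsToSend)
{
    var deviceClient = DeviceClient.CreateFromConnectionString(
        "<device connection string>", TransportType.Mqtt);

    for (var i = 0; i < numEventsToSend; i++)
    {
        var message = new Message(Encoding.UTF8.GetBytes($"Event {i}"));
        await deviceClient.SendEventAsync(message);
        await Task.Delay(10);
    }

    await deviceClient.CloseAsync();
}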

 

But with Event Hub, you need to do this in your publisher code, as Event Hub has no concept of a device.

Sending events to an Event Hub Partition

There are two ways to guarantee consistency when sending events to Event Hub:

  1. Specify a partition key when sending an event, or
  2. Specify a partition ID to send events to a specific partition.

The first way is to specify a partition key when sending using the SendAsync method of the EventHubClient. Normally this would be a unique device ID or device name. A hash algorithm is run over the partition key to ensure all events with the same partition key land on the same partition.

 

[Image: Sending to Event Hub with a partition key (eventhub-sending.jpg)]

 

The following code example shows 100 events landing on the same partition by specifying a partition key (e.g. “Device1”). Note: The Event Hub was created with 4 partitions, numbered 0 to 3.

 

 

private static async Task SendEventsToEventHub(int numEventsToSend)
{
    for (var i = 0; i < numEventsToSend; i++)
    {
        try
        {
            var eventMsg = $"Event {i}";
            Console.WriteLine($"Sending event: {eventMsg}");
            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(eventMsg)),"Device1");
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
        }

        await Task.Delay(10);
    }

    Console.WriteLine($"{numEventsToSend} events sent.");
}

 

 

The issue with this method is you don’t know exactly which partition the event will land on. In this case the sample code shows the events from ‘Device1’ all landing on partition ‘2’.

 

 

Registering EventProcessor...
Receiving. Press ENTER to stop worker.
SimpleEventProcessor initialized. Partition: '2'
SimpleEventProcessor initialized. Partition: '0'
SimpleEventProcessor initialized. Partition: '3'
SimpleEventProcessor initialized. Partition: '1'
Message received. Partition: '2', Data: 'Event 0'
Message received. Partition: '2', Data: 'Event 1'
Message received. Partition: '2', Data: 'Event 2'
Message received. Partition: '2', Data: 'Event 3'
Message received. Partition: '2', Data: 'Event 4'
Message received. Partition: '2', Data: 'Event 5'
Message received. Partition: '2', Data: 'Event 6'
Message received. Partition: '2', Data: 'Event 7'
Message received. Partition: '2', Data: 'Event 8'
Message received. Partition: '2', Data: 'Event 9'

 

 

The second way is to specify an actual partition ID when sending using the SendAsync method of a PartitionSender object. Use this option when you need to specifically control which events land on which partition. In this example I have explicitly specified partition ‘2’ when creating the partition sender object.

 

 

static async Task Main(string[] args)
{
    // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
    // Typically, the connection string should have the entity path in it, but this simple scenario
    // uses the connection string from the namespace.
    var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
    {
        EntityPath = EventHubName
    };

    eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

    // Specify a specific partition Id to send the messages to.
    partitionSender = eventHubClient.CreatePartitionSender("2");

    await SendEventsToEventHub(100);

    await eventHubClient.CloseAsync();

    Console.WriteLine("Press ENTER to exit.");
    Console.ReadLine();
}

// Uses the event hub client to send 100 events to the same event hub partition
private static async Task SendEventsToEventHub(int numEventsToSend)
{
    for (var i = 0; i < numEventsToSend; i++)
    {
        try
        {
            var eventMsg = $"Event {i}";
            Console.WriteLine($"Sending event: {eventMsg}");
            await partitionSender.SendAsync(new EventData(Encoding.UTF8.GetBytes(eventMsg)));
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
        }

        await Task.Delay(10);
    }

    Console.WriteLine($"{numEventsToSend} events sent.");
}

 

 

Note: Do not mix PartitionKey and PartitionSender as they are two mutually exclusive concepts.

 

Consuming ordered events from a Partition

Now, how do we consume events from IoT Hub or Event Hub? There are currently two .NET Standard client libraries for reading events from IoT Hub and Event Hub:

 

  • Microsoft.Azure.EventHubs.Processor: Contains the Event Processor Host (EPH) class, which provides high-level functionality to read events from all partitions and keeps track of processed events via checkpoints. The EPH interface is the easiest way to read from an IoT Hub or Event Hub and is always the best place to start.
  • Microsoft.Azure.EventHubs: Provides more granular operations than EPH and is somewhat more complex to use. It does not automatically provide checkpoints; you'll need to add checkpointing yourself.

Reading events from IoT Hub or Event Hub is always accomplished through a Consumer Group. As noted in the documentation, it is recommended that there be only one active receiver on a partition per consumer group. Otherwise you wind up with duplicate messages being processed.

 

Event Processor Host

To consume Series (and Discrete) events from IoT Hub and Event Hub we always recommend using Event Processor Host (EPH). Event Processor Host is used by a number of higher level Azure services (such as Azure Functions - Event Hub Trigger and IoT Hub Trigger) and has a number of advanced, documented capabilities including:

 

  • Scale - Create multiple consumers, with each consumer taking ownership of reading from a few Event Hubs partitions.
  • Load balancing - Increase or reduce the consumers dynamically. For example, when a new sensor type (for example, a carbon monoxide detector) is added to each home, the number of events increases. In that case, the operator (a human) increases the number of consumer instances. Then, the pool of consumers can re-balance the number of partitions they own, to share the load with the newly added consumers.
  • Seamless resume on failure - If a consumer (consumer A) fails (for example, the virtual machine hosting the consumer suddenly crashes), then other consumers must be able to pick up the partitions owned by consumer A and continue. Also, the continuation point, called a checkpoint or offset, should be at the exact point at which consumer A failed, or slightly before that. EPH saves checkpoints to Azure blob storage.

These capabilities are provided through the IEventProcessor interface and the EventProcessorHost class. Consumers register an EventProcessorHost instance with the IoT Hub or Event Hubs service, and registering an event processor class with that instance starts event processing. Registering instructs the Event Hubs service to expect the consumer app to consume events from some of its partitions, and to invoke the IEventProcessor implementation code whenever it pushes events to the consumer.
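
For reference, a minimal registration sketch looks something like the following. It assumes the Microsoft.Azure.EventHubs.Processor package and that EventHubName, EventHubConnectionString, StorageConnectionString and StorageContainerName are defined elsewhere; SimpleEventProcessor is the IEventProcessor implementation shown later in this article.

var eventProcessorHost = new EventProcessorHost(
    EventHubName,                                // or the IoT Hub's Event Hub-compatible name
    PartitionReceiver.DefaultConsumerGroupName,
    EventHubConnectionString,                    // or the IoT Hub's Event Hub-compatible endpoint
    StorageConnectionString,                     // blob storage used for leases and checkpoints
    StorageContainerName);

Console.WriteLine("Registering EventProcessor...");
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();

// Unregister to dispose of the receivers and stop processing.
await eventProcessorHost.UnregisterEventProcessorAsync();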

 

According to the documentation:

 

"Any given instance of an event processor class will only process events from one partition of one Event Hub."

 

This ensures there is only one consumer per partition, which in turn ensures messages are processed in order by your consumer. There is a great example of how this is accomplished in the Event Processor Host documentation.

 

There are four (4) methods to implement in IEventProcessor as shown in the following code sample.

 

 

   public class SimpleEventProcessor : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }

        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }

        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }

            return context.CheckpointAsync();
        }
    }

 

 

Running the above code will result in the following output.

 

 

Registering EventProcessor...
Receiving. Press ENTER to stop worker.
SimpleEventProcessor initialized. Partition: '2'
SimpleEventProcessor initialized. Partition: '3'
SimpleEventProcessor initialized. Partition: '0'
SimpleEventProcessor initialized. Partition: '1'
Message received. Partition: '2', Data: 'Event 0'
Message received. Partition: '2', Data: 'Event 1'
Message received. Partition: '2', Data: 'Event 2'
Message received. Partition: '2', Data: 'Event 3'
Message received. Partition: '2', Data: 'Event 4'
Message received. Partition: '2', Data: 'Event 5'
Message received. Partition: '2', Data: 'Event 6'
Message received. Partition: '2', Data: 'Event 7'
Message received. Partition: '2', Data: 'Event 8'
Message received. Partition: '2', Data: 'Event 9'

 

 

As can be seen in the output, this Event Hub has 4 partitions, and the events are being processed in order on a per-partition basis.

 

Behind the scenes EPH will also checkpoint its progress in Azure Blob Storage whenever you call CheckpointAsync(). Looking at the underlying storage account reveals four (4) checkpoint files, one per partition. 

 

[Image: Checkpoint blobs in Azure Blob Storage, one per partition (blob1.png)]

 

Opening the partition "2" file reveals the following:

 

 

{"Offset":"154864","SequenceNumber":2424,"PartitionId":"2","Owner":"host-3a90d623-e19d-45c0-bdff-be7909269f24","Token":"a854d208-291e-4586-9cb6-2021c74af1cc","Epoch":4}

 

 

The Offset, SequenceNumber and PartitionId record your last position in case your consumer crashes and needs to be restarted. The remaining information relates to the lease. EPH hides much of this checkpoint complexity from you, but if you decide to use Event Hub Client instead, you will need to develop your own checkpoint code.

 

Event Hub Client

If Event Processor Host (EPH) is not suitable for your needs you can use the EventHubClient. Use the EventHubClient.CreateReceiver() method and specify a partition ID, which returns a PartitionReceiver object. You will create one PartitionReceiver per Consumer Group + Partition combination so each partition has only one consumer.

[Image: One PartitionReceiver per consumer group and partition (event-hub-consumer1.jpg)]

 

The following code sample demonstrates a single consumer reading from a single partition.

 

 

private static async Task ReceiveMessagesFromPartition(string partition, CancellationToken ct)
{

    receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partition, EventPosition.FromEnqueuedTime(DateTime.Now));

    Console.WriteLine($"Connected to partition {receiver.PartitionId}, consumer group {receiver.ConsumerGroupName}");

    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        // Receive a maximum of 100 messages in this call to ReceiveAsync
        var ehEvents = await receiver.ReceiveAsync(100);

        // ReceiveAsync can return null if there are no messages
        if (ehEvents != null)
        {
            // Since ReceiveAsync can return more than a single event you will need a loop to process
            foreach (var ehEvent in ehEvents)
            {
                // Decode the byte array segment
                var message = Encoding.UTF8.GetString(ehEvent.Body.Array, ehEvent.Body.Offset, ehEvent.Body.Count);
                Console.WriteLine($"Received. '{message}'");
            }
        }
    }
}

 

 

Running the above code shows that the PartitionReceiver is receiving events, in order, from a single partition.

 

 

Press ENTER to exit.
Connected to partition 2, consumer group $Default
Received. 'Event 0'
Received. 'Event 1'
Received. 'Event 2'
Received. 'Event 3'
Received. 'Event 4'
Received. 'Event 5'
Received. 'Event 6'
Received. 'Event 7'
…
Received. 'Event 99'

 

 

Congratulations, you are now receiving events in order!
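
To cover every partition while still keeping exactly one consumer per partition, one approach is to start one receive loop per partition. The following is a rough sketch that builds on the ReceiveMessagesFromPartition method above; it assumes a using directive for System.Linq, and that the receiver variable in that method is made local to the method (rather than a shared field) so each task gets its own PartitionReceiver.

// Discover the partition Ids, then start one receive task per partition.
var runtimeInfo = await eventHubClient.GetRuntimeInformationAsync();
var cts = new CancellationTokenSource();

var receiveTasks = runtimeInfo.PartitionIds
    .Select(partitionId => ReceiveMessagesFromPartition(partitionId, cts.Token))
    .ToList();

Console.WriteLine("Receiving from all partitions. Press ENTER to stop.");
Console.ReadLine();

cts.Cancel();
await Task.WhenAll(receiveTasks);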

 

Don’t forget to periodically checkpoint (e.g. to blob storage) where you are in the partition stream, so that if your consumer crashes, a new consumer picks up where the previous consumer left off. The following code sample shows how you can determine your position using the SystemProperties SequenceNumber or Offset.

 

 

const int BatchSize = 100;

EventHubClient client = EventHubClient.CreateFromConnectionString("__connectionString__");
PartitionReceiver receiver = client.CreateReceiver("ConsumerGroup1", "1", EventPosition.FromStart());
IEnumerable<EventData> receivedEvents = await receiver.ReceiveAsync(BatchSize);

while (true)
{
    int batchSize = 0;
    if (receivedEvents != null)
    {
        foreach (EventData receivedEvent in receivedEvents)
        {
            Console.WriteLine("Message Payload: {0}",
                Encoding.UTF8.GetString(receivedEvent.Body.Array, receivedEvent.Body.Offset, receivedEvent.Body.Count));
            Console.WriteLine("Offset: {0}, SeqNo: {1}, EnqueueTime: {2}",
                receivedEvent.SystemProperties.Offset,
                receivedEvent.SystemProperties.SequenceNumber,
                receivedEvent.SystemProperties.EnqueuedTimeUtc);
            batchSize++;
        }
    }

    Console.WriteLine("ReceivedBatch Size: {0}", batchSize);
    receivedEvents = await receiver.ReceiveAsync(BatchSize);
}

 

 

When restarting the consumer, call the EventHubClient.CreateReceiver() method and specify EventPosition.FromOffset or EventPosition.FromSequenceNumber to resume from your last checkpoint.
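
If you do roll your own checkpointing, the following is a rough sketch of what it might look like. This is not the EPH checkpoint format; the Azure.Storage.Blobs package, container name and blob layout are assumptions made for illustration. It persists the last processed offset and sequence number per partition and reads the offset back on restart.

using System.IO;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;

public class BlobCheckpointStore
{
    private readonly BlobContainerClient _container;

    public BlobCheckpointStore(string storageConnectionString, string containerName)
    {
        _container = new BlobContainerClient(storageConnectionString, containerName);
        _container.CreateIfNotExists();
    }

    // Persist the last processed position for a partition, e.g. "154864,2424".
    public async Task SaveAsync(string partitionId, string offset, long sequenceNumber)
    {
        var blob = _container.GetBlobClient($"checkpoints/{partitionId}.txt");
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes($"{offset},{sequenceNumber}")))
        {
            await blob.UploadAsync(stream, overwrite: true);
        }
    }

    // Return the saved offset for a partition, or null if no checkpoint exists yet.
    public async Task<string> LoadOffsetAsync(string partitionId)
    {
        var blob = _container.GetBlobClient($"checkpoints/{partitionId}.txt");
        var exists = await blob.ExistsAsync();
        if (!exists.Value) return null;

        var content = (await blob.DownloadContentAsync()).Value.Content.ToString();
        return content.Split(',')[0];
    }
}

// Usage sketch on restart:
//   var offset = await checkpointStore.LoadOffsetAsync("2");
//   var position = offset != null ? EventPosition.FromOffset(offset) : EventPosition.FromStart();
//   var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "2", position);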

Achieving Scale

Now that we are reading events in order, how do we achieve scale?

 

First, consider a fan-out approach to increase the number of partitions and consumers:

 

  1. Create your IoT Hub and/or Event Hub with lots of partitions to increase parallelism
    • By default, each IoT Hub and Event Hub supports up to 32 partitions. The partition count must be specified at creation, but it can be increased to 128 by creating a support case or by creating the hub with an ARM template.
    • For a larger number of partitions, consider Event Hub Dedicated, which offers 2,000 partitions per CU.
  2. Distribute your telemetry over multiple IoT Hubs and/or Event Hubs.
    • When provisioning IoT devices using DPS, set up an allocation policy to distribute those devices over multiple IoT Hubs.
    • Use IoT Hub Message Routing to fan out messages to different Event Hubs endpoints based on the message payload. E.g. GPS events are routed to Event Hub A while temperature events are routed to Event Hub B.
  3. Ensure there is one consumer per partition per IoT Hub and/or Event Hub to maximize parallelism.
  4. Ensure you configure IoT Hub and Event Hub to meet your throughput requirements. Don't confuse the number of partitions (degree of downstream parallelism) with throughput. 
    • Each IoT Hub Unit provides additional capacity depending on the SKU. 
    • Each Event Hub Throughput Unit provides: 
      • Ingress: Up to 1 MB per second or 1000 events per second (whichever comes first).
      • Egress: Up to 2 MB per second or 4096 events per second.

 

[Image: Fan-out across multiple IoT Hubs with one consumer per partition (event-hub-consumer2.jpg)]

 

Expanding on the above scenario, imagine a fan-out approach that distributes your IoT devices across 6 IoT Hubs, each configured with 32 partitions, resulting in 192 partitions. Your event processing consumer microservice can then run across 192 instances, with each instance consuming from a single partition.

 

In addition to the fan-out approach, try optimizing your event processing code.

 

  1. Lighten the work performed by your event processing code.
    • Put the bare minimum code in place to process the event in order. Move tasks to other services that aren’t dependent on ordered events.
  2. Read events in batches
    • Calling IoT Hub or Event Hub for a single event is expensive.
    • ReceiveAsync() allows you to read events in batches from a specific partition.
    • Experiment with different batch sizes.
  3. If using EventProcessorHost, set the following EventProcessorOptions (see the sketch after the end of this list):
    • MaxBatchSize - the maximum size of the collection you want to receive in an invocation of ProcessEventsAsync. Note that this is a maximum, not a minimum; if there are fewer messages available, ProcessEventsAsync will execute with as many as are available.
    • PrefetchCount - a value used by the underlying AMQP channel to determine the upper limit of how many messages the client should prefetch. This value should be greater than or equal to MaxBatchSize.
  4. Decrease the frequency of checkpoints.
    • Checkpointing too frequently (e.g. after each event is processed) will decrease the performance of your consumer.
    • Checkpointing infrequently (e.g. only after every 100 events are consumed) will cause events to be re-read from the partition when the consumer restarts after a crash, thereby causing duplicate messages to be processed.
    • You will need to weigh what is more important to you: scale or preventing duplicate events from being processed.
  5. Utilize asynchronous programming within your event processing code.
    • However, be cautious, as this may also cause events to be processed out of order.
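
As a small sketch of item 3 above (reusing the eventProcessorHost and SimpleEventProcessor from the earlier examples; the numbers are arbitrary starting points, not recommendations):

var options = new EventProcessorOptions
{
    MaxBatchSize = 200,   // upper bound on events delivered per ProcessEventsAsync invocation
    PrefetchCount = 400   // AMQP prefetch; keep this greater than or equal to MaxBatchSize
};

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);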

In conclusion, the best way to scale your IoT solution is to:

  • Avoid or minimize the use of time-ordered events; and/or
  • Save events out of order but return results in order.

But if your solution requires processing events in order, hopefully you’ll find the guidance in this article helpful.

 

