While many IoT data streams can, and should, be treated as Discrete events, some need to be treated as Series events, where events must be processed in order throughout the solution pipeline.
What are Series events? Series events are ordered and interrelated. The consumer needs the sequenced series of events to analyze what happened.
An example of Series events is streaming temperature values or GPS coordinates from a moving vehicle. These are time-ordered and often must be processed in order. For example, if the average temperature over the last 5 minutes is trending up, trigger an alert. As another example, if a vehicle's GPS coordinates are streamed to a geofence solution and you process those events out of order, the solution could detect the vehicle leaving the geofence before it has entered it.
Other examples include real-time fraud detection and systems that utilize Event Sourcing.
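The temperature example above can be sketched as a small, self-contained class. This is purely illustrative (not part of any SDK), and it only produces a meaningful result if readings arrive in order:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual sketch: a rolling temperature average that assumes readings
// arrive in order. If events arrive out of order, the "trending up"
// comparison between consecutive window averages is meaningless.
public class RollingAverageAlert
{
    private readonly Queue<double> _window = new Queue<double>();
    private readonly int _windowSize;
    private double _previousAverage = double.MaxValue;

    public RollingAverageAlert(int windowSize)
    {
        _windowSize = windowSize;
    }

    // Returns true when the window is full and its average has risen
    // since the previous reading.
    public bool AddReading(double temperature)
    {
        _window.Enqueue(temperature);
        if (_window.Count > _windowSize)
        {
            _window.Dequeue();
        }

        double average = _window.Average();
        bool trendingUp = _window.Count == _windowSize && average > _previousAverage;
        _previousAverage = average;
        return trendingUp;
    }
}
```

In a real pipeline the window size would be derived from the event rate (e.g. 5 minutes of readings), but the ordering dependency is the point: the same readings in a different order produce a different trend.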
Before we delve into the complexities, let's first consider other ways of handling ordered events. Failing those options, you will need to process events in order as they arrive. Herein lies the challenge.
Processing ordered events, at scale, is difficult. Why? Blame CAP Theorem.
Much has been written about CAP Theorem and databases, with some database systems allowing you to choose between Consistency and Availability. Azure Cosmos DB is a great example of such a system: it supports tunable consistency levels. However, CAP Theorem can also be applied to IoT data streams and the trade-offs necessary to process ordered events.
As with databases, event ingestion services such as Azure IoT Hub, Azure Event Hub and Kafka utilize partitions in order to scale out.
CAP Theorem implies that in the presence of a network partition, one has to choose between consistency and availability. So think of Partition as a constant; the choice then becomes Consistency or Availability.
But what exactly is a partition? According to the Event Hub documentation, a partition is an ordered sequence of events. As newer events arrive, they are added to the end of this sequence. A partition can be thought of as a "commit log."
Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The more partitions, the more parallelism.
Here's where the tradeoff comes into play. If you require consistency (where streaming events must be processed in order) you need to ensure that all events from that device arrive in the same partition. In doing so you are trading off availability, as you now have a reliance on individual partitions. You are choosing Consistency over Availability in terms of CAP Theorem.
In order to guarantee consistency, all events coming from the same device must go to the same partition. Azure IoT Hub already does this for us behind the scenes. All messages from a specific IoT Hub Device Id always end up on the same partition.
But with Event Hub, you need to do this in your publisher code, as Event Hub has no concept of a device.
There are two ways to guarantee consistency when sending events to Event Hub:
The first way is to specify a partition key when sending using the SendAsync method of the EventHubClient. Normally this would be a unique device ID or device name. A hash algorithm is run over the partition key to ensure all events with the same partition key land on the same partition.
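Conceptually, the mapping works like the sketch below. Note this is an illustration only: Event Hub's actual hash algorithm is internal to the service. The point is simply that a stable hash sends the same key to the same partition every time.

```csharp
using System;

class PartitionKeyDemo
{
    // Illustration only: a stable hash mapped onto a fixed partition count.
    // Event Hub uses its own internal hash; what matters is that the same
    // partition key always yields the same partition.
    static int GetPartition(string partitionKey, int partitionCount)
    {
        int hash = 0;
        foreach (char c in partitionKey)
        {
            hash = unchecked(hash * 31 + c);
        }
        return (hash & 0x7FFFFFFF) % partitionCount;
    }

    static void Main()
    {
        // Repeated calls with the same key map to the same partition,
        // so all of Device1's events land together, in order.
        Console.WriteLine(GetPartition("Device1", 4));
        Console.WriteLine(GetPartition("Device1", 4)); // identical result
    }
}
```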
The following code example shows 100 events landing on the same partition by specifying a partition key (e.g. “Device1”). Note: The Event Hub was created with 4 partitions, numbered 0 to 3.
private static async Task SendEventsToEventHub(int numEventsToSend)
{
    for (var i = 0; i < numEventsToSend; i++)
    {
        try
        {
            var eventMsg = $"Event {i}";
            Console.WriteLine($"Sending event: {eventMsg}");
            await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(eventMsg)), "Device1");
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
        }

        await Task.Delay(10);
    }

    Console.WriteLine($"{numEventsToSend} events sent.");
}
The issue with this method is you don’t know exactly which partition the event will land on. In this case the sample code shows the events from ‘Device1’ all landing on partition ‘2’.
Registering EventProcessor...
Receiving. Press ENTER to stop worker.
SimpleEventProcessor initialized. Partition: '2'
SimpleEventProcessor initialized. Partition: '0'
SimpleEventProcessor initialized. Partition: '3'
SimpleEventProcessor initialized. Partition: '1'
Message received. Partition: '2', Data: 'Event 0'
Message received. Partition: '2', Data: 'Event 1'
Message received. Partition: '2', Data: 'Event 2'
Message received. Partition: '2', Data: 'Event 3'
Message received. Partition: '2', Data: 'Event 4'
Message received. Partition: '2', Data: 'Event 5'
Message received. Partition: '2', Data: 'Event 6'
Message received. Partition: '2', Data: 'Event 7'
Message received. Partition: '2', Data: 'Event 8'
Message received. Partition: '2', Data: 'Event 9'
The second way is to specify an actual partition ID by sending with the SendAsync method of a PartitionSender object. Use this option when you need to control exactly which events land on which partition. In this example I have explicitly specified partition ‘2’ when creating the partition sender object.
static async Task Main(string[] args)
{
    // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
    // Typically, the connection string should have the entity path in it, but this simple scenario
    // uses the connection string from the namespace.
    var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
    {
        EntityPath = EventHubName
    };
    eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

    // Specify a specific partition Id to send the messages to.
    partitionSender = eventHubClient.CreatePartitionSender("2");

    await SendEventsToEventHub(100);
    await eventHubClient.CloseAsync();

    Console.WriteLine("Press ENTER to exit.");
    Console.ReadLine();
}
// Uses the event hub client to send 100 events to the same event hub partition
private static async Task SendEventsToEventHub(int numEventsToSend)
{
    for (var i = 0; i < numEventsToSend; i++)
    {
        try
        {
            var eventMsg = $"Event {i}";
            Console.WriteLine($"Sending event: {eventMsg}");
            await partitionSender.SendAsync(new EventData(Encoding.UTF8.GetBytes(eventMsg)));
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
        }

        await Task.Delay(10);
    }

    Console.WriteLine($"{numEventsToSend} events sent.");
}
Note: Do not mix PartitionKey and PartitionSender as they are two mutually exclusive concepts.
Now, how do we consume events from IoT Hub or Event Hub? There are currently two .NET Standard client libraries for reading events from IoT Hub and Event Hub: Microsoft.Azure.EventHubs (which provides EventHubClient and PartitionReceiver) and Microsoft.Azure.EventHubs.Processor (which provides Event Processor Host).
Reading events from IoT Hub or Event Hub is always accomplished through a Consumer Group. As noted in the documentation, it is recommended that there be only one active receiver on a partition per consumer group; otherwise you wind up with duplicate messages being processed.
To consume Series (and Discrete) events from IoT Hub and Event Hub we always recommend using Event Processor Host (EPH). Event Processor Host is used by a number of higher-level Azure services (such as Azure Functions - Event Hub Trigger and IoT Hub Trigger) and has a number of advanced, documented capabilities, including automatic partition lease management and checkpointing.
These capabilities are provided through the IEventProcessor interface, which you implement, and the EventProcessorHost class, which you register with the IoT Hub or Event Hubs service. Registering an event processor class with an instance of EventProcessorHost starts event processing: it tells the Event Hubs service that the consumer app will consume events from some of its partitions, and it causes your IEventProcessor implementation code to be invoked whenever there are events to consume.
According to the documentation:
"Any given instance of an event processor class will only process events from one partition of one Event Hub."
This ensures there is only one consumer per partition and ensures messages are processed in order by your consumer. There is a great example of how this is accomplished in the Event Processor Host documentation.
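For reference, hosting and registering a processor looks roughly like the following sketch. EventHubName, EventHubConnectionString, StorageConnectionString and StorageContainerName are assumed configuration values; the storage account is where EPH keeps its leases and checkpoints.

```csharp
// Sketch: hosting an IEventProcessor implementation with EventProcessorHost.
var eventProcessorHost = new EventProcessorHost(
    EventHubName,
    PartitionReceiver.DefaultConsumerGroupName,
    EventHubConnectionString,
    StorageConnectionString,   // EPH stores leases and checkpoints here
    StorageContainerName);

Console.WriteLine("Registering EventProcessor...");
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();

// Unregister the processor; this also surrenders the partition leases.
await eventProcessorHost.UnregisterEventProcessorAsync();
```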
There are four (4) methods to implement in IEventProcessor as shown in the following code sample.
public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }

        return context.CheckpointAsync();
    }
}
Running the above code will result in the following output.
Registering EventProcessor...
Receiving. Press ENTER to stop worker.
SimpleEventProcessor initialized. Partition: '2'
SimpleEventProcessor initialized. Partition: '3'
SimpleEventProcessor initialized. Partition: '0'
SimpleEventProcessor initialized. Partition: '1'
Message received. Partition: '2', Data: 'Event 0'
Message received. Partition: '2', Data: 'Event 1'
Message received. Partition: '2', Data: 'Event 2'
Message received. Partition: '2', Data: 'Event 3'
Message received. Partition: '2', Data: 'Event 4'
Message received. Partition: '2', Data: 'Event 5'
Message received. Partition: '2', Data: 'Event 6'
Message received. Partition: '2', Data: 'Event 7'
Message received. Partition: '2', Data: 'Event 8'
Message received. Partition: '2', Data: 'Event 9'
As can be seen in the output, this IoT Hub has 4 partitions, and the events are being processed in order on a per-partition basis.
Behind the scenes EPH will also checkpoint its progress in Azure Blob Storage whenever you call CheckpointAsync(). Looking at the underlying storage account reveals four (4) checkpoint files, one per partition.
Opening the partition "2" file reveals the following:
{"Offset":"154864","SequenceNumber":2424,"PartitionId":"2","Owner":"host-3a90d623-e19d-45c0-bdff-be7909269f24","Token":"a854d208-291e-4586-9cb6-2021c74af1cc","Epoch":4}
The Offset, SequenceNumber and PartitionId record your last position in case your consumer crashes and needs to be restarted. The remaining information relates to the partition lease. Using EPH hides much of this checkpoint complexity from you, but if you decide to use Event Hub Client instead, you will need to develop your own checkpoint code.
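If you do end up writing your own checkpoint code, it can be as simple as persisting the last processed Offset and SequenceNumber per partition, mirroring the JSON above. The following file-based store is a minimal sketch only; a production version would typically write to blob storage and manage leases, as EPH does:

```csharp
using System.IO;

// Minimal sketch of a do-it-yourself checkpoint store. Each partition gets
// one file recording the last processed position (offset, sequence number).
public static class CheckpointStore
{
    public static void Save(string partitionId, string offset, long sequenceNumber)
    {
        File.WriteAllText($"checkpoint-{partitionId}.txt", $"{offset},{sequenceNumber}");
    }

    // Returns null if no checkpoint exists yet for this partition.
    public static (string Offset, long SequenceNumber)? Load(string partitionId)
    {
        var path = $"checkpoint-{partitionId}.txt";
        if (!File.Exists(path)) return null;
        var parts = File.ReadAllText(path).Split(',');
        return (parts[0], long.Parse(parts[1]));
    }
}
```

You would call Save after processing a batch, and Load on startup to decide where to resume.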
If Event Processor Host (EPH) is not suitable for your needs you can use the EventHubClient. Use the EventHubClient.CreateReceiver() method and specify a partition ID, which returns a PartitionReceiver object. You will create one PartitionReceiver per Consumer Group + Partition combination so each partition has only one consumer.
The following code sample demonstrates a single consumer reading from a single partition.
private static async Task ReceiveMessagesFromPartition(string partition, CancellationToken ct)
{
    // Start receiving from the current end of the stream (enqueued times are UTC)
    receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partition, EventPosition.FromEnqueuedTime(DateTime.UtcNow));
    Console.WriteLine($"Connected to partition {receiver.PartitionId}, consumer group {receiver.ConsumerGroupName}");

    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        // Receive a maximum of 100 messages in this call to ReceiveAsync
        var ehEvents = await receiver.ReceiveAsync(100);

        // ReceiveAsync can return null if there are no messages
        if (ehEvents != null)
        {
            // ReceiveAsync can return more than a single event, so loop over the batch
            foreach (var ehEvent in ehEvents)
            {
                // Decode the byte array segment, honoring its offset and count
                var message = Encoding.UTF8.GetString(ehEvent.Body.Array, ehEvent.Body.Offset, ehEvent.Body.Count);
                Console.WriteLine($"Received. '{message}'");
            }
        }
    }
}
Running the above code shows that the PartitionReceiver is receiving events, in order, from a single partition.
Press ENTER to exit.
Connected to partition 2, consumer group $Default
Received. 'Event 0'
Received. 'Event 1'
Received. 'Event 2'
Received. 'Event 3'
Received. 'Event 4'
Received. 'Event 5'
Received. 'Event 6'
Received. 'Event 7'
…
Received. 'Event 99'
Congratulations, you are now receiving events in order!
Don’t forget to periodically checkpoint (e.g. to blob storage) where you are in the partition stream, so that if your consumer crashes, a new consumer picks up where the previous consumer left off. The following code sample shows how you can determine your position using the SystemProperties SequenceNumber or Offset.
const int BatchSize = 100;

EventHubClient client = EventHubClient.CreateFromConnectionString("__connectionString__");
PartitionReceiver receiver = client.CreateReceiver("ConsumerGroup1", "1", EventPosition.FromStart());
IEnumerable<EventData> receivedEvents = await receiver.ReceiveAsync(BatchSize);

while (true)
{
    int batchSize = 0;
    if (receivedEvents != null)
    {
        foreach (EventData receivedEvent in receivedEvents)
        {
            Console.WriteLine("Message Payload: {0}", Encoding.UTF8.GetString(receivedEvent.Body.Array, receivedEvent.Body.Offset, receivedEvent.Body.Count));
            Console.WriteLine("Offset: {0}, SeqNo: {1}, EnqueueTime: {2}",
                receivedEvent.SystemProperties.Offset,
                receivedEvent.SystemProperties.SequenceNumber,
                receivedEvent.SystemProperties.EnqueuedTime);
            batchSize++;
        }
    }

    Console.WriteLine("ReceivedBatch Size: {0}", batchSize);
    receivedEvents = await receiver.ReceiveAsync(BatchSize);
}
When calling the EventHubClient.CreateReceiver() method, you would specify EventPosition.FromOffset() or EventPosition.FromSequenceNumber() when restarting the consumer.
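For example, assuming you previously saved the position from SystemProperties, restarting the receiver from that position looks like the snippet below. Here savedOffset and savedSequenceNumber are hypothetical values you checkpointed earlier, and eventHubClient is assumed to exist as in the earlier samples.

```csharp
// Resume a PartitionReceiver from a previously saved position rather than
// from the end of the stream.
var receiver = eventHubClient.CreateReceiver(
    PartitionReceiver.DefaultConsumerGroupName,
    "2",
    EventPosition.FromOffset(savedOffset));

// Alternatively, resume by sequence number:
// EventPosition.FromSequenceNumber(savedSequenceNumber)
```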
Now that we are reading events in order, how do we achieve scale?
First, consider a fan-out approach to increase the number of partitions and consumers. For example, distributing your IoT devices across 6 IoT Hubs, each configured with 32 partitions, results in 192 partitions. Your event-processing consumer microservice can then run as 192 instances, with each instance consuming from a single partition.
In addition to the fan-out approach, try optimizing your event processing code.
In conclusion, the best way to scale your IoT solution is to treat events as Discrete wherever possible, so they can be processed in parallel without ordering constraints. But if your solution requires processing events in order, hopefully you’ll find the guidance in this article helpful.