This post was co-authored by Ramachandran G, Sr Software Eng Manager, Azure Data Explorer.
What is Protobuf?
When we serialize, we’re converting from an in-memory representation of data that’s good for analysis and/or computation into a form that’s better for storage and/or transmission. Serialization’s main purpose is to save the state of an object so that it can be recreated when needed. The reverse process is called deserialization.
Protocol buffers (Protobuf) are a language- and platform-neutral, extensible mechanism for serializing and deserializing structured data for use in communication protocols and data storage. Essentially, protocol buffers are a method for serializing data so it can be exchanged efficiently between a data producer and a consumer. The structure is reminiscent of XML or JSON, but unlike these more commonly used text-based serialization methods, protocol buffers are designed to produce extremely compact messages using a binary format. The main tradeoff is that protocol buffers aren’t human-readable.
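For instance, a minimal .proto definition (purely illustrative, not tied to the setup later in this post) looks like this:

syntax = "proto3";

// Only the field numbers and values are encoded on the wire, not the field names.
message Person {
  string name  = 1;
  int32  id    = 2;
  string email = 3;
}

A serialized Person is typically much smaller than the equivalent JSON document, because field names and structural punctuation never leave the producer.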
3 Musketeers - ADX, Kafka & Protobuf
Kafka has increasingly become a popular choice for scalable message queueing in large data processing workloads. This makes it very popular in IoT-based ecosystems, where there is a large ingress of data before data processing or data storage. Azure Data Explorer is a very powerful time series and analytics database that suits IoT-scale data ingestion and data querying.
Kafka supports ingestion of data in multiple formats, including JSON, Avro, Protobuf and String, and ADX supports ingestion from Kafka in all of these formats. Due to its excellent schema support, extensibility across platforms and compression, [protobuf](https://developers.google.com/protocol-buffers) is increasingly becoming the data exchange format of choice in IoT-based systems. The ADX Kafka sink connector leverages the Kafka Connect framework and provides an adapter to ingest data from Kafka in all these formats.
The following sections provide the configuration needed to ingest protobuf data from Kafka into ADX. Two variants are supported for protobuf ingestion from Kafka:
1. Confluent version
This variant is supported only by the Confluent distribution of Kafka and leverages the schema registry as the source of truth for the schemas.
2. Community version
This variant is supported by the free, open-source software (OSS) Apache Kafka package, which does not ship with a schema registry. There is also an open-source build of Confluent Kafka that can run standalone without the schema registry; that is what the demonstration uses.
Setup examples for the configuration
The code of the connector is open source and available in the GitHub repository; it can also be built from source and deployed into the Kafka Connect runtime.
Note: Optionally, a detailed setup with lab instructions for installing the ADX Kafka connector is available in Installing and Configuring the Kafka Connector. The Confluent version of the connector is set up as documented in this guide, and the HDInsight version as described in this guide.
1. Configuring the connector for Confluent version (with schema registry)
For the purposes of this demonstration, we use a Docker-based setup to run Confluent Kafka with the schema registry; it provides a minimal environment for a quick test scenario. A sample Docker Compose file that can be used for the setup is available here: Docker Compose file for confluent schema registry and kafka connect (github.com)
Using docker-compose, the Kafka setup along with the schema registry can be started up:
> docker-compose -f docker-compose.yml up --build -d
Creating network "confluent_default" with the default driver
Pulling zookeeper (confluentinc/cp-zookeeper:6.2.5)
Creating zookeeper done
Creating kafka done
Creating schema-registry done
Creating connect done
The next step is to set up an Azure service principal for the connector to use; currently the connector supports app id / app key authentication. Using the Azure CLI, the service principal can be created as follows:
> az login
> az account set --subscription <subscription id>
> az ad sp create-for-rbac -n kusto-proto-demo-sample-sp
{
  "appId": "xnnnx-nn-xnx-xnx-nnnxnnx",
  "displayName": "kusto-proto-demo-sample-sp",
  "tenant": "xnnnx-nn-xnx-xnx-nnnxnnx",
  "appKey": "xnnnx-nn-xnx-xnx-nnnxnnx"
}
The final step is to assign the Table Ingestor role to the service principal, in the Azure portal, on the ADX cluster and database created.
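Alternatively, the same role can be granted with a Kusto management command; the database name, app id and tenant id below are placeholders for the values from your environment and the service principal created above:

//Grant the ingestor role to the service principal (replace the placeholders)
.add database ['<DatabaseName>'] ingestors ('aadapp=<appId>;<tenantId>') 'Protobuf demo service principal'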
Once these set-ups are done, the next step is to deploy the connector. The Kusto Kafka connector is based on the Kafka Connect framework and uses the sink APIs to ingest data from Kafka into ADX. There is native support for a variety of formats including JSON, Apache Avro, Apache Parquet, Apache ORC and, more recently, Google’s Protobuf.
Protobuf support in the connector leverages the protobuf deserializer from Confluent in the case of the Confluent connector (which uses the schema registry), and open-source deserializers for the pure OSS solution. The configuration of each is explained in detail below.
The test setup is a simple producer application that uses a protobuf schema to produce messages, which the sink connector then consumes from Kafka and ingests into ADX.
Setting up the database
In this simplified setup on ADX, we create a table PbStreams with a single dynamic column called payload and an identity mapping for multi-json. The data can then be processed further into downstream tables using an update policy (a sketch of such a policy follows the commands below).
//Create the table
.create table PbStreams (payload: dynamic)
//Simplistic identity mapping applied on the table
.create table PbStreams ingestion json mapping 'pb_mapping' '[{"Column": "payload", "Properties": {"Path": "$"}}]'
//Batching ingestion policy. Ingest based on whichever condition is fulfilled first
.alter table PbStreams policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
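As a sketch of the further processing mentioned above, an update policy can project typed columns out of the raw payload into a downstream table. The table, function and column names here are illustrative assumptions, not part of the setup above, and should be adapted to your own schema:

//Illustrative downstream table
.create table PbStreamsTyped (DeviceId: string, EventTime: datetime, Temperature: real)

//Function that shapes the raw payload into the downstream schema
.create-or-alter function ExpandPbStreams() {
    PbStreams
    | project DeviceId = tostring(payload.device_id),
              EventTime = unixtime_milliseconds_todatetime(tolong(payload.event_time)),
              Temperature = toreal(payload.reading.temperature)
}

//Update policy that runs the function on every ingestion into PbStreams
.alter table PbStreamsTyped policy update @'[{"IsEnabled": true, "Source": "PbStreams", "Query": "ExpandPbStreams()", "IsTransactional": false}]'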
Copying the artifact for deployment
- Download the latest release of the connector from https://github.com/Azure/kafka-sink-azure-kusto/releases
- Uncompress it and copy it to the plugin path (set to /usr/share/java in the compose file), then restart the container
> docker cp .\microsoftcorporation-kafka-sink-azurekusto <container-hash>:/usr/share/java
> docker restart <container hash>
Configuring and Deploying the ADX connector
The first step is to prepare the configuration for the connector. A sample configuration covers the following settings (a sketch of the full configuration follows the list):
- The logical name of the connector
- The schema registry URL; the hostname in docker-compose is schema-registry and is discoverable by name on the Docker network
- When to flush the data: 10 KB or 10 seconds, whichever comes first
- The name of the topic to subscribe to
- The credentials of the service principal created earlier to connect to ADX
- The ADX URLs for ingestion and for the ADX engine, respectively
- The protobuf deserializer from Confluent
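A sketch of such a configuration is shown below. The property names follow the connector’s README at the time of writing and may differ across connector versions; the cluster URLs, database, topic and credentials are placeholders to be replaced:

{
  "name": "adx-protobuf-sink",
  "config": {
    "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
    "tasks.max": "1",
    "topics": "pb-streams-topic",
    "kusto.ingestion.url": "https://ingest-<cluster>.<region>.kusto.windows.net",
    "kusto.query.url": "https://<cluster>.<region>.kusto.windows.net",
    "aad.auth.appid": "<appId>",
    "aad.auth.appkey": "<appKey>",
    "aad.auth.authority": "<tenantId>",
    "kusto.tables.topics.mapping": "[{\"topic\": \"pb-streams-topic\", \"db\": \"<database>\", \"table\": \"PbStreams\", \"format\": \"json\", \"mapping\": \"pb_mapping\"}]",
    "flush.size.bytes": "10240",
    "flush.interval.ms": "10000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}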
The deployment of the connector is done by making a simple REST API call to the Kafka Connect endpoint. In this case, the call can be made from the Docker host using Postman or curl, as shown below.
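For instance, assuming Kafka Connect’s REST port 8083 is mapped to the Docker host and the JSON above is saved as adx-protobuf-sink.json, the equivalent curl call is:

> curl -X POST -H "Content-Type: application/json" --data @adx-protobuf-sink.json http://localhost:8083/connectors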
This should register the connector in the runtime, ready to consume protobuf messages and persist them in the ADX table as per the mapping. Each message is deserialized and saved as multi-json.
Creating a test producer and sending a message
First, we define a sample proto file that resembles the following.
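An illustrative schema along those lines is shown below; the package, message and field names are placeholders rather than the post’s exact sample, and the nested Reading message is what later surfaces as a nested generated Java class:

syntax = "proto3";

package com.example.iot;

option java_package = "com.example.iot";
option java_outer_classname = "TelemetryProto";

// Illustrative telemetry schema used throughout the rest of this walkthrough.
message Telemetry {
  string device_id  = 1;
  int64  event_time = 2;
  Reading reading   = 3;

  message Reading {
    double temperature = 1;
    double humidity    = 2;
  }
}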
The required dependencies and the Maven build setup are in the gist Maven dependencies (github.com). Running the generate-sources phase compiles the protobuf file into Java classes that can then be used by the rest of the code.
A sample producer to send the message is as follows in the gist Send a simple protobuf message (github.com)
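A minimal sketch of such a producer, assuming the illustrative Telemetry schema above with the broker exposed on localhost:9092 and the schema registry on localhost:8081 (not the gist’s exact code):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

import com.example.iot.TelemetryProto.Telemetry;

public class TelemetryProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The Confluent serializer registers the schema with the schema registry on first send.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        // Build a protobuf message from the generated classes.
        Telemetry message = Telemetry.newBuilder()
                .setDeviceId("device-001")
                .setEventTime(System.currentTimeMillis())
                .setReading(Telemetry.Reading.newBuilder()
                        .setTemperature(21.5)
                        .setHumidity(48.0))
                .build();

        try (KafkaProducer<String, Telemetry> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("pb-streams-topic", message.getDeviceId(), message));
            producer.flush();
        }
    }
}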
Verifying the messages on the ADX table
All going well, a quick query should turn up the messages that were sent to the ADX table.
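For example, against the PbStreams table created earlier:

//Peek at the most recently ingested payloads
PbStreams
| take 10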
2. Configuring the connector for the OSS version (without schema registry)
The setup for the OSS version is largely similar to the Confluent version; the key differences are:
- The schema registry is not available for the application to use.
- An OSS deserializer is used to read the protobuf messages instead of the Confluent deserializer.
Note that the ADX table setup and the overall setup are the same as for the Confluent version unless explicitly mentioned otherwise.
The docker compose file is as specified here: Kafka connect without schema registry (github.com). Note that for ease of use we have still used the Confluent Kafka image; it can be swapped for the OSS Kafka image. The key differences from the earlier setup are the missing schema registry and the corresponding changes in the connector configuration.
Copying the artifact for deployment
- Download the latest release of the connector from https://github.com/Azure/kafka-sink-azure-kusto/releases
- Uncompress it and copy it to the plugin path (set to /usr/share/java in the compose file), then restart the container
> docker cp .\microsoftcorporation-kafka-sink-azurekusto <container-hash>:/usr/share/java
> docker restart <container hash>
- For deserialization we use the OSS protobuf converter from the repository https://github.com/blueapron/kafka-connect-protobuf-converter
- Download the artifact from Maven Central and copy it to the following directory, depending on the base Kafka image:
Confluent Kafka: <confluent_dir>/share/java/kafka-serde-tools
Apache Kafka: <apache_kafka_dir>/libs
- For the purpose of this example, we will use the same proto sample as before to demonstrate the ingestion.
- This requires that the classes generated from the protobuf schema be built upfront. A simple pom.xml for setting up and compiling the protobuf is here
- The classes can be packaged using the mvn package lifecycle phase. Copy the jar file (assuming it is named proto-serde-with-dependencies.jar) into /usr/share/java/proto-serde-with-dependencies.jar
- Restart the docker container by issuing docker restart <container-hash>
Configuring and Deploying the ADX connector
The first step is to prepare the configuration for the connector. For brevity, only the settings that differ have been highlighted; the rest of the configuration is the same as in the Confluent connector above (a sketch follows the list):
- The deserializer from the library we included earlier
- The fully qualified class name of the generated class (including the nested class) that will be used for deserializing the message
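Assuming the illustrative schema from earlier, the converter settings of the previous configuration would change roughly as follows. Note that, depending on the converter version, a nested generated class may need to be referenced by its binary name (com.example.iot.TelemetryProto$Telemetry) instead of the dotted form, so check the converter’s README:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.blueapron.connect.protobuf.ProtobufConverter",
"value.converter.protoClassName": "com.example.iot.TelemetryProto.Telemetry"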
Deploy this configuration to the Kafka Connect endpoint as before.
This should register the connector in the runtime.
Try this Today, For Free!
If you’re interested in trying this out today, you can start with your own free Azure Data Explorer cluster to host your incoming data. For more details on getting started with your own free cluster, you can read about Start For Free clusters here.
You’re welcome to suggest more ideas for connecting ADX with open-source tools and ecosystems, and vote for them here: https://aka.ms/adx.ideas