This article explains how to send JSON schema formatted topic data from HDInsight managed Kafka to a MySQL database using Kafka Connect in standalone mode. The same steps can be extended to a distributed Kafka Connect setup. The cluster used in this article runs on Ubuntu 18.04 machines.
There are some prerequisite steps:
- Create an HDInsight Managed Kafka cluster with OSS Kafka Schema Registry. For reference, see "HDInsight Managed Kafka with OSS Kafka Schema Registry" in the references at the end of this article.
- Start the Schema Registry service in a separate console, as described in the same article.
- Create an Azure Database for MySQL server and a private link so the cluster can reach it. Then create the required database; this article uses a sample database with a table that has a single “id” field.
Detailed steps:
- Install the MySQL JDBC driver
$sudo apt-get install -y libmysql-java
This installs the MySQL JDBC driver on your machine under /usr/share/java.
Copy the mysql-connector-java-{version}.jar file into the appropriate plugin path.
To find the plugin path, use the following command:
$sudo find / -name kafka-connect-jdbc\*.jar
The output lists the plugin paths that contain the kafka-connect-jdbc jar. Put the driver jar in the same directory as that jar; if you skip this step, the connector may fail with a "No suitable driver found" error.
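If the find command returns more than one plugin directory, the copy can be scripted. The following is a minimal bash sketch that is not part of the original procedure: the function name copy_driver_to_plugin_dirs is hypothetical, and it assumes the driver jar path you pass in exists (for example, the file installed under /usr/share/java).

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article):
#   copy_driver_to_plugin_dirs SEARCH_ROOT DRIVER_JAR
# Finds every kafka-connect-jdbc*.jar under SEARCH_ROOT and copies DRIVER_JAR
# into each directory that contains one. Returns 1 if the driver jar is
# missing or no kafka-connect-jdbc jar was found.
copy_driver_to_plugin_dirs() {
  local search_root="$1" driver_jar="$2" jar dir copied=0
  if [ ! -f "$driver_jar" ]; then
    echo "driver jar not found: $driver_jar" >&2
    return 1
  fi
  # -print0 with read -d '' keeps paths containing spaces intact
  while IFS= read -r -d '' jar; do
    dir=$(dirname "$jar")
    cp "$driver_jar" "$dir/"
    echo "copied to $dir"
    copied=$((copied + 1))
  done < <(find "$search_root" -name 'kafka-connect-jdbc*.jar' -print0 2>/dev/null)
  [ "$copied" -gt 0 ]
}
```

For example, from a root shell (sudo -i), define the function and run copy_driver_to_plugin_dirs / /usr/share/java/mysql-connector-java-{version}.jar, substituting the actual file name.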
- Configure Kafka Connect in standalone mode
First, collect the ZooKeeper and Kafka broker details. Set up a password variable: replace PASSWORD with the cluster login password, then run the command:
$export password='PASSWORD'
Extract the correctly cased cluster name
$export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Extract the ZooKeeper hosts:
$export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
To extract the Kafka broker information into the variable KAFKABROKERS, use the following command:
$export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
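Before continuing, it can help to confirm that both variables were populated, because a failed Ambari call leaves them empty. A minimal bash sketch of such a check follows; the helper name is_host_port_list is hypothetical, and the pattern only checks the host:port shape produced by the jq and cut pipelines above, not whether the hosts are reachable.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article): is_host_port_list VALUE
# Returns 0 if VALUE looks like "host:port[,host:port...]" and 1 otherwise,
# e.g. for an empty string left behind by a failed curl/jq call.
is_host_port_list() {
  local value="$1"
  local re='^[A-Za-z0-9.-]+:[0-9]+(,[A-Za-z0-9.-]+:[0-9]+)*$'
  [[ "$value" =~ $re ]]
}
```

For example: is_host_port_list "$KAFKABROKERS" || echo "KAFKABROKERS is empty or malformed" >&2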
Copy the configuration file and populate the properties as shown below:
$sudo cp /usr/hdp/current/kafka-broker/config/ /usr/hdp/current/kafka-broker/config/
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
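Besides bootstrap.servers, a standalone worker also needs converter, offset storage and plugin path settings. The sketch below writes a minimal worker configuration using standard Kafka Connect property names. The helper name write_worker_props, the offsets file location and the flush interval are assumptions; the schema-enabled JsonConverter settings are chosen to match the schema and payload messages sent later in this article. Compare the output with the copied configuration file rather than overwriting it blindly.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article):
#   write_worker_props FILE BROKERS PLUGIN_DIR
# Writes a minimal standalone Kafka Connect worker configuration to FILE.
# schemas.enable=true because each message carries a
# {"schema": ..., "payload": ...} envelope, which the JDBC sink needs.
write_worker_props() {
  local file="$1" brokers="$2" plugin_dir="$3"
  cat > "$file" <<EOF
bootstrap.servers=${brokers}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=${plugin_dir}
EOF
}
```

For example: write_worker_props /tmp/worker.properties "$KAFKABROKERS" /usr/hdp/current/kafka-broker/connectors (the output file name is hypothetical). The plugin.path value points at the connectors folder created in the next step.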
- Deploy the JDBC sink Kafka Connect plugin
Download the JDBC Connector plugin from Confluent Hub to your local desktop.
Create a new folder on the edge node and set its permissions:
$sudo mkdir /usr/hdp/current/kafka-broker/connectors
$sudo chmod 777 /usr/hdp/current/kafka-broker/connectors
Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugin into the folder /usr/hdp/current/kafka-broker/connectors.
- Configure the Kafka Connect plugin for the MySQL JDBC sink connector
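After the upload, you can check that the JDBC connector jar actually landed under the connectors folder. The bash sketch below assumes the Confluent Hub archive has already been extracted there (its jars typically sit in a lib/ subfolder); has_jdbc_plugin is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article): has_jdbc_plugin DIR
# Succeeds if a kafka-connect-jdbc*.jar exists anywhere under DIR.
has_jdbc_plugin() {
  local dir="$1"
  [ -d "$dir" ] || return 1
  # -print -quit stops at the first match; grep -q . fails on empty output
  find "$dir" -name 'kafka-connect-jdbc*.jar' -print -quit | grep -q .
}
```

For example: has_jdbc_plugin /usr/hdp/current/kafka-broker/connectors && echo "JDBC plugin found"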
$sudo vi /usr/hdp/current/kafka-broker/connectors/
name=<name of the connector>
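The configuration above only shows the name property. A minimal JDBC sink configuration for MySQL might look like the one generated below. The property names are standard settings of the Confluent JDBC sink connector, but the helper write_sink_props, the choice of insert.mode=insert, pk.mode=none and auto.create=true, and the connection URL format are assumptions for this sketch; adjust them to your database. With auto.create=true, the connector creates the destination table (by default named after the topic) if it does not exist.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article):
#   write_sink_props FILE NAME TOPIC JDBC_URL USER PASSWORD
# Writes a minimal Confluent JDBC sink connector configuration to FILE.
# Note: the password ends up in plain text in FILE.
write_sink_props() {
  local file="$1" name="$2" topic="$3" url="$4" user="$5" pass="$6"
  cat > "$file" <<EOF
name=${name}
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=${topic}
connection.url=${url}
connection.user=${user}
connection.password=${pass}
insert.mode=insert
pk.mode=none
auto.create=true
EOF
}
```

For example: write_sink_props /tmp/mysql-sink.properties mysql-sink <sampleTopic name> "jdbc:mysql://<server>.mysql.database.azure.com:3306/<database>" <user> <password>, then merge the result into the connector configuration file (the output file name is hypothetical).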
- Start the sink connector
In a new session, start the sink connector:
$sudo /usr/hdp/current/kafka-broker/bin/ /usr/hdp/current/kafka-broker/config/ /usr/hdp/current/kafka-broker/connectors/
- Create a Kafka topic
In a new session, create a new topic:
$/usr/hdp/current/kafka-broker/bin/ --create --replication-factor 3 --partitions 8 --topic <sampleTopic name> --zookeeper $KAFKAZKHOSTS
- Send data into the topic
$/usr/hdp/current/kafka-broker/bin/ --broker-list $KAFKABROKERS --topic <sampleTopic name>
> {"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id" }], "optional": false, "name": "foobar"},"payload": {"id": 40000}}
For more details about the JSON schema format, see the references at the end of this article.
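Typing each message by hand gets tedious. The bash sketch below builds the same schema and payload envelope as the sample message for an arbitrary integer id; make_id_message is a hypothetical helper, and the struct name foobar is simply copied from the sample.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article): make_id_message ID
# Prints one message in the schema+payload envelope with an optional
# int32 "id" field, matching the sample message above.
make_id_message() {
  local id="$1"
  # Reject anything that is not an integer so the output stays valid JSON
  [[ "$id" =~ ^-?[0-9]+$ ]] || return 1
  printf '{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id"}], "optional": false, "name": "foobar"}, "payload": {"id": %s}}\n' "$id"
}
```

Its output can be piped into the console producer command shown above instead of typing at the > prompt, for example by running for i in 1 2 3; do make_id_message "$i"; done and piping the loop into that command.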
- Check the MySQL database for the new entry
Log in to the MySQL database; you should find a new row in the table.
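To check the result from the command line instead of a GUI client, a query through the mysql command-line client is enough. This bash sketch assumes the mysql client is installed on the node, that the table is named after the topic (the JDBC sink's default table name), and that the password comes from the MYSQL_PWD environment variable or an option file such as ~/.my.cnf; row_exists is a hypothetical helper name. Depending on your server's TLS settings, you may need additional client options.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article):
#   row_exists HOST USER DATABASE TABLE ID
# Counts rows with the given id via the mysql CLI; succeeds if at least one
# exists. The password is taken from MYSQL_PWD or a client option file.
row_exists() {
  local host="$1" user="$2" db="$3" table="$4" id="$5" count
  # Only accept integer ids so the value can be embedded in the query safely
  [[ "$id" =~ ^-?[0-9]+$ ]] || return 1
  # -N skips column names, -B prints plain tab-separated output
  count=$(mysql -h "$host" -u "$user" -N -B -D "$db" \
            -e "SELECT COUNT(*) FROM \`$table\` WHERE id = $id;") || return 1
  [ "${count:-0}" -ge 1 ]
}
```

For example: row_exists <server>.mysql.database.azure.com <user> <database> <sampleTopic name> 40000 && echo "row found"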
References:
- Kafka Connect with HDInsight Managed Kafka - Microsoft Tech Community
- JDBC Connector (Source and Sink) - Confluent Hub
- HDInsight Managed Kafka with OSS Kafka Schema Registry - Microsoft Tech Community
- Kafka Connect in Action: JDBC Sink - YouTube
- Installing a JDBC driver for the Kafka Connect JDBC connector - YouTube