In a typical Kafka cluster, a producer application publishes messages to Kafka and a consumer application consumes them. In this setup it is the application developer's responsibility to ensure that producers and consumers are reliable and fault tolerant.
Kafka Connect is a framework for connecting Kafka with external systems such as databases, storage systems, applications, search indexes, and file systems, using so-called connectors, in a reliable and fault-tolerant way.
Kafka connectors are ready-to-use components that can import data from external systems into Kafka topics and export data from Kafka topics into external systems. Connector implementations are available for most common data sources and sinks, with the option of creating one's own connector.
A source connector collects data from a system. Source systems can be entire databases, applications or message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.
A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, storage systems such as Azure Blob storage, or databases.
Most connectors are maintained by the community, while others are supported by Confluent or its partners on the Confluent Connector Hub. One can normally find connectors for most popular systems such as Azure Blob Storage, Azure Data Lake Store, and Elasticsearch.
Every connector spawns tasks which are then distributed across workers in the Kafka Connect cluster.
Kafka Connect Architecture
Two edge nodes on an HDInsight cluster will be used to demonstrate Kafka Connect in distributed mode.
Scalability is achieved in Kafka Connect by adding more edge nodes to the HDInsight cluster, either at creation time or afterwards. Since the number of edge nodes on an existing cluster can be scaled up or down, this functionality can also be used to scale the size of the Kafka Connect cluster.
In this section we will deploy an HDInsight managed Kafka cluster with two edge nodes inside a virtual network and then enable Kafka Connect in standalone mode on one of those edge nodes.
On the Custom deployment template, populate the fields as described below and leave the remaining fields at their defaults.
Check the box titled "I agree to the terms and conditions stated above" and click on Purchase.
Wait until the deployment completes and you get the "Your deployment is complete" message, then click on Go to resource.
Note: In Kafka Connect standalone mode you only need to make config changes on a single edge node.
Replace PASSWORD with the cluster login password, then enter the command:
export password='PASSWORD'
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Validate the contents of the KAFKAZKHOSTS variable:
echo $KAFKAZKHOSTS
zk1-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181,zk2-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
echo $KAFKABROKERS
wn1-kafka.eahjefyeyyeyeyygqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eaeyhdseyy1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
To run Kafka Connect in standalone mode, two files are important.
connect-standalone.properties
: Located at /usr/hdp/current/kafka-broker/config
connect-standalone.sh
: Located at /usr/hdp/current/kafka-broker/bin
Note: The reason we create two copies of the connect-standalone.properties file below is to set the rest.port property to two different ports. If you do not do this, you will run into a rest.port conflict when you create the connectors.
Copy connect-standalone.properties to connect-standalone-1.properties and populate the properties as shown below.
sudo cp /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8084
plugin.path=/usr/hdp/current/kafka-broker/connectors/jcustenborder-kafka-connect-twitter-0.3.33,/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-azure-blob-storage-1.3.2
Copy connect-standalone.properties to connect-standalone-2.properties and edit the properties as below (note the changed rest.port).
sudo cp /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-standalone-2.properties
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8085
plugin.path=/usr/hdp/current/kafka-broker/connectors/jcustenborder-kafka-connect-twitter-0.3.33,/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-azure-blob-storage-1.3.2
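Since the only difference that matters between the two worker files is the port, a quick sanity check that they diverge can be sketched as below. The temp files are simulated stand-ins for connect-standalone-1.properties and connect-standalone-2.properties, so the snippet runs anywhere:

```shell
# Sketch: detect a rest.port clash between two Connect worker property
# files before starting them. The temp files stand in for the two copies.
f1=$(mktemp); f2=$(mktemp)
printf 'rest.port=8084\n' > "$f1"
printf 'rest.port=8085\n' > "$f2"
p1=$(grep '^rest.port=' "$f1" | cut -d= -f2)
p2=$(grep '^rest.port=' "$f2" | cut -d= -f2)
if [ "$p1" = "$p2" ]; then
  echo "CONFLICT: both workers use rest.port=$p1"
else
  echo "OK: ports $p1 and $p2 differ"
fi
rm -f "$f1" "$f2"
```

On the edge node, point f1 and f2 at the real files under /usr/hdp/current/kafka-broker/config instead of the temp files.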
Download the relevant Kafka Plugins from the Confluent Hub to your local desktop
Unzip the files to create the folder structures
sudo mkdir /usr/hdp/current/kafka-broker/connectors
sudo chmod 777 /usr/hdp/current/kafka-broker/connectors
Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugins into the folder path /usr/hdp/current/kafka-broker/connectors
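Before starting the workers it is worth confirming that the plugin folders actually landed under the connectors path. Below is a self-contained sketch of such a check; the temp directory stands in for /usr/hdp/current/kafka-broker/connectors, and the plugin names match the ones used later in plugin.path:

```shell
# Sketch: verify each expected plugin directory exists under the
# connectors path (temp dir stands in for the real path).
base=$(mktemp -d)
mkdir -p "$base/jcustenborder-kafka-connect-twitter-0.3.33" \
         "$base/confluentinc-kafka-connect-azure-blob-storage-1.3.2"
missing=0
for p in jcustenborder-kafka-connect-twitter-0.3.33 \
         confluentinc-kafka-connect-azure-blob-storage-1.3.2; do
  [ -d "$base/$p" ] || { echo "missing plugin: $p"; missing=1; }
done
[ "$missing" -eq 0 ] && echo "all plugins present"
rm -rf "$base"
```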
Create a Twitter App and get the credentials
twitter.oauth.consumerKey
twitter.oauth.consumerSecret
twitter.oauth.accessToken
twitter.oauth.accessTokenSecret
Update the properties file for the Kafka Connect Twitter plugin. Navigate to the folder path /usr/hdp/current/kafka-broker/connectors and create a new properties file called twitter.properties
cd /usr/hdp/current/kafka-broker/connectors/
sudo vi twitter.properties
name=Twitter-to-Kafka
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
kafka.status.topic=twitterstatus
kafka.delete.topic=twitterdelete
topic=twitter1
twitter.oauth.consumerKey=<twitter.oauth.consumerKey>
twitter.oauth.consumerSecret=<twitter.oauth.consumerSecret>
twitter.oauth.accessToken=<twitter.oauth.accessToken>
twitter.oauth.accessTokenSecret=<twitter.oauth.accessTokenSecret>
filter.keywords=keyword1,keyword2,...
process.deletes=false
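A common slip is starting the connector while the <...> placeholder values are still in the file. A pre-flight check for that can be sketched as below; the temp file is a simulated stand-in for twitter.properties so the snippet is self-contained:

```shell
# Sketch: refuse to start if any <...> placeholder is still present.
# The temp file stands in for /usr/hdp/current/kafka-broker/connectors/twitter.properties.
f=$(mktemp)
cat > "$f" <<'EOF'
twitter.oauth.consumerKey=<twitter.oauth.consumerKey>
process.deletes=false
EOF
if grep -q '<[^>]*>' "$f"; then
  status=placeholders
  echo "placeholders still present; fill in your Twitter credentials first"
else
  status=complete
  echo "properties look complete"
fi
rm -f "$f"
```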
Create a regular Azure Blob storage account and a container on Azure and note the storage access keys
Navigate to the folder path /usr/hdp/current/kafka-broker/connectors
and create a new properties file called blob.properties
cd /usr/hdp/current/kafka-broker/connectors/
sudo vi blob.properties
name=Kafka-to-Blob
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=twitterstatus
flush.size=3
azblob.account.name=<Azure Blob account Name>
azblob.account.key=<security key>
azblob.container.name=<container name>
format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
confluent.topic.bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
confluent.topic.replication.factor=3
In the next section we will use the command line to start separate connector instances for running source tasks and sink tasks.
Start Source connector
sudo /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties /usr/hdp/current/kafka-broker/connectors/twitter.properties
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic twitterstatus
Start Sink Connector
sudo /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone-2.properties /usr/hdp/current/kafka-broker/connectors/blob.properties
In this section we will deploy an HDInsight managed Kafka cluster with two edge nodes inside a virtual network and then enable Kafka Connect in distributed mode on both of those edge nodes.
On the Custom deployment template, populate the fields as described below and leave the remaining fields at their defaults.
Check the box titled "I agree to the terms and conditions stated above" and click on Purchase.
ed10-ag4kac.ohdqdgkr0bpe3kjx3dteggje4c.gx.internal.cloudapp.net
ed12-ag4kac.ohdqdgkr0bpe3kjx3dteggje4c.gx.internal.cloudapp.net
ed10
Note: In this lab you will need to make config changes on both edge nodes, ed10 and ed12. To log into ed12, simply ssh to it from ed10:
sshuser@ed10-ag4kac:~$ ssh ed12-ag4kac.ohdqdgkr0bpe3kjx3dteggje4c.gx.internal.cloudapp.net
Replace PASSWORD with the cluster login password, then enter the command:
export password='PASSWORD'
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Validate the contents of the KAFKAZKHOSTS variable:
echo $KAFKAZKHOSTS
zk1-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181,zk2-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
echo $KAFKABROKERS
wn1-kafka.eahjefyeyyeyeyygqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eaeyhdseyy1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic agconnect-offsets --zookeeper $KAFKAZKHOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic agconnect-configs --zookeeper $KAFKAZKHOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic agconnect-status --zookeeper $KAFKAZKHOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic twitterstatus --zookeeper $KAFKAZKHOSTS
Download the relevant Kafka Plugins from the Confluent Hub to your local desktop
Unzip the files to create the folder structures
Note: The step below needs to be repeated on both the ed10 and ed12 edge nodes
sudo mkdir /usr/hdp/current/kafka-broker/connectors
sudo chmod 777 /usr/hdp/current/kafka-broker/connectors
Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugins into the folder path created in the last step. Alternatively, sync the folder from ed10 to ed12 with rsync:
rsync -r /usr/hdp/current/kafka-broker/connectors/ sshuser@<edge-node12-FQDN>:/usr/hdp/current/kafka-broker/connectors/
Note: The steps below need to be repeated on both the ed10 and ed12 edge nodes
To run Kafka Connect in distributed mode, two files are important.
connect-distributed.properties
: Located at /usr/hdp/current/kafka-broker/conf
connect-distributed.sh
: Located at /usr/hdp/current/kafka-broker/bin
In distributed mode, the workers need to be able to discover each other and have shared storage for connector configuration and offset data. Below are some of the important parameters we need to configure.
group.id
: ID that uniquely identifies the cluster these workers belong to. Make sure this value is the same on both edge nodes.
config.storage.topic
: Topic to store the connector and task configuration state in.
offset.storage.topic
: Topic to store the connector offset state in.
rest.port
: Port where the REST interface listens for HTTP requests.
plugin.path
: Path for the Kafka Connect plugins.
Edit the connect-distributed.properties file:
sudo vi /usr/hdp/current/kafka-broker/conf/connect-distributed.properties
In the connect-distributed.properties file, define the topics that will store the connector state, task configuration state, and connector offset state. Uncomment and modify the parameters as shown below. Note that we use the topics created earlier.
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
group.id=agconnect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=agconnect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=8
config.storage.topic=agconnect-configs
config.storage.replication.factor=3
status.storage.topic=agconnect-status
status.storage.replication.factor=3
status.storage.partitions=8
offset.flush.interval.ms=10000
rest.port=8083
plugin.path=/usr/hdp/current/kafka-broker/connectors/jcustenborder-kafka-connect-twitter-0.3.33,/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-azure-blob-storage-1.3.2
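Since the storage topics named in the worker file must agree with the topics created earlier, a quick way to read them back for comparison can be sketched as below. The temp file here is a simulated stand-in for connect-distributed.properties so the snippet is self-contained:

```shell
# Sketch: read the three storage topic names back out of the worker
# properties (temp file stands in for connect-distributed.properties).
props=$(mktemp)
cat > "$props" <<'EOF'
offset.storage.topic=agconnect-offsets
config.storage.topic=agconnect-configs
status.storage.topic=agconnect-status
EOF
offset_topic=$(grep '^offset.storage.topic=' "$props" | cut -d= -f2)
config_topic=$(grep '^config.storage.topic=' "$props" | cut -d= -f2)
status_topic=$(grep '^status.storage.topic=' "$props" | cut -d= -f2)
echo "offset=$offset_topic config=$config_topic status=$status_topic"
rm -f "$props"
```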
Start Kafka Connect in the background on both edge nodes:
nohup sudo /usr/hdp/current/kafka-broker/bin/connect-distributed.sh /usr/hdp/current/kafka-broker/conf/connect-distributed.properties &
Note: A file nohup.out is created in the folder from which the command is executed. If you are interested in exploring the startup logs, simply cat nohup.out
Note: In distributed mode, the REST API is the primary interface to the Connect cluster. Requests can be made from any edge node and the REST API automatically forwards them. By default the Kafka Connect REST API runs on port 8083, but this is configurable via rest.port in the worker properties.
Validate that Kafka Connect is up by calling the REST endpoint from any edge node:
curl -s http://<edge-node-FQDN>:8083/ |jq
{
"version": "2.1.0.3.1.2.1-1",
"commit": "ded5eefdb4f63651",
"kafka_cluster_id": "W0HIh8naTgip7Taju7G7fg"
}
In this section we started Kafka Connect in distributed mode alongside an HDInsight cluster and verified it using the Kafka REST API.
In the next section we will use the Kafka Connect REST API to start separate connector instances for running source tasks and sink tasks.
Create a Twitter App and get the credentials
twitter.oauth.consumerKey
twitter.oauth.consumerSecret
twitter.oauth.accessToken
twitter.oauth.accessTokenSecret
From any edge node, run the command below to create a new source connector and start its tasks:
curl -X POST http://<edge-node-FQDN>:8083/connectors -H "Content-Type: application/json" -d @- <<BODY
{
"name": "Twitter-to-Kafka",
"config": {
"name": "Twitter-to-Kafka",
"connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"tasks.max": 3,
"kafka.status.topic":"twitterstatus",
"kafka.delete.topic":"twitterdelete",
"twitter.oauth.consumerKey":"<twitter.oauth.consumerKey>",
"twitter.oauth.consumerSecret":"<twitter.oauth.consumerSecret>",
"twitter.oauth.accessToken":"<twitter.oauth.accessToken>",
"twitter.oauth.accessTokenSecret":"<twitter.oauth.accessTokenSecret>",
"filter.keywords":"<keyword>",
"process.deletes":false
}
}
BODY
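Once the connector is created, its health can be checked through the /connectors/<name>/status endpoint. The sketch below parses a simulated response so the logic is self-contained; on the cluster you would fill the response variable from curl instead, as shown in the comment:

```shell
# Simulated status response; on a live cluster you would instead run:
#   response=$(curl -s http://<edge-node-FQDN>:8083/connectors/Twitter-to-Kafka/status)
response='{"name":"Twitter-to-Kafka","connector":{"state":"RUNNING"},"tasks":[{"id":0,"state":"RUNNING"}]}'
# Extract the connector state without needing jq.
state=$(echo "$response" | sed -n 's/.*"connector":{"state":"\([A-Z]*\)".*/\1/p')
if [ "$state" = "RUNNING" ]; then
  echo "connector is healthy"
else
  echo "connector state: $state"
fi
```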
Verify that the connector Twitter-to-Kafka was created:
curl -X GET http://ed10-ag4kac.ohdqdgkr0bpe3kjx3dteggje4c.gx.internal.cloudapp.net:8083/connectors
["local-file-source","Twitter-to-Kafka"]
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic twitterstatus
If everything is working, you should see a stream of Twitter messages matching the specified keywords on the console.
Try pausing the tasks in the connector; this should also pause the Twitter stream on the console consumer.
curl -X PUT http://<edge-node-FQDN>:8083/connectors/Twitter-to-Kafka/pause
curl -X PUT http://<edge-node-FQDN>:8083/connectors/Twitter-to-Kafka/resume
Create a regular Azure Blob storage account and a container on Azure and note the storage access keys
From any edge node, run the command below to create a new sink connector and start its tasks. Note that the number of tasks can be increased according to the size of your cluster.
curl -X POST http://<edge-node-FQDN>:8083/connectors -H "Content-Type: application/json" -d @- <<BODY
{
"name": "Kafka-to-Blob",
"config": {
"connector.class": "io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector",
"tasks.max": 1,
"topics":"twitterstatus",
"flush.size":3,
"azblob.account.name":"<Storage-account-name>",
"azblob.account.key":"<Storage-accesss-key>",
"azblob.container.name":"<Container-name>",
"format.class":"io.confluent.connect.azure.blob.format.avro.AvroFormat",
"confluent.topic.bootstrap.servers":"<Enter the full contents of $KAFKABROKERS>",
"confluent.topic.replication.factor":3
}
}
BODY
Verify that the connector Kafka-to-Blob was created. You should now see both the source and sink connectors:
curl -X GET http://<edge-node-FQDN>:8083/connectors
["local-file-source","Twitter-to-Kafka","Kafka-to-Blob"]
Authenticate into your Azure portal and navigate to the storage account to validate that Twitter messages are being delivered to the specified container.
Below are some commonly used REST API calls for controlling Kafka Connect.
Status of the distributed Connect cluster:
curl -s http://<edge-node-FQDN>:8083/ |jq
List installed connector plugins:
curl -X GET http://<edge-node-FQDN>:8083/connector-plugins | jq
List running connectors:
curl -X GET http://<edge-node-FQDN>:8083/connectors
Get information about a connector:
curl -X GET http://<edge-node-FQDN>:8083/connectors/<connector-name>
List the tasks of a connector:
curl -X GET http://<edge-node-FQDN>:8083/connectors/<connector-name>/tasks
Restart a connector:
curl -X POST http://<edge-node-FQDN>:8083/connectors/<connector-name>/restart
Delete a connector:
curl -X DELETE http://<edge-node-FQDN>:8083/connectors/<connector-name>/
Pause a connector:
curl -X PUT http://<edge-node-FQDN>:8083/connectors/<connector-name>/pause
Resume a connector:
curl -X PUT http://<edge-node-FQDN>:8083/connectors/<connector-name>/resume
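The list returned by /connectors can be fed back into the per-connector endpoints. The sketch below derives the status endpoint for each connector from a simulated list; on a live cluster the list would come from curl, as shown in the comment:

```shell
# Simulated connector list; on a live cluster you would instead run:
#   names=$(curl -s http://<edge-node-FQDN>:8083/connectors)
names='["Twitter-to-Kafka","Kafka-to-Blob"]'
# Strip the JSON brackets and quotes, one connector name per line.
list=$(echo "$names" | tr -d '[]"' | tr ',' '\n')
echo "$list" | while read -r c; do
  echo "status endpoint: /connectors/$c/status"
done
```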