Kafka Schema Registry provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in the Avro format. It used to be an open-source (OSS) project by Confluent, but is now under the Confluent Community License. This lab uses the OSS Schema Registry.
The Schema Registry additionally provides centralized schema versioning and enforces compatibility rules as schemas evolve.
In an HDInsight Managed Kafka cluster the Schema Registry is typically deployed on an Edge node to allow compute separation from Head Nodes.
Below is a representative architecture of how the Schema Registry is deployed on an HDInsight cluster. Note that Schema Registry natively exposes a REST API for operations on it. Producers and consumers can interact with the Schema Registry from within the VNet or using the Kafka REST Proxy.
In this section we will deploy an HDInsight Managed Kafka cluster with an Edge Node inside a Virtual Network and then install the Confluent Schema Registry on the Edge Node.
On the Custom deployment template, populate the fields as described below. Leave the remaining fields at their default values.
Check the box titled "I agree to the terms and conditions stated above" and click on Purchase.
Using an SSH client of your choice, ssh into the edge node using the sshuser and password that you set in the custom ARM template.
In the next section we will configure the Confluent Kafka Schema Registry that we installed on the edge node.
The Confluent Schema Registry configuration file is located at /etc/schema-registry/schema-registry.properties, and the scripts to start and stop the service executables are located in the /usr/bin/ folder.
The Schema Registry needs to know the Zookeeper quorum to be able to interact with the HDInsight Kafka cluster. Follow the steps below to get the details of the Zookeeper quorum.
Replace PASSWORD with the cluster login password, then enter the command:

export password='PASSWORD'
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Check the KAFKAZKHOSTS variable:

echo $KAFKAZKHOSTS
zk1-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181,zk2-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Check to see if the Kafka Broker information is available
echo $KAFKABROKERS
wn1-kafka.eahjefyeyyeyeyygqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eaeyhdseyy1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
sudo vi /etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zk0-ohkl-h:2181,zk1-ohkl-h:2181,zk2-ohkl-h:2181
kafkastore.topic=_schemas
debug=false

Replace the kafkastore.connection.url value with the Zookeeper quorum string from $KAFKAZKHOSTS, for example:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=zk1-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181,zk2-ag4kaf.q2hwzr1xkxjuvobkaagmjjkhta.gx.internal.cloudapp.net:2181
kafkastore.topic=_schemas
debug=true
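Editing the Zookeeper string by hand is error-prone; as an alternative, the substitution can be scripted. This is a sketch, assuming $KAFKAZKHOSTS was exported in the earlier step:

```shell
# Sketch: substitute the Zookeeper quorum captured in $KAFKAZKHOSTS into the
# Schema Registry properties file. Assumes KAFKAZKHOSTS was exported earlier.
sudo sed -i "s|^kafkastore.connection.url=.*|kafkastore.connection.url=$KAFKAZKHOSTS|" \
  /etc/schema-registry/schema-registry.properties

# Confirm the change took effect
grep '^kafkastore.connection.url=' /etc/schema-registry/schema-registry.properties
```

The sed expression uses `|` as the delimiter so the colons and dots in the hostnames need no escaping.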
Save and exit the properties file using the :wq command.
Use the below commands to start the Schema Registry, pointing it at the updated Schema Registry properties file.
cd /usr/bin
$ sudo schema-registry-start /etc/schema-registry/schema-registry.properties
...
...
[2020-03-22 13:24:49,089] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.Application:190)
[2020-03-22 13:24:49,154] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2020-03-22 13:24:49,753] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2020-03-22 13:24:49,902] INFO Started o.e.j.s.ServletContextHandler@40844aab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2020-03-22 13:24:49,914] INFO Started NetworkTrafficServerConnector@33fe57a9{HTTP/1.1}{0.0.0.0:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2020-03-22 13:24:49,915] INFO Started @2780ms (org.eclipse.jetty.server.Server:379)
[2020-03-22 13:24:49,915] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
With the Schema Registry running in one SSH session, launch another SSH session and try out some basic commands to ensure that the Schema Registry is working as expected.
Register a new version of a schema under the subject "Kafka-key" and note the output
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
HTTP/1.1 200 OK
Date: Sun, 22 Mar 2020 16:33:04 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 9
Server: Jetty(9.2.24.v20180105)
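The response body (not shown above) is a small JSON document; per the Schema Registry REST API, a successful registration returns the global id assigned to the schema, e.g. {"id":1}. A sketch of capturing and parsing it with jq, assuming the same endpoint:

```shell
# Capture the registration response and pull the assigned schema id out with jq.
# Endpoint per the Schema Registry REST API; the actual id value depends on
# what has already been registered.
response=$(curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions)
echo "$response" | jq .id
```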
Register a new version of a schema under the subject "Kafka-value" and note the output

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
HTTP/1.1 200 OK
Date: Sun, 22 Mar 2020 16:34:18 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 9
Server: Jetty(9.2.24.v20180105)
List all the subjects registered in the Schema Registry:

curl -X GET -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
http://localhost:8081/subjects
HTTP/1.1 200 OK
Date: Sun, 22 Mar 2020 16:34:39 GMT
Content-Type: application/vnd.schemaregistry.v1+json
Content-Length: 27
Server: Jetty(9.2.24.v20180105)
["Kafka-value","Kafka-key"]
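Beyond listing subjects, you can also pull back a registered schema directly. The endpoint below follows the Schema Registry REST API; it assumes the registry is listening on localhost:8081 as configured earlier and that the "Kafka-key" subject was registered above.

```shell
# Retrieve the latest schema registered under the "Kafka-key" subject.
# The response carries the subject, version, global id and the schema itself.
curl -s -X GET http://localhost:8081/subjects/Kafka-key/versions/latest | jq .
```

A typical response has the shape {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}.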
Create a new Kafka topic named agkafkaschemareg:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic agkafkaschemareg --zookeeper $KAFKAZKHOSTS
Use the Kafka Avro Console Producer to create a schema, assign the schema to the topic, and start sending data to the topic in Avro format. Ensure that the Kafka topic from the previous step was successfully created and that $KAFKABROKERS has a value in it.
The schema we are sending is a key-value pair:

Key: int

Value:
{
"type": "record",
"name": "example_schema",
"namespace": "com.example",
"fields": [
{
"name": "cust_id",
"type": "int",
"doc": "Id of the customer account"
},
{
"name": "year",
"type": "int",
"doc": "year of expense"
},
{
"name": "expenses",
"type": {
"type": "array",
"items": "float"
},
"doc": "Expenses for the year"
}
],
"doc:": "A basic schema for storing messages"
}
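Because the value schema has to be passed to the console producer as one long single-quoted string, it is easy to break it while editing. A quick sanity check with jq (a sketch; the doc attributes are omitted here for brevity) catches syntax errors before they reach the producer:

```shell
# Sanity-check that the value schema is valid JSON, is a record, and has the
# expected three fields before pasting it into the console-producer command.
value_schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int" }, { "name" : "year", "type" : "int" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"} } ] }'
echo "$value_schema" | jq -e '.type == "record" and (.fields | length) == 3'
```

jq exits non-zero (and prints false or an error) if the string is malformed or the checks fail, so this can also be used as a guard in a script.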
/usr/bin/kafka-avro-console-producer --broker-list $KAFKABROKERS --topic agkafkaschemareg --property parse.key=true --property key.schema='{"type" : "int", "name" : "id"}' --property value.schema='{ "type" : "record", "name" : "example_schema", "namespace" : "com.example", "fields" : [ { "name" : "cust_id", "type" : "int", "doc" : "Id of the customer account" }, { "name" : "year", "type" : "int", "doc" : "year of expense" }, { "name" : "expenses", "type" : {"type": "array", "items": "float"}, "doc" : "Expenses for the year" } ], "doc:" : "A basic schema for storing messages" }'
When the producer is ready, send the records below, using a literal Tab character (shown as TAB) to separate the key from the value:

1 TAB {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
2 TAB {"cust_id":3535353, "year":2011, "expenses":[761.35, 92.18, 14.41]}
3 TAB {"cust_id":7979797, "year":2011, "expenses":[4489.00]}
Now send another set of records, the second of which does not conform to the schema:

1 {"cust_id":1313131, "year":2012, "expenses":[1313.13, 2424.24]}
2 {"cust_id":1313131,"cust_age":34 "year":2012, "expenses":[1313.13, 2424.24,34.212]}
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"cust_id":1313131,"cust_age":34 "year":2012, "expenses":[1313.13, 2424.24,34.212]} to Avro of schema {"type":"record","name":"example_schema","namespace":"com.example","fields":[{"name":"cust_id","type":"int","doc":"Id of the customer account"},{"name":"year","type":"int","doc":"year of expense"},{"name":"expenses","type":{"type":"array","items":"float"},"doc":"Expenses for the year"}],"doc:":"A basic schema for storing messages"}
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('"' (code 34)): was expecting comma to separate OBJECT entries
at [Source: java.io.StringReader@3561c410; line: 1, column: 35]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:442)
at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:406)
at org.apache.avro.io.JsonDecoder.getVaueAsTree(JsonDecoder.java:549)
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:474)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:166)
at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:83)
at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:511)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:182)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:213)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:200)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:59)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
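The exception above is, at bottom, a plain JSON syntax error: there is no comma between "cust_age":34 and "year":2012. Even with the comma fixed, the extra cust_age field is not part of the registered schema, so the strict Avro JSON decoder would still reject the record. The syntax failure itself can be reproduced outside the producer with jq:

```shell
# The malformed value from the failing producer input: note the missing comma
# after "cust_age":34. jq rejects it for the same reason the Avro JSON decoder did.
bad='{"cust_id":1313131,"cust_age":34 "year":2012, "expenses":[1313.13, 2424.24,34.212]}'
if ! echo "$bad" | jq . > /dev/null 2>&1; then
  echo "invalid JSON"
fi
```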
Use the Kafka Avro Console Consumer to read the Avro data back from the topic:

sudo /usr/bin/kafka-avro-console-consumer --bootstrap-server $KAFKABROKERS --topic agkafkaschemareg --from-beginning
{"cust_id":1313131,"year":2012,"expenses":[1313.13,2424.24]}
{"cust_id":7979797,"year":2011,"expenses":[4489.0]}
{"cust_id":3535353,"year":2011,"expenses":[761.35,92.18,14.41]}