How to Migrate Schema Registry from Confluent to Azure Event Hub
Published Jul 05 2022 04:08 PM
Microsoft

In many event streaming and messaging scenarios, the event or message payload contains structured data. Schema-driven formats such as Apache Avro are often used to serialize and deserialize such structured data. On the Confluent platform, Confluent Schema Registry provides this functionality. Azure Event Hubs offers an equivalent service, Azure Schema Registry.

 

This blog is for developers who use Confluent Schema Registry and plan to migrate to Azure Schema Registry in Azure Event Hubs. It does not cover the basics of implementing a Confluent/Kafka client. If you’d like to integrate KafkaProducer into your event-driven application, you could learn from this tutorial. Instead, this blog focuses on the key steps and differences when migrating Schema Registry from Confluent to Azure Event Hubs.

 

Overview:

Confluent Schema Registry: https://docs.confluent.io/platform/current/schema-registry/schema_registry_tutorial.html#sr-benefits

Apache Kafka producers write data to Kafka topics and Kafka consumers read data from Kafka topics. There is an implicit “contract” that producers write data with a schema that can be read by consumers, even as producers and consumers evolve their schemas. Schema Registry helps ensure that this contract is met with compatibility checks.

 

Azure Schema Registry: https://docs.microsoft.com/en-us/azure/event-hubs/schema-registry-overview

An event producer uses a schema to serialize event payload and publish it to an event broker such as Event Hubs. Event consumers read event payload from the broker and de-serialize it using the same schema. So, both producers and consumers can validate the integrity of the data with the same schema.

 

Confluent Schema Registry:

1.Configuration

You can start from the sample configuration file from this link.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

 

2.Parameter definition and Initialize Kafka client

The client loads the configuration file above and puts all of its properties into the props object.

 

try (InputStream inputStream = new FileInputStream(configFile)) {
  props.load(inputStream);
}

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

 

3.Authentication:

As shown above, the Confluent client uses basic auth to connect to the target Schema Registry. You could refer to Step 3 of this guidance to generate the API key and API secret.

basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

 

4.Produce event

for (long i = 0; i < 10; i++) {
    final String orderId = "id" + Long.toString(i);
    final Payment payment = new Payment(orderId, 1000.00d);
    final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
    producer.send(record);
    Thread.sleep(1000L);
}
producer.flush();
System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

 

Here, the producer supports two basic types of Avro records: a specific code-generated class, or a generic record.

Using a specific code-generated class requires you to define and compile a Java class for your schema, but it is easier to work with in your code.

However, in other scenarios where you need to work dynamically with data of any type and do not have Java classes for your record types, use GenericRecord.

 

 

Migration from Confluent to Azure Event Hub

There is no direct way or tool to migrate schemas from an external schema registry into Azure Schema Registry. The schemas need to be recreated in a schema group in the Azure Event Hubs namespace.
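Because the schemas have to be recreated by hand, a practical first step is to export them from Confluent. The following is a minimal sketch, assuming Java 11 or later, that only builds (and does not send) the two Confluent Schema Registry REST requests that list all subjects and fetch the latest schema of one subject. The class name, the sr.example.com URL, and the orders-value subject are hypothetical placeholders; the API key and secret are the same values used in basic.auth.user.info.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of any Confluent or Azure SDK.
final class ConfluentSchemaExport {

    // Builds the HTTP Basic header value from the Schema Registry API key and secret.
    static String basicAuth(String apiKey, String apiSecret) {
        String pair = apiKey + ":" + apiSecret;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    // GET /subjects lists every subject registered in the Confluent Schema Registry.
    static HttpRequest listSubjects(String registryUrl, String auth) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects"))
                .header("Authorization", auth)
                .GET()
                .build();
    }

    // GET /subjects/{subject}/versions/latest returns the latest schema of one subject.
    // URLEncoder targets form encoding, which is good enough for typical subject
    // names such as "orders-value" (letters, digits, '-', '_', '.').
    static HttpRequest latestSchema(String registryUrl, String subject, String auth) {
        String encoded = URLEncoder.encode(subject, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(
                        URI.create(registryUrl + "/subjects/" + encoded + "/versions/latest"))
                .header("Authorization", auth)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own SR_URL, SR_API_KEY and SR_API_SECRET.
        String registryUrl = "https://sr.example.com";
        String auth = basicAuth("SR_API_KEY", "SR_API_SECRET");
        System.out.println(listSubjects(registryUrl, auth));
        System.out.println(latestSchema(registryUrl, "orders-value", auth));
    }
}
```

Each request can then be sent with java.net.http.HttpClient. The "schema" field of the JSON response holds the Avro definition, which can be registered again in the Azure schema group, either in the portal or automatically by the serializer when auto-registration is enabled.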

 

Before moving forward, create the Schema Registry in your Event Hubs namespace in the Azure portal.

 


 

Generally, we use the Kafka-integrated Apache Avro serializers and deserializers to work with a schema registry. The good news is that the Azure Schema Registry for Kafka SDK is mostly compatible with the native Kafka-integrated client, so most of the code written for Confluent Schema Registry can be reused. However, please note the following key points and differences.

 

1.Configuration

Please refer to the following configuration, which is taken from this link in the same repository.

# standard EH Kafka configs
bootstrap.servers=jaygongeventhub.servicebus.windows.net:9093
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://jaygongeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=***=";

topic=jayevent

# schema registry specific configs
schema.registry.url=https://jaygongeventhub.servicebus.windows.net

# used by producer only
schema.group=jaysg

# used for MSI-enabled VM, ignores client secret credentials if enabled
use.managed.identity.credential=false

tenant.id=***

client.id=***

client.secret=***

 

The following Event Hub configurations are required:

bootstrap.servers – Event Hub namespace

sasl.jaas.config - JAAS configuration for SASL; replace the Event Hub namespace, SAS key name, and SAS key value with your own

topic - Event Hub topic for produce and consume operations
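
The first two values follow a fixed pattern, so they can be derived from the namespace name and its connection string. The following is a minimal sketch of that pattern, based only on the sample configuration above; the class and method names are hypothetical and not part of any SDK.

```java
// Hypothetical helper that reproduces the format of the sample configuration above.
final class EventHubsJaasConfig {

    // The Kafka endpoint of an Event Hubs namespace listens on port 9093.
    static String bootstrapServers(String namespace) {
        return namespace + ".servicebus.windows.net:9093";
    }

    // The username is the literal string $ConnectionString;
    // the password is the full namespace connection string.
    static String jaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }

    public static void main(String[] args) {
        // Placeholder connection string; copy the real one from the Azure portal.
        String connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>";
        System.out.println("bootstrap.servers=" + bootstrapServers("mynamespace"));
        System.out.println("sasl.jaas.config=" + jaasConfig(connectionString));
    }
}
```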

 

The following Schema Registry configurations are required:

schema.registry.url - Event Hub namespace with Schema Registry enabled.

schema.group - Schema Registry group with serialization type 'Avro' (must be created before the application runs)

use.managed.identity.credential - indicates that MSI credentials should be used; set it to true on an MSI-enabled VM

tenant.id - sets the tenant ID of the application

client.id - sets the client ID of the application

client.secret - sets the client secret for authentication
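
A missing key usually only surfaces later as a connection or authentication error, so it can help to check the loaded properties up front. The following is a minimal sketch of such a check; the class and method names are hypothetical. It assumes that use.managed.identity.credential defaults to false when absent, and that the three client secret settings are only needed when managed identity is disabled, as the comment in the sample configuration suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the properties listed above.
final class AppPropertiesCheck {

    // Returns the required keys that are absent or blank, in a fixed order.
    static List<String> missingKeys(Properties props) {
        List<String> required = new ArrayList<>(List.of(
                "bootstrap.servers", "sasl.jaas.config", "topic",
                "schema.registry.url", "schema.group"));

        // Assumption: a missing flag means managed identity is not used.
        boolean managedIdentity = Boolean.parseBoolean(
                props.getProperty("use.managed.identity.credential", "false"));
        if (!managedIdentity) {
            required.addAll(List.of("tenant.id", "client.id", "client.secret"));
        }

        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        props.setProperty("schema.registry.url", "https://mynamespace.servicebus.windows.net");
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```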

 

2.Parameter definition

Please refer to the snippet of sample code from App.java.

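Properties props = new Properties();
// Load the configuration file; try-with-resources closes the stream afterwards
try (InputStream in = new FileInputStream("src/main/resources/app.properties")) {
    props.load(in);
}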

// EH specific properties
String brokerUrl = props.getProperty("bootstrap.servers");
String jaasConfig = props.getProperty("sasl.jaas.config");
String topicName = props.getProperty("topic");

// Schema Registry specific properties
String registryUrl = props.getProperty("schema.registry.url");
String schemaGroup = props.getProperty("schema.group"); 

The value of schema.registry.url does not have to point to the same Event Hubs namespace as the one set in bootstrap.servers. That means we can use an Azure Schema Registry from a different Event Hubs namespace, even in another Azure subscription. We only need to make sure that we have sufficient permission to access the target schema registry via Azure AD credentials.

 

3.Authentication

As mentioned above, we need to make sure that we have permission to access the schema registry. This differs from the basic auth approach in Confluent. To access Azure Schema Registry programmatically, we need to register an application in Azure Active Directory (Azure AD) and add the security principal of the application to one of the Azure role-based access control (Azure RBAC) roles for Schema Registry, such as Schema Registry Reader or Schema Registry Contributor.

 

For instructions on registering an application using the Azure portal, see Register an app with Azure AD. Then note down the client ID (application ID), tenant ID, and the secret to use in the above configuration file as tenant.id, client.id and client.secret.

 

We need to create a TokenCredential via the Azure Identity library to acquire an access token for the target Schema Registry in the Event Hubs namespace. Please refer to the sample code below.

TokenCredential credential;
if ("true".equals(props.getProperty("use.managed.identity.credential"))) { // null-safe if the key is missing
    credential = new ManagedIdentityCredentialBuilder().build();
} else {
    credential = new ClientSecretCredentialBuilder()
            .tenantId(props.getProperty("tenant.id"))
            .clientId(props.getProperty("client.id"))
            .clientSecret(props.getProperty("client.secret"))
            .build();
}

 

4.Initialize Kafka client

The following sample code is built on the native Kafka client SDK. If your previous code is based on the native Kafka client, it should be compatible, so you don’t need to change the property definitions except for the credential section below. However, if your previous code wraps the native Kafka API in a higher-level abstraction, you might need to refer to this repository to rebuild the application code.

 

// EH Kafka Configs
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasConfig);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);

// Schema Registry configs
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, schemaGroup);
// Order is the code-generated Avro class; use KafkaProducer<String, GenericRecord> for generic records
KafkaProducer<String, Order> producer = new KafkaProducer<String, Order>(props);

 

5.Produce event

// key is the record key, a String such as "key1"
for (int i = 0; i < 10; i++) {
    Order order = new Order("ID-" + i, 10.99 + i, "Sample order -" + i);
    ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
    producer.send(record);
    logger.info("Sent Order {}", order);
}
producer.flush();

Azure Schema Registry supports two basic types of Avro records, similar to Confluent. You could use a specific Java class as above, or you could define a generic record with the following code.

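String key = "key1";
String userSchema = "{\"namespace\": \"com.azure.schemaregistry.samples\", \"type\": \"record\", \"name\": " +
        "\"Order\", \"fields\": [ { \"name\": \"id\", \"type\": \"string\"}, " +
        "{ \"name\": \"amount\", \"type\": \"double\"}, { \"name\": \"description\", \"type\": \"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
// Build a record matching the schema; sending it requires a KafkaProducer<String, GenericRecord>
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("id", "ID-1");
avroRecord.put("amount", 10.99);
avroRecord.put("description", "Sample order");
producer.send(new ProducerRecord<String, GenericRecord>(topicName, key, avroRecord));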

 

After running the code successfully, you can verify the schema in the Azure portal. If you encounter a 401 Unauthorized error, please grant the Schema Registry Contributor (or Reader) RBAC role to the identity used to sign in (your login account or the registered application).


 

 

Last update: Jul 05 2022 12:52 AM