Azure Event Hub
SSL/TLS connection issue troubleshooting guide
You may experience exceptions or errors when establishing TLS connections with Azure services. The exceptions vary dramatically depending on the client and server types; typical ones include "Could not create SSL/TLS secure channel." and "SSL Handshake Failed". In this article we discuss common causes of TLS-related issues and the corresponding troubleshooting steps.
Unable to delete Azure EventHub Cluster/Namespace/Entity/ConsumerGroup from Portal/ PowerShell/ CLI

Issue: Unable to delete an Azure Event Hubs cluster, namespace, entity, or consumer group from the Portal, PowerShell, or CLI.

Case 1: The Event Hubs tier is Premium and you are unable to delete the namespace, getting conflict error 409.

Sample error message: "statusMessage":"{\"error\":{\"code\":\"Conflict\",\"message\":\"Namespace provisioning in transition. For more information visit https://aka.ms/eventhubsarmexceptions.

Reason: The namespace gets stuck, or its provisioning fails, because of a known race condition between two or more internal microservices of the Premium Event Hubs architecture. If you trigger a Premium namespace provisioning and an event hub creation right after it, this race condition can occur, both provisioning operations may fail, and the runtime creation fails with internal server error 500.

Recommendation: Do not perform back-to-back create operations; wait until the first namespace has been created successfully. If you want to delete the namespace right after its creation, perform the delete at least 1 hour after the creation time.

Action to be taken: If your namespace is stuck in the Activating or Deleting state, raise a support request with Microsoft to fix the state of the namespace and bring it back to Active.

Case 2: You received conflict error 409 while deleting an Event Hub, but it is not a Premium namespace.

Reason: The conflict can be caused by pending backend operations on the Event Hubs components; you may be trying to delete the Event Hub before those operations have completed.

Recommendation and Action to be taken: Wait for the pending operations on the Event Hub or its components to finish, then retry after some time.

Case 3: Deletion of an Event Hub entity within a namespace reports success, but after some time the entity is recreated and reappears on the portal.

Reason: The recreation can be caused by a diagnostic setting that still targets the entity, or by Application Insights using the entity, in which case the Azure Monitor resource provider recreates the Event Hub.

Action to be taken: Follow the steps below:
Check if the entity is operational using PowerShell/CLI. You may test with any Get command, for example Get-AzEventHub (see the sketch after this case for a script-based check).
If the Event Hub has been recreated, check its content, either with the Process Data option on the Event Hub pane in the portal or with the Service Bus Explorer tool.
Once you see content or records in the entity, identify the resource ID that is sending events to it by looking at the content data.
Go to that resource in the Azure portal and disable the diagnostic settings or Application Insights settings that use the entity.

Recommendation: Before deleting the Event Hub entity or namespace, check that none of the resources from this document are streaming logs to it.
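The "check whether the entity is operational" step in Case 3 above (and in Case 4 below) can also be done with a management-plane call instead of the portal. A minimal sketch, assuming the azure-mgmt-eventhub and azure-identity Python packages; the subscription, resource group, namespace, and entity names are placeholders:

# Hedged sketch: verify whether an Event Hub entity still exists after deletion.
# All IDs and names below are placeholders.
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.core.exceptions import ResourceNotFoundError

client = EventHubManagementClient(DefaultAzureCredential(), "<subscription-id>")

try:
    eh = client.event_hubs.get("<resource-group>", "<namespace-name>", "<eventhub-name>")
    # Deletion did not complete, or something recreated the entity (Case 3).
    print("Entity still exists, status:", eh.status)
except ResourceNotFoundError:
    # 404: the entity is gone; if the portal still shows it, the ARM cache entry may be stale (Case 4).
    print("Entity not found (404)")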
Case 4: You have deleted the Event Hub, all operations on the deleted entity fail, but it still shows on the portal.

Reason: A deleted, non-operational Event Hub that still appears on the portal is usually caused by a stale entry in the ARM cache.

Action to be taken: Follow the steps below:
Check if the entity is operational using PowerShell/CLI. You may test with any Get command, for example Get-AzEventHub.
If the operation fails with error code 404 (entity not found) but the entity still shows on the portal, raise a support ticket with Microsoft to synchronize the ARM cache for the resource.

Case 5: You are unable to delete a Kafka-enabled Event Hub topic.

Reason: A Kafka-enabled Event Hub topic may reappear after deletion because a Kafka producer keeps recreating it: auto topic creation is ON by default.

Action to be taken: Check the activity logs to confirm that the delete operation succeeded, then set the auto topic creation property to OFF.

Recommendation: You can either stop the Kafka producers or pick another Event Hub name.

Case 6: You are unable to delete a Dedicated Event Hubs cluster and get the error message "BadRequest".

Reason: It is a known limitation that a dedicated Event Hubs cluster cannot be deleted until four hours after its creation time.

Recommendation and Action to be taken: Rerun the operation after that time has elapsed, or contact the Event Hubs team through a support request if the cluster is stuck in a particular state. Include the resource ID, the correlation ID of the operation, and the timestamp of the issue in the support ticket.
[Eventhub] How to receive diagnostic log from Storage Accounts on different Subscriptions to EH

“I would like to have a centralized EventHub receiving logs from Storage Accounts on different Subscriptions.” This article covers gathering the diagnostic logs of Storage Accounts from different subscriptions into one single Event Hub.
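Diagnostic settings can also be pointed at an Event Hub in another subscription through the management API. A minimal, hedged sketch using the azure-mgmt-monitor Python package; the resource IDs, authorization rule, setting name, and log category are placeholders and depend on the resource type being configured:

# Hedged sketch: route a storage account's diagnostic logs to a central Event Hub
# that lives in a different subscription. All IDs and names below are placeholders.
from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
from azure.mgmt.monitor.models import DiagnosticSettingsResource, LogSettings

# The client is scoped to the subscription that owns the storage account.
monitor = MonitorManagementClient(DefaultAzureCredential(), "<storage-account-subscription-id>")

storage_resource_id = (
    "/subscriptions/<storage-account-subscription-id>/resourceGroups/<rg>"
    "/providers/Microsoft.Storage/storageAccounts/<account>/blobServices/default"
)

monitor.diagnostic_settings.create_or_update(
    resource_uri=storage_resource_id,
    name="central-eventhub-logs",
    parameters=DiagnosticSettingsResource(
        # Authorization rule of the Event Hubs namespace in the *other* subscription.
        event_hub_authorization_rule_id=(
            "/subscriptions/<eventhub-subscription-id>/resourceGroups/<rg>"
            "/providers/Microsoft.EventHub/namespaces/<namespace>"
            "/authorizationRules/RootManageSharedAccessKey"
        ),
        event_hub_name="<eventhub-entity>",
        logs=[LogSettings(category="StorageRead", enabled=True)],
    ),
)

The identity running this sketch needs sufficient rights on both the storage account and the Event Hubs authorization rule.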
[EventHub] The behavior of EPH SDK Epoch

Are you seeing logs where epoch ‘0’ is getting disconnected? Does your consumer group consist of numerous consumers but only one partition? If so, this article will help you understand why you are getting informational logs such as: "...current receiver '<RECEIVER_NAME>' with epoch '0' is getting disconnected"
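The epoch in those logs corresponds to the owner level of a receiver: when a receiver with a higher owner level attaches to the same consumer group and partition, lower-level (or non-exclusive) receivers are disconnected. A minimal sketch, assuming azure-eventhub 5.x where receive() accepts an owner_level keyword; the connection string and entity name are placeholders:

# Hedged sketch: an exclusive ("epoch") receiver. A second client started with a higher
# owner_level on the same consumer group/partition would displace this one, producing the
# "epoch '0' is getting disconnected" style of log on the older receiver.
from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print(partition_context.partition_id, event.body_as_str())

client = EventHubConsumerClient.from_connection_string(
    "<namespace-connection-string>",
    consumer_group="$Default",
    eventhub_name="<eventhub-entity>",
)

with client:
    # owner_level plays the role of the epoch; omit it for a non-exclusive receiver.
    client.receive(on_event=on_event, owner_level=0, starting_position="-1")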
Custom DNS server issues while connecting from Databricks to Event Hub

Scenario: You have an Event Hub that receives events from Databricks. Suddenly, after receiving a few messages, you encounter the error below:

AMQPConnectionError: Unable to open authentication session on connection b'EHProducer-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'. Please confirm target hostname exists: b'xxxxxxx.servicebus.windows.net'

You start wondering: maybe the Event Hub namespace doesn’t exist, or someone deleted it accidentally. You navigate to the Azure portal and see that the namespace exists, and that you were even continuously receiving messages in the Event Hub. So what could cause the error above?

Root Cause: A connectivity issue while connecting Databricks to Event Hub. This happens when a custom DNS server is configured for Databricks, and it applies to any PaaS service accessed through a custom DNS server. IP address 168.63.129.16 is a virtual public IP address that is used to facilitate a communication channel to Azure platform resources.

More about this special IP address: This public IP address is owned by Microsoft and will not change. As mentioned in this article, we recommend that you allow this IP address in any local (in the VM) firewall policies (outbound direction). The communication between this special IP address and the resources is safe because only the internal Azure platform can source a message from this IP address. If this address is blocked, unexpected behavior can occur in a variety of scenarios. 168.63.129.16 is a virtual IP of the host node and as such is not subject to user-defined routes.

The wire server IP plays an important role in DHCP and DNS, and it is also used for DNS resolution. Azure resources such as Storage Accounts, Cosmos DB, Event Hubs, VMs, VMSS, and App Services each have an FQDN (Fully Qualified Domain Name). The domain names and A records for these FQDNs are hosted by backend servers and can be resolved once the request is forwarded to the wire server IP 168.63.129.16, which acts as an anycast IP in the Azure backbone. A custom DNS server (for example 10.2.0.4) doesn’t own any of the FQDNs managed by Azure, so any Azure IaaS or PaaS service resolved from the custom DNS server needs this wire server IP.

How to mitigate this behavior: In this scenario, Databricks, which is VNet-injected, used a custom DNS server. The DNS server must forward requests to the wire server IP 168.63.129.16 for Azure PaaS name resolution, in this case for the Event Hub from which Databricks is receiving events.

Steps needed to add 168.63.129.16 to your DNS servers:
In the Azure portal, open the VNet where the Databricks service is injected.
Open the DNS servers tab.
Add the IP 168.63.129.16 in the IP address (Add DNS server) field.
Click the Save button.
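The same change can be scripted. A minimal, hedged sketch using the azure-mgmt-network Python package to append 168.63.129.16 to the VNet's custom DNS server list; the subscription, resource group, and VNet names are placeholders:

# Hedged sketch: add the Azure wire server IP to a VNet's custom DNS server list.
# Placeholder subscription/resource names; the change is applied by updating the
# existing VNet object with create_or_update.
from azure.identity import DefaultAzureCredential
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.network.models import DhcpOptions

client = NetworkManagementClient(DefaultAzureCredential(), "<subscription-id>")

vnet = client.virtual_networks.get("<resource-group>", "<databricks-vnet>")
dns_servers = list(vnet.dhcp_options.dns_servers) if vnet.dhcp_options else []

if "168.63.129.16" not in dns_servers:
    dns_servers.append("168.63.129.16")
    vnet.dhcp_options = DhcpOptions(dns_servers=dns_servers)
    client.virtual_networks.begin_create_or_update(
        "<resource-group>", "<databricks-vnet>", vnet
    ).result()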
[EventHub] Types of Throttling Errors and how to mitigate it

Are you getting a 50002 or 50008 error return code in your Event Hub? If so, you are in the right place. In this article we go through what these two return codes mean and how to prevent your Event Hub from being throttled.
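Client-side retry with backoff is the usual first line of defence when the service throttles, alongside increasing throughput capacity or spreading the load. A minimal sketch, assuming the azure-eventhub Python package, whose client constructors accept retry-tuning keywords; the connection string and entity name are placeholders:

# Hedged sketch: tune client-side retries so transient throttling (server-busy style
# errors) is retried with exponential backoff instead of failing the send immediately.
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    "<namespace-connection-string>",
    eventhub_name="<eventhub-entity>",
    retry_total=5,             # number of retries after the first attempt
    retry_backoff_factor=0.8,  # base delay (seconds) for exponential backoff
    retry_backoff_max=30,      # cap on the delay between retries
)

with producer:
    batch = producer.create_batch()
    batch.add(EventData("sample event"))
    producer.send_batch(batch)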
Common causes of SSL/TLS connection issues and solutions

In the TLS connection common causes and troubleshooting guide (microsoft.com), the mechanism of establishing SSL/TLS and the tools to troubleshoot SSL/TLS connections were introduced. In this article, I would like to introduce three common issues that may occur when establishing an SSL/TLS connection, and the corresponding solutions for Windows, Linux, .NET and Java:
TLS version mismatch
Cipher suite mismatch
TLS certificate is not trusted

TLS version mismatch

Before we jump into solutions, let me introduce how the TLS version is determined. As described in the dataflow of the first article (https://techcommunity.microsoft.com/t5/azure-paas-blog/ssl-tls-connection-issue-troubleshooting-guide/ba-p/2108065), a TLS connection is always started from the client end: the client proposes a TLS version, and the server only checks whether it supports that version. If the server supports the proposed version, the handshake continues; if not, the conversation ends.

Detection
You may test with the tools introduced in this blog (TLS connection common causes and troubleshooting guide (microsoft.com)) to verify whether the TLS connection issue was caused by a TLS version mismatch. If you capture network packets, you can also view the TLS version specified in the Client Hello. If the connection terminates without a Server Hello, it could be either a TLS version mismatch or a cipher suite mismatch.

Solution
Different types of clients have their own mechanisms to determine the TLS version. For example, web browsers (IE, Edge, Chrome, Firefox) have their own sets of TLS versions, applications have their own libraries that define the TLS version, and the operating system (such as Windows) also supports defining the TLS version.

Web browser
In the latest Edge and Chrome, TLS 1.0 and TLS 1.1 are deprecated; TLS 1.2 is the default TLS version for these two browsers. Below are the steps for setting the TLS version in Internet Explorer and Firefox; they work in Windows 10.

Internet Explorer
Search for Internet Options.
Find the setting in the Advanced tab.

Firefox
Open Firefox and type about:config in the address bar.
Type tls in the search bar and find the settings security.tls.version.min and security.tls.version.max. The values define the range of supported TLS versions: 1 is TLS 1.0, 2 is TLS 1.1, 3 is TLS 1.2, 4 is TLS 1.3.

Windows System
Different Windows OS versions have different default TLS versions. The default TLS version can be overridden by adding/editing the DWORD registry values ‘Enabled’ and ‘DisabledByDefault’. These registry values are configured separately for the protocol client and server roles under registry subkeys named using the following format: <SSL/TLS/DTLS> <major version number>.<minor version number><Client\Server>. For example, below is a registry path with a version-specific subkey:
Computer\HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Client
For details, please refer to Transport Layer Security (TLS) registry settings | Microsoft Learn.
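If you want to check from a script whether these Schannel overrides are present on a Windows machine, the standard-library winreg module can read them. A minimal, read-only sketch under the assumption that missing keys or values simply mean the OS defaults apply:

# Hedged sketch (Windows only): inspect whether the TLS 1.2 client protocol has been
# overridden in the Schannel registry settings described above. Read-only; absent keys
# or values mean the OS default behaviour is in effect.
import winreg

PATH = r"SYSTEM\CurrentControlSet\Control\SecurityProviders\SCHANNEL\Protocols\TLS 1.2\Client"

try:
    with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, PATH) as key:
        for value_name in ("Enabled", "DisabledByDefault"):
            try:
                value, _ = winreg.QueryValueEx(key, value_name)
                print(f"{value_name} = {value}")
            except FileNotFoundError:
                print(f"{value_name} not set (OS default applies)")
except FileNotFoundError:
    print("No TLS 1.2 client override key found (OS default applies)")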
Application running with the .NET Framework
The application uses the OS-level configuration by default. For a quick test of HTTP requests, you can add the line below to specify the TLS version in your application before the TLS connection is established. To be on the safe side, you may define it at the beginning of the project.

ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

The above can be used as a quick test to verify the problem; it is always recommended to follow the document below for best practices.
https://docs.microsoft.com/en-us/dotnet/framework/network-programming/tls

Java application
For a Java application that uses Apache HttpClient to communicate with an HTTP server, you may check the link How to Set TLS Version in Apache HttpClient | Baeldung about how to set the TLS version in code.

Cipher suite mismatch

Like a TLS version mismatch, a cipher suite mismatch can also be tested with the tools introduced in the previous article.

Detection
In the network packets, the connection is terminated after the Client Hello, so if you do not see a Server Hello packet, that indicates either a TLS version mismatch or a cipher suite mismatch. If the server supports public access, you can also test using SSL Labs (https://www.ssllabs.com/ssltest/analyze.html) to detect all supported cipher suites.

Solution
In the process of establishing an SSL/TLS connection, the server has the final decision on which cipher suite is used in the communication. Different Windows OS versions support different TLS cipher suites and priority orders. For the supported cipher suites, please refer to Cipher Suites in TLS/SSL (Schannel SSP) - Win32 apps | Microsoft Learn for details. If a service is hosted on Windows, the default order can be overridden with the group policy below to affect which cipher suite is chosen. The steps work on Windows Server 2019:
Edit group policy -> Computer Configuration -> Administrative Templates -> Network -> SSL Configuration Settings -> SSL Cipher Suite Order.
Enable it and configure the priority list with all the cipher suites you want.
The cipher suites can be manipulated by command as well; please refer to TLS Module | Microsoft Learn for details.

TLS certificate is not trusted

Detection
Access the URL from a web browser. It does not matter whether the page can be loaded or not: before loading anything from the remote server, the web browser tries to establish the TLS connection. If you see the error below returned, it means the certificate is not trusted on the current machine.

Solution
To resolve this issue, we need to add the CA certificate into the client's trusted root store. The CA certificate can be obtained from the web browser:
Click the warning icon -> the ‘isn’t secure’ warning in the browser.
Click the ‘show certificate’ button.
Export the certificate.
Import the exported crt file into the client system.

Windows
Manage computer certificates.
Trusted Root Certification Authorities -> Certificates -> All Tasks -> Import.
Select the exported crt file with the other default settings.

Ubuntu
The command below is used to check the current trusted CA information in the system:
awk -v cmd='openssl x509 -noout -subject' ' /BEGIN/{close(cmd)};{print | cmd}' < /etc/ssl/certs/ca-certificates.crt
If you do not see the desired CA in the result, the commands below are used to add new CA certificates:
$ sudo cp <exported crt file> /usr/local/share/ca-certificates
$ sudo update-ca-certificates

RedHat/CentOS
The command below is used to check the current trusted CA information in the system:
awk -v cmd='openssl x509 -noout -subject' ' /BEGIN/{close(cmd)};{print | cmd}' < /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem
If you do not see the desired CA in the result, the commands below are used to add new CA certificates:
sudo cp <exported crt file> /etc/pki/ca-trust/source/anchors/
sudo update-ca-trust

Java
The JVM uses a trust store which contains certificates of well-known certification authorities. The trust store on the machine may not contain the new certificates that we recently started using. If this is the case, the Java application will receive SSL failures when trying to access the storage endpoint. The errors look like the following:

Exception in thread "main" java.lang.RuntimeException: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at org.example.App.main(App.java:54)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:130)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:371)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:314)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:309)

Run the command below to import the crt file into the JVM cert store. The command works in JDK 19.0.2:
keytool -importcert -alias <alias> -keystore "<JAVA_HOME>/lib/security/cacerts" -storepass changeit -file <crt_file>

The command below is used to export the current certificate information from the JVM cert store:
keytool -keystore "<JAVA_HOME>\lib\security\cacerts" -list -storepass changeit > cert.txt

The certificate will be displayed in the cert.txt file if it was imported successfully.
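As a quick client-side check covering all three causes above, Python's standard-library ssl module can report what a given endpoint actually negotiates. A minimal sketch; the hostname is a placeholder:

# Hedged sketch: connect to an endpoint and report the negotiated TLS version, the
# chosen cipher suite, and whether the certificate chain is trusted by the local
# certificate store (a certificate verification error here points at the third cause).
import socket
import ssl

host, port = "<your-endpoint>.servicebus.windows.net", 443

context = ssl.create_default_context()  # uses the system trust store and default policy
try:
    with socket.create_connection((host, port), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            print("Negotiated TLS version:", tls.version())
            print("Negotiated cipher suite:", tls.cipher()[0])
            print("Certificate chain trusted by local store: yes")
except ssl.SSLCertVerificationError as e:
    print("Certificate is not trusted:", e)
except ssl.SSLError as e:
    print("Handshake failed (possible TLS version or cipher suite mismatch):", e)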
Azure Data Factory trigger is not initiated when uploading a file using Java SDK

Uploading a file using the Java SDK class DataLakeFileClient does not initiate an ADF trigger, even though the ADF trigger is configured correctly to fire once a new file is created. This happens only when the trigger is configured to ignore 0-byte blobs.
Use python SDK to send and receive events with schema registry in azure event hub

This blog complements another blog related to Azure Event Hub Schema Registry. As we know, it is not supported to migrate a Confluent (Kafka) schema registry into Azure Schema Registry directly; we need to create and manage the schema registry in Azure Event Hubs separately. The good news is that Azure Event Hubs supplies multiple client SDKs that serialize and deserialize payloads containing schema registry identifiers and Avro-encoded data. In this section, I’d like to share how to use the Python SDK to send and receive events with the schema registry in Azure Event Hubs.

Prerequisites:

1. Create a schema registry group in the Event Hub portal. You can refer to this official guidance to create a schema registry group.

2. Install the required Python packages with the pip tool:
a. pip install azure-schemaregistry-avroencoder — the main package we will use below.
b. pip install azure-identity — authentication is required to access the schema registry group and create schema registries, so we need the TokenCredential protocol with an AAD credential.
c. pip install aiohttp — an async transport is needed to use the async API.

3. To implement the TokenCredential authentication flow mentioned above, the following credential types, if enabled, will be tried in order:
EnvironmentCredential
ManagedIdentityCredential
SharedTokenCacheCredential
VisualStudioCredential
VisualStudioCodeCredential
AzureCliCredential
AzurePowerShellCredential
InteractiveBrowserCredential
In the test demo below, EnvironmentCredential is used, so we need to register an AAD application to get the tenant ID, client ID, and client secret.

4. If we want to pass EventData as the message type while encoding, we also need to make sure azure-eventhub>=5.9.0 is installed so that the azure.eventhub.EventData module class can be used.

Test Demo:

1. Client initialization

import os
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

os.environ["AZURE_TENANT_ID"] = "***"
os.environ["AZURE_CLIENT_ID"] = "***"
os.environ["AZURE_CLIENT_SECRET"] = "***"
token_credential = DefaultAzureCredential()

fully_qualified_namespace = '<event hub namespace name>.servicebus.windows.net'
group_name = '<schema registry group name>'
eventhub_connection_str = '<Connection String>'
eventhub_name = '<the event hub entity name we need to access>'

2. Send event data with encoded content

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "John", "favorite_number": 20, "favorite_color": "blue"}
    event_data = avro_encoder.encode(dict_content, schema=definition)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

If we set the auto_register parameter to true, it registers new schemas passed to encode.
We can check the created schema registry in the Event Hub portal; the schema name is a combination of the values of the namespace and name properties in the definition we set. If we look at the created event data, the event content is encoded as binary; in fact, the avro_encoder.encode method uses an underlying BinaryEncoder function to encode/decode the message data.

Once the schema registry is created, we can’t add an extra property that is not contained in the standard schema definition. For example, if we set the dictionary content as:
dict_content = {"name": "Bob1", "favorite_number": 7, "favorite_color": "red","jayName":"jay"}
we will encounter an exception. A missing required property won’t pass through either. For example, if we set the dictionary content as:
dict_content = {"name": "Bob", "favorite_number": 7}
we will also encounter an exception.

3. Receive event data with decoded content

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)
    print(decoded_content)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

We can also get sync and async sample code snippets from this link.
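The async API mentioned in the prerequisites (the reason aiohttp is installed) follows the same shape. A minimal, hedged sketch of the async receive path, assuming the aio submodules of the same packages; the namespace, group, connection string, and entity names are placeholders:

# Hedged sketch: async counterpart of the receive demo above, using the aio submodules.
# Connection string, namespace, group, and entity names are placeholders.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
from azure.schemaregistry.aio import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder.aio import AvroEncoder

async def main():
    credential = DefaultAzureCredential()
    schema_registry_client = SchemaRegistryClient("<namespace>.servicebus.windows.net", credential)
    avro_encoder = AvroEncoder(client=schema_registry_client, group_name="<schema registry group name>")
    consumer = EventHubConsumerClient.from_connection_string(
        conn_str="<Connection String>",
        consumer_group="$Default",
        eventhub_name="<event hub entity name>",
    )

    async def on_event(partition_context, event):
        decoded_content = await avro_encoder.decode(event)
        print(decoded_content)

    async with consumer, avro_encoder:
        await consumer.receive(on_event=on_event, starting_position="-1")

asyncio.run(main())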
How to Migrate Schema Registry from Confluent to Azure Event Hub

In many event streaming and messaging scenarios, the event or message payload contains structured data. Schema-driven formats such as Apache Avro are often used to serialize or deserialize such structured data. As we know, Confluent is a general platform on which Confluent Schema Registry implements this functionality; Azure Event Hubs supports Azure Schema Registry as well. The target audience for this blog is developers who have been using Confluent Schema Registry and plan to migrate it to Azure Schema Registry in Azure Event Hubs. This blog won’t cover common basic Confluent/Kafka client implementation; if you’d like to integrate KafkaProducer into your event-driven application, you can learn from this tutorial. Instead, this blog shows the key steps and differences when migrating a schema registry from Confluent to Azure Event Hubs.

Overview:

Confluent Schema Registry: https://docs.confluent.io/platform/current/schema-registry/schema_registry_tutorial.html#sr-benefits
Apache Kafka producers write data to Kafka topics and Kafka consumers read data from Kafka topics. There is an implicit “contract” that producers write data with a schema that can be read by consumers, even as producers and consumers evolve their schemas. Schema Registry helps ensure that this contract is met with compatibility checks.

Azure Schema Registry: https://docs.microsoft.com/en-us/azure/event-hubs/schema-registry-overview
An event producer uses a schema to serialize the event payload and publish it to an event broker such as Event Hubs. Event consumers read the event payload from the broker and deserialize it using the same schema, so both producers and consumers can validate the integrity of the data with the same schema.

Confluent Schema Registry:

1. Configuration
We can refer to the sample configuration file from this link.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

2. Parameter definition and Kafka client initialization
The code loads the above configuration file and puts all of the properties into the props object.

try (InputStream inputStream = new FileInputStream(configFile)) {
    props.load(inputStream);
}
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

3. Authentication
As seen above, the Confluent client uses basic auth to connect with the target Schema Registry. You can refer to Step 3 of this guidance to generate the API key and API secret.
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

4. Produce event

for (long i = 0; i < 10; i++) {
    final String orderId = "id" + Long.toString(i);
    final Payment payment = new Payment(orderId, 1000.00d);
    final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
    producer.send(record);
    Thread.sleep(1000L);
}
producer.flush();
System.out.printf("Successfully produced 10 messages to a topic called %s%n", TOPIC);

Here, the producer supports two basic types of Avro records: a specific code-generated class, or a generic record. Using a specific code-generated class requires you to define and compile a Java class for your schema, but it is easier to work with in your code. In other scenarios, where you need to work dynamically with data of any type and do not have Java classes for your record types, use GenericRecord <streams-data-avro>.

Migration from Confluent to Azure Event Hub

There isn’t a direct way or tool to migrate a schema registry to Azure Schema Registry from an external server; the schemas need to be recreated in a schema group in the Azure Event Hubs namespace. Before moving forward, you need to create the schema group in your Event Hubs namespace in the Azure portal. Generally, Kafka-integrated Apache Avro serializers and deserializers are used to work with a schema registry, and the good news is that the Azure Schema Registry library for the Kafka SDK is largely compatible with the native Kafka-integrated client, so most of the code written against Confluent Schema Registry can be reused. However, please note the following key points and differences.

1. Configuration
Please refer to the following configuration, which is taken from this link in the same repository.

# standard EH Kafka configs
bootstrap.servers=jaygongeventhub.servicebus.windows.net:9093
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://jaygongeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=***=";
topic=jayevent
# schema registry specific configs
schema.registry.url=https://jaygongeventhub.servicebus.windows.net
# used by producer only
schema.group=jaysg
# used for MSI-enabled VM, ignores client secret credentials if enabled
use.managed.identity.credential=false
tenant.id=***
client.id=***
client.secret=***

The following Event Hub configurations are required:
bootstrap.servers – Event Hub namespace
sasl.jaas.config – JAAS configuration for SASL; the Event Hub namespace, SAS key name, and SAS key value should be replaced
topic – Event Hub topic for produce and consume operations

The following Schema Registry configurations are required:
schema.registry.url – Event Hub namespace with Schema Registry enabled
schema.group – Schema Registry group with serialization type 'Avro' (must be created before the application runs)
use.managed.identity.credential – indicates that MSI credentials should be used; intended for an MSI-enabled VM
tenant.id – sets the tenant ID of the application
client.id – sets the client ID of the application
client.secret – sets the client secret for authentication

2. Parameter definition
Please refer to the snippet of sample code from App.java.
Properties props = new Properties();
props.load(new FileInputStream("src/main/resources/app.properties"));

// EH specific properties
String brokerUrl = props.getProperty("bootstrap.servers");
String jaasConfig = props.getProperty("sasl.jaas.config");
String topicName = props.getProperty("topic");

// Schema Registry specific properties
String registryUrl = props.getProperty("schema.registry.url");
String schemaGroup = props.getProperty("schema.group");

The value of schema.registry.url does not have to be the same as the Event Hub namespace set in the bootstrap.servers property. That means we can use an Azure schema registry from a different Event Hub namespace in another Azure subscription; we only need to make sure we have sufficient permission to access the target schema registry via AAD credentials.

3. Authentication
As mentioned above, we need to make sure we have permission to access the schema registry. This differs from the basic auth approach used by Confluent. To access the Azure schema registry programmatically, we need to register an application in Azure Active Directory (Azure AD) and add the security principal of the application to one of the Azure role-based access control (Azure RBAC) roles. For instructions on registering an application using the Azure portal, see Register an app with Azure AD. Then note down the client ID (application ID), tenant ID, and the secret to use in the above configuration file: tenant.id, client.id and client.secret.

We need to create a TokenCredential via the Azure AD SDK to acquire an access token to connect with the target schema registry in the Event Hubs namespace. Please refer to the sample code below.

TokenCredential credential;
if (props.getProperty("use.managed.identity.credential").equals("true")) {
    credential = new ManagedIdentityCredentialBuilder().build();
} else {
    credential = new ClientSecretCredentialBuilder()
        .tenantId(props.getProperty("tenant.id"))
        .clientId(props.getProperty("client.id"))
        .clientSecret(props.getProperty("client.secret"))
        .build();
}

4. Initialize Kafka client
The following sample code is built on the native Kafka client SDK. If your previous code is based on the native Kafka client, it should be compatible, so you don’t need to change the property definitions except for the credential section below. However, if your previous code is built on a higher-level wrapper around the native Kafka API, you might need to refer to this repository to rebuild the application code.
// EH Kafka Configs
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasConfig);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// Schema Registry configs
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, schemaGroup);

KafkaProducer<String, Order> producer = new KafkaProducer<String, Order>(props);

5. Produce event

for (int i = 0; i < 10; i++) {
    Order order = new Order("ID-" + i, 10.99 + i, "Sample order -" + i);
    ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
    producer.send(record);
    logger.info("Sent Order {}", order);
}
producer.flush();

Azure Schema Registry supports the same two basic types of Avro records as Confluent. You can use a specific Java class as shown above, or you can use a generic record definition via the following code.

String key = "key1";
String userSchema = "{\"namespace\": \"com.azure.schemaregistry.samples\", \"type\": \"record\", \"name\": " +
    "\"Order\", \"fields\": [ { \"name\": \"id\", \"type\": \"string\"}, " +
    "{ \"name\": \"amount\", \"type\": \"double\"}, { \"name\": \"description\", \"type\": \"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);

After running the code successfully, you can verify the schema in the Azure portal. If you encounter a 401 unauthorized error, grant the Schema Registry Contributor (or Reader) RBAC role to your login account.