Forum Discussion
LakhwantSingh (Copper Contributor)
Feb 02, 2024
Debezium with Event Hubs as sink
Hi,
I am doing a POC on CDC using Debezium.
There are a few ways to do this. One is to use "Debezium Server", where you define the source and sink properties.
I am trying to use an Event Hub as the sink, but I am getting a timeout error (logs below). I checked with the team responsible for it, and they said there are no issues with the Event Hub.
Can someone please suggest what could be causing this?
2024-02-02 06:01:57,878 INFO [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$248/0x0000000840326840@6a4ccef7' stream name mapper
2024-02-02 06:01:58,075 INFO [com.azu.mes.eve.EventHubClientBuilder] (main) {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_01d5df_1706871718073"}
2024-02-02 06:01:58,123 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (main) {"az.sdk.message":"Setting next AMQP channel.","entityPath":"splittest"}
2024-02-02 06:01:58,124 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (main) {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"splittest"}
2024-02-02 06:01:58,131 INFO [io.deb.ser.eve.EventHubsChangeConsumer] (main) Using default Event Hubs client for namespace 'eventhubns-01.servicebus.windows.net'
2024-02-02 06:01:58,139 INFO [com.azu.cor.amq.imp.ReactorConnection] (main) {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","port":5671}
2024-02-02 06:01:58,162 INFO [com.azu.cor.amq.imp.ReactorExecutor] (main) {"az.sdk.message":"Starting reactor.","connectionId":"MF_01d5df_1706871718073"}
2024-02-02 06:01:58,167 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionInit","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","namespace":"eventhubns-01.servicebus.windows.net"}
2024-02-02 06:01:58,168 INFO [com.azu.cor.amq.imp.han.ReactorHandler] (reactor-executor-1) {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_01d5df_1706871718073"}
2024-02-02 06:01:58,169 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_01d5df_1706871718073","errorCondition":null,"errorDescription":null,"hostName":"eventhubns-01.servicebus.windows.net"}
2024-02-02 06:01:58,250 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionBound","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","peerDetails":"eventhubns-01.servicebus.windows.net:5671"}
2024-02-02 06:02:58,168 WARN [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onTransportError","connectionId":"MF_01d5df_1706871718073","errorCondition":"amqp:connection:framing-error","errorDescription":"org.apache.qpid.proton.engine.TransportException: connection aborted","hostName":"eventhubns-01.servicebus.windows.net"}
2024-02-02 06:02:58,173 INFO [com.azu.cor.amq.imp.ReactorConnection] (parallel-1) {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_01d5df_1706871718073","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Error occurred while connection was starting. Error: com.azure.core.amqp.exception.AmqpException: Connection 'MF_01d5df_1706871718073' not opened within AmqpRetryOptions.tryTimeout(): PT1M, errorContext[NAMESPACE: eventhubns-01.servicebus.windows.net. ERROR CONTEXT: N/A]"}
2024-02-02 06:02:58,174 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Channel is closed. Requesting upstream.","entityPath":"splittest"}
2024-02-02 06:02:58,175 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Connection not requested, yet. Requesting one.","entityPath":"splittest"}
2024-02-02 06:02:58,175 INFO [com.azu.mes.eve.EventHubClientBuilder] (parallel-1) {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_673eca_1706871778175"}
2024-02-02 06:02:58,177 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Setting next AMQP channel.","entityPath":"splittest"}
2024-02-02 06:02:58,177 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"splittest"}
2024-02-02 06:02:58,177 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","state":"CLOSED","remoteState":"UNINITIALIZED"}
2024-02-02 06:02:58,191 INFO [com.azu.cor.amq.imp.ReactorConnection] (reactor-executor-1) {"az.sdk.message":"Closing executor.","connectionId":"MF_01d5df_1706871718073"}
2024-02-02 06:02:58,224 ERROR [io.qua.run.Application] (main) Failed to start application (with profile [prod]): java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.debezium.server.Main.main(Main.java:15)
Caused by: com.azure.core.amqp.exception.AmqpException: Connection 'MF_01d5df_1706871718073' not opened within AmqpRetryOptions.tryTimeout(): PT1M, errorContext[NAMESPACE: eventhubns-01.servicebus.windows.net. ERROR CONTEXT: N/A]
at com.azure.core.amqp.implementation.ReactorConnection.lambda$new$1(ReactorConnection.java:135)
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:301)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
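For anyone reading the log above: the connection reaches onConnectionBound on port 5671 and is aborted about one minute later, which matches the PT1M tryTimeout in the exception. One possible first check, sketched here in Python, is whether the host running Debezium Server can complete both a TCP connect and a TLS handshake on that port. The function name `probe_amqps` and its return values are made up for illustration. It only tests reachability and does not confirm the cause of the timeout.

```python
import socket
import ssl


def probe_amqps(host, port=5671, timeout=5.0, use_tls=True):
    """Return 'tcp_failed', 'tls_failed', or 'ok' for the given endpoint.

    Hypothetical helper: it checks the network path only, not AMQP itself.
    """
    try:
        # Step 1: plain TCP connect (also covers DNS resolution errors).
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        return "tcp_failed"
    try:
        if use_tls:
            context = ssl.create_default_context()
            try:
                # Step 2: TLS handshake, verified against the system CA store.
                with context.wrap_socket(sock, server_hostname=host):
                    pass
            except (ssl.SSLError, OSError):
                return "tls_failed"
        return "ok"
    finally:
        # Closing twice is harmless if wrap_socket already took over the socket.
        sock.close()
```

Run it from the same machine or container as Debezium Server, for example `probe_amqps("eventhubns-01.servicebus.windows.net")`. A result other than `'ok'` would suggest looking at the network path (firewall, proxy, TLS inspection) before the Event Hub itself.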
- priyamjm (Copper Contributor)
You need a valid application.properties file.
For the Debezium Server Docker image, the default path is "/debezium/conf/application.properties".
debezium.sink.type=eventhubs
debezium.sink.eventhubs.connectionstring=??
debezium.sink.eventhubs.hubname=??
debezium.sink.eventhubs.maxbatchsize=800000
. . .
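To check that the file actually contains the sink settings listed above, a small script along these lines may help. It is a minimal sketch: the helper names `parse_properties` and `missing_sink_settings` are hypothetical, the list of required keys is taken only from this reply, and a value of `??` is treated as an unfilled placeholder.

```python
# Keys taken from the reply above; a real setup may need more (source settings, etc.).
REQUIRED_KEYS = (
    "debezium.sink.type",
    "debezium.sink.eventhubs.connectionstring",
    "debezium.sink.eventhubs.hubname",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # / ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, since connection strings contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_sink_settings(text):
    """Return the required keys that are absent, empty, or still '??'."""
    props = parse_properties(text)
    return [k for k in REQUIRED_KEYS if props.get(k, "") in ("", "??")]
```

For example, `missing_sink_settings(open("/debezium/conf/application.properties").read())` returns an empty list when all three keys are set.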