Forum Discussion

LakhwantSingh's avatar
LakhwantSingh
Copper Contributor
Feb 02, 2024

Debezium with Event Hubs as sink

Hi,

 

I am doing a POC on CDC using Debezium.

 

There a few ways how this can be done. One of the ways is to use "Debezium Server" where one can define the source and sink properties.

 

I am trying to use Event Hub as the sink. But unfortunately I am getting the timeout error (logs below). I have checked with the concerned team and they said there are no issues with the Event Hub.

 

Can someone please help with any suggestions as to what could be causing this?

 

2024-02-02 06:01:57,878 INFO [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$248/0x0000000840326840@6a4ccef7' stream name mapper

2024-02-02 06:01:58,075 INFO [com.azu.mes.eve.EventHubClientBuilder] (main) {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_01d5df_1706871718073"}

2024-02-02 06:01:58,123 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (main) {"az.sdk.message":"Setting next AMQP channel.","entityPath":"splittest"}

2024-02-02 06:01:58,124 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (main) {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"splittest"}

2024-02-02 06:01:58,131 INFO [io.deb.ser.eve.EventHubsChangeConsumer] (main) Using default Event Hubs client for namespace 'eventhubns-01.servicebus.windows.net'

2024-02-02 06:01:58,139 INFO [com.azu.cor.amq.imp.ReactorConnection] (main) {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","port":5671}

2024-02-02 06:01:58,162 INFO [com.azu.cor.amq.imp.ReactorExecutor] (main) {"az.sdk.message":"Starting reactor.","connectionId":"MF_01d5df_1706871718073"}

2024-02-02 06:01:58,167 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionInit","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","namespace":"eventhubns-01.servicebus.windows.net"}

2024-02-02 06:01:58,168 INFO [com.azu.cor.amq.imp.han.ReactorHandler] (reactor-executor-1) {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_01d5df_1706871718073"}

2024-02-02 06:01:58,169 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_01d5df_1706871718073","errorCondition":null,"errorDescription":null,"hostName":"eventhubns-01.servicebus.windows.net"}

2024-02-02 06:01:58,250 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionBound","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","peerDetails":"eventhubns-01.servicebus.windows.net:5671"}

2024-02-02 06:02:58,168 WARN [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onTransportError","connectionId":"MF_01d5df_1706871718073","errorCondition":"amqp:connection:framing-error","errorDescription":"org.apache.qpid.proton.engine.TransportException: connection aborted","hostName":"eventhubns-01.servicebus.windows.net"}

2024-02-02 06:02:58,173 INFO [com.azu.cor.amq.imp.ReactorConnection] (parallel-1) {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_01d5df_1706871718073","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Error occurred while connection was starting. Error: com.azure.core.amqp.exception.AmqpException: Connection 'MF_01d5df_1706871718073' not opened within AmqpRetryOptions.tryTimeout(): PT1M, errorContext[NAMESPACE: eventhubns-01.servicebus.windows.net. ERROR CONTEXT: N/A]"}

2024-02-02 06:02:58,174 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Channel is closed. Requesting upstream.","entityPath":"splittest"}

2024-02-02 06:02:58,175 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Connection not requested, yet. Requesting one.","entityPath":"splittest"}

2024-02-02 06:02:58,175 INFO [com.azu.mes.eve.EventHubClientBuilder] (parallel-1) {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_673eca_1706871778175"}

2024-02-02 06:02:58,177 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Setting next AMQP channel.","entityPath":"splittest"}

2024-02-02 06:02:58,177 INFO [com.azu.mes.eve.imp.EventHubConnectionProcessor] (parallel-1) {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"splittest"}

2024-02-02 06:02:58,177 INFO [com.azu.cor.amq.imp.han.ConnectionHandler] (reactor-executor-1) {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_01d5df_1706871718073","hostName":"eventhubns-01.servicebus.windows.net","state":"CLOSED","remoteState":"UNINITIALIZED"}

2024-02-02 06:02:58,191 INFO [com.azu.cor.amq.imp.ReactorConnection] (reactor-executor-1) {"az.sdk.message":"Closing executor.","connectionId":"MF_01d5df_1706871718073"}

2024-02-02 06:02:58,224 ERROR [io.qua.run.Application] (main) Failed to start application (with profile [prod]): java.lang.RuntimeException: Failed to start quarkus

at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)

at io.quarkus.runtime.Application.start(Application.java:101)

at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)

at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)

at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)

at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)

at io.debezium.server.Main.main(Main.java:15)

Caused by: com.azure.core.amqp.exception.AmqpException: Connection 'MF_01d5df_1706871718073' not opened within AmqpRetryOptions.tryTimeout(): PT1M, errorContext[NAMESPACE: eventhubns-01.servicebus.windows.net. ERROR CONTEXT: N/A]

at com.azure.core.amqp.implementation.ReactorConnection.lambda$new$1(ReactorConnection.java:135)

at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)

at reactor.core.publisher.Mono.subscribe(Mono.java:4400)

at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:301)

at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)

at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)

at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)

at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)

at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)

at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)

at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:829)

 

 

 

  • priyamjm's avatar
    priyamjm
    Copper Contributor
    You need a valid application.properties file.

    Debezium Server Docker's default is "/debezium/conf/application.properties" path.

    debezium.sink.type=eventhubs
    debezium.sink.eventhubs.connectionstring=??
    debezium.sink.eventhubs.hubname=??
    debezium.sink.eventhubs.maxbatchsize=800000
    . . .

Resources