{
"level": "INFO",
"location": {
"class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1",
"file": "OffsetMonitor.java",
"method": "run",
"line": "132"
},
"logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
"message": "TopicList starts updating",
"host": "ureplicator-controller-2-778558bbf8-5twpf",
"tags": [
"ureplicator"
],
"@timestamp": "2021-09-10T08:59:19.493Z",
"thread": "topic-list-cron-0",
"@version": "1"
}
{
"level": "INFO",
"location": {
"class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
"file": "OffsetMonitor.java",
"method": "updateTopicList",
"line": "197"
},
"logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
"message": "Update topicList",
"host": "ureplicator-controller-2-778558bbf8-5twpf",
"tags": [
"ureplicator"
],
"@timestamp": "2021-09-10T08:59:24.493Z",
"thread": "topic-list-cron-0",
"@version": "1"
}
{
"exception": {
"class": "java.nio.channels.ClosedChannelException",
"stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
},
"level": "INFO",
"location": {
"class": "kafka.utils.Logging$class",
"file": "Logging.scala",
"method": "info",
"line": "68"
},
"logger": "kafka.consumer.SimpleConsumer",
"message": "Reconnect due to error:",
"host": "ureplicator-controller-2-778558bbf8-5twpf",
"tags": [
"ureplicator"
],
"@timestamp": "2021-09-10T08:59:24.494Z",
"thread": "topic-list-cron-0",
"@version": "1"
}
{
"exception": {
"class": "java.nio.channels.ClosedChannelException",
"stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
},
"level": "WARN",
"location": {
"class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
"file": "OffsetMonitor.java",
"method": "updateTopicList",
"line": "234"
},
"logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
"message": "Got exception to get metadata from broker=null:-1",
"host": "ureplicator-controller-2-778558bbf8-5twpf",
"tags": [
"ureplicator"
],
"@timestamp": "2021-09-10T08:59:24.499Z",
"thread": "topic-list-cron-0",
"@version": "1"
}
Hi to all!
@yangy0000 - I found strange OffsetMonitor behavior after I disabled the unencrypted, anonymous-access ports on the SRC and DST Kafka clusters, i.e. the ports that used the PLAINTEXT protocol. Only the ports using the SASL_SSL protocol kept working - that is, with encryption and authentication via the SCRAM mechanism. After that, the OffsetMonitor began to produce the errors shown in the logs above.
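For context, the stack trace shows OffsetMonitor connecting through the legacy kafka.consumer.SimpleConsumer, which opens a plain BlockingChannel and, as far as I can tell, has no SASL/SSL support at all - so once the PLAINTEXT listeners are gone it has nothing it can connect to, which would explain the ClosedChannelException. For comparison, a modern Kafka client (AdminClient/KafkaConsumer) reaches a SASL_SSL/SCRAM listener with client properties along these lines (host, port, mechanism and credentials below are placeholders, not values from my setup):

```properties
# Placeholder client properties for a modern Kafka client;
# the legacy SimpleConsumer used by OffsetMonitor has no
# equivalent of these settings.
bootstrap.servers=broker-1.example.com:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<user>" \
  password="<password>";
```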
Is there some way I can fix this behavior on my own, e.g. by changing some configuration, or do I need to wait for a fix in a new version of uReplicator itself?
I would be glad to hear any advice, thanks.