From 74253fc9dfe9b560963273c1ac6c211165a4bf63 Mon Sep 17 00:00:00 2001 From: 014-code <2402143478@qq.com> Date: Thu, 11 Jun 2026 16:08:31 +0800 Subject: [PATCH 1/7] [ISSUE #9796] Support getConsumerRunningInfo in proxy cluster mode Root cause: The remoting proxy did not register RequestCode.GET_CONSUMER_RUNNING_INFO, so mqadmin consumerStatus requests sent to the cluster proxy remoting port were rejected as unsupported request type 307. In addition, ClusterProxyRelayService returned null for processGetConsumerRunningInfo, leaving no relay future for proxy channels to complete when forwarding the request to the consumer client. Solution: Register GET_CONSUMER_RUNNING_INFO in RemotingProtocolServer and handle it in ConsumerManagerActivity. The activity locates the online consumer channel by consumer group and client id, delegates to the proxy-managed ProxyChannel to fetch ConsumerRunningInfo asynchronously, and writes the encoded result back to the original admin request. Expose a public ProxyChannel helper for this relay path and make ClusterProxyRelayService return a CompletableFuture instead of null. Tests: Added ConsumerManagerActivityTest to cover successful running-info relay and consumer-not-online response. Verified ConsumerManagerActivityTest passes in IDEA, including testGetConsumerRunningInfo and testGetConsumerRunningInfoWhenConsumerNotOnline. --- .../remoting/RemotingProtocolServer.java | 1 + .../activity/ConsumerManagerActivity.java | 50 ++++- .../relay/ClusterProxyRelayService.java | 2 +- .../proxy/service/relay/ProxyChannel.java | 12 ++ .../activity/ConsumerManagerActivityTest.java | 172 ++++++++++++++++++ 5 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index c26f6bc2ef4..e67e8162ebb 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -230,6 +230,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor); remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST, consumerManagerActivity, this.updateOffsetExecutor); + remotingServer.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index ce1f1b4a514..0b769ad860b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -25,24 +25,30 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ExceptionUtils; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.service.relay.ProxyChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader; -import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.proxy.processor.MessagingProcessor; -import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; public class ConsumerManagerActivity extends AbstractRemotingActivity { public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) { @@ -73,6 +79,9 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom case RequestCode.GET_CONSUMER_CONNECTION_LIST: { return getConsumerConnectionList(ctx, request, context); } + case RequestCode.GET_CONSUMER_RUNNING_INFO: { + return getConsumerRunningInfo(ctx, request, context); + } default: break; } @@ -129,6 +138,39 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R return response; } + protected RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + GetConsumerRunningInfoRequestHeader header = + (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup()); + ClientChannelInfo clientChannelInfo = null; + if (consumerGroupInfo != null) { + clientChannelInfo = consumerGroupInfo.findChannel(header.getClientId()); + } + if (clientChannelInfo == null || !(clientChannelInfo.getChannel() instanceof ProxyChannel)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer <%s> <%s> not online", header.getConsumerGroup(), header.getClientId())); + return response; + } + + CompletableFuture> relayFuture = + ((ProxyChannel) clientChannelInfo.getChannel()).processGetConsumerRunningInfo(request, header); + relayFuture.thenAccept(result -> { + RemotingCommand relayResponse = RemotingCommand.createResponseCommand(null); + relayResponse.setCode(result.getCode()); + relayResponse.setRemark(result.getRemark()); + if (result.getCode() == ResponseCode.SUCCESS && result.getResult() != null) { + relayResponse.setBody(result.getResult().encode()); + } + writeResponse(ctx, context, request, relayResponse); + }).exceptionally(t -> { + writeErrResponse(ctx, context, request, ExceptionUtils.getRealException(t)); + return null; + }); + return null; + } + protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java index 71ce222a8c0..d31bae4e5dd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java @@ -38,7 +38,7 @@ public ClusterProxyRelayService(TransactionService transactionService) { public CompletableFuture> processGetConsumerRunningInfo( ProxyContext context, RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { - return null; + return new CompletableFuture<>(); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java index 72fdfd0259a..605c710dfb8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java @@ -145,6 +145,18 @@ protected abstract CompletableFuture processGetConsumerRunningInfo( GetConsumerRunningInfoRequestHeader header, CompletableFuture> responseFuture); + public CompletableFuture> processGetConsumerRunningInfo( + RemotingCommand command, + GetConsumerRunningInfoRequestHeader header) { + CompletableFuture> responseFuture = new CompletableFuture<>(); + this.processGetConsumerRunningInfo(command, header, responseFuture) + .exceptionally(t -> { + responseFuture.completeExceptionally(t); + return null; + }); + return responseFuture; + } + protected abstract CompletableFuture processConsumeMessageDirectly( RemotingCommand command, ConsumeMessageDirectlyResultRequestHeader header, diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java new file mode 100644 index 00000000000..92322708d19 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.proxy.config.InitConfigTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.proxy.service.relay.ProxyChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.proxy.service.transaction.TransactionData; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.NotifyUnsubscribeLiteRequestHeader; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerManagerActivityTest extends InitConfigTest { + private static final String GROUP = "group"; + private static final String CLIENT_ID = "clientId"; + + private ConsumerManagerActivity consumerManagerActivity; + + @Mock + private MessagingProcessor messagingProcessor; + @Mock + private ConsumerGroupInfo consumerGroupInfo; + @Mock + private ProxyRelayService consumerProxyRelayService; + private CompletableFuture> consumerRunningInfoFuture; + private ProxyChannel consumerChannel; + @Spy + private ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void setUp() { + this.consumerManagerActivity = new ConsumerManagerActivity(null, messagingProcessor); + this.consumerRunningInfoFuture = new CompletableFuture<>(); + this.consumerChannel = new ProxyChannel(consumerProxyRelayService, null, "127.0.0.1:1", "127.0.0.1:2") { + @Override + public boolean isOpen() { + return true; + } + + @Override + public boolean isActive() { + return true; + } + + @Override + protected CompletableFuture processOtherMessage(Object msg) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processCheckTransaction(CheckTransactionStateRequestHeader header, + MessageExt messageExt, TransactionData transactionData, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, + GetConsumerRunningInfoRequestHeader header, + CompletableFuture> responseFuture) { + consumerRunningInfoFuture.thenAccept(responseFuture::complete); + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processConsumeMessageDirectly(RemotingCommand command, + ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + }; + } + + @Test + public void testGetConsumerRunningInfo() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(clientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + ConsumerRunningInfo runningInfo = new ConsumerRunningInfo(); + runningInfo.setJstack("jstack"); + consumerRunningInfoFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "ok", runningInfo)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(captor.getValue().getRemark()).isEqualTo("ok"); + assertThat(captor.getValue().getBody()).isEqualTo(runningInfo.encode()); + } + + @Test + public void testGetConsumerRunningInfoWhenConsumerNotOnline() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(null); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains(GROUP, CLIENT_ID, "not online"); + } +} From aec402a4ba8830a1c1f44031bf7c1478def74dd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E6=A1=89?= <161032298+014-code@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:20:00 +0800 Subject: [PATCH 2/7] Update ConsumerManagerActivityTest.java --- .../activity/ConsumerManagerActivityTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java index 92322708d19..9f99ae9a477 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java @@ -61,9 +61,9 @@ public class ConsumerManagerActivityTest extends InitConfigTest { private static final String GROUP = "group"; private static final String CLIENT_ID = "clientId"; - + private ConsumerManagerActivity consumerManagerActivity; - + @Mock private MessagingProcessor messagingProcessor; @Mock @@ -79,7 +79,7 @@ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return null; } }; - + @Before public void setUp() { this.consumerManagerActivity = new ConsumerManagerActivity(null, messagingProcessor); @@ -89,29 +89,29 @@ public void setUp() { public boolean isOpen() { return true; } - + @Override public boolean isActive() { return true; } - + @Override protected CompletableFuture processOtherMessage(Object msg) { return CompletableFuture.completedFuture(null); } - + @Override protected CompletableFuture processCheckTransaction(CheckTransactionStateRequestHeader header, MessageExt messageExt, TransactionData transactionData, CompletableFuture> responseFuture) { return CompletableFuture.completedFuture(null); } - + @Override protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header) { return CompletableFuture.completedFuture(null); } - + @Override protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, GetConsumerRunningInfoRequestHeader header, @@ -119,7 +119,7 @@ protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand consumerRunningInfoFuture.thenAccept(responseFuture::complete); return CompletableFuture.completedFuture(null); } - + @Override protected CompletableFuture processConsumeMessageDirectly(RemotingCommand command, ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, @@ -128,7 +128,7 @@ protected CompletableFuture processConsumeMessageDirectly(RemotingCommand } }; } - + @Test public void testGetConsumerRunningInfo() throws Exception { GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); @@ -154,7 +154,7 @@ public void testGetConsumerRunningInfo() throws Exception { assertThat(captor.getValue().getRemark()).isEqualTo("ok"); assertThat(captor.getValue().getBody()).isEqualTo(runningInfo.encode()); } - + @Test public void testGetConsumerRunningInfoWhenConsumerNotOnline() throws Exception { GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); From db006232cdb7229e4344d0b7d3352cdf9941119a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E6=A1=89?= <161032298+014-code@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:20:42 +0800 Subject: [PATCH 3/7] Update ConsumerManagerActivity.java --- .../remoting/activity/ConsumerManagerActivity.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index 0b769ad860b..ec2afd7b619 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -54,7 +54,7 @@ public class ConsumerManagerActivity extends AbstractRemotingActivity { public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) { super(requestPipeline, messagingProcessor); } - + @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { @@ -87,7 +87,7 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom } return null; } - + protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); @@ -100,7 +100,7 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo response.setCode(ResponseCode.SUCCESS); return response; } - + protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -137,7 +137,7 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R response.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online"); return response; } - + protected RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -170,7 +170,7 @@ protected RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, Remo }); return null; } - + protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -191,7 +191,7 @@ protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand }); return null; } - + protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(null); From c9252410a921cba2cbb8488dee32a58bf8f312c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E6=A1=89?= <161032298+014-code@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:21:43 +0800 Subject: [PATCH 4/7] Update ProxyChannel.java --- .../proxy/service/relay/ProxyChannel.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java index 605c710dfb8..75df0570f89 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java @@ -52,9 +52,9 @@ public abstract class ProxyChannel extends SimpleChannel { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final SocketAddress remoteSocketAddress; protected final SocketAddress localSocketAddress; - + protected final ProxyRelayService proxyRelayService; - + protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, String remoteAddress, String localAddress) { super(parent, remoteAddress, localAddress); @@ -62,7 +62,7 @@ protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, Stri this.remoteSocketAddress = NetworkUtil.string2SocketAddress(remoteAddress); this.localSocketAddress = NetworkUtil.string2SocketAddress(localAddress); } - + protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, ChannelId id, String remoteAddress, String localAddress) { super(parent, id, remoteAddress, localAddress); @@ -70,7 +70,7 @@ protected ProxyChannel(ProxyRelayService proxyRelayService, Channel parent, Chan this.remoteSocketAddress = NetworkUtil.string2SocketAddress(remoteAddress); this.localSocketAddress = NetworkUtil.string2SocketAddress(localAddress); } - + @Override public ChannelFuture writeAndFlush(Object msg) { CompletableFuture processFuture = new CompletableFuture<>(); @@ -129,22 +129,22 @@ public ChannelFuture writeAndFlush(Object msg) { }); return promise; } - + protected abstract CompletableFuture processOtherMessage(Object msg); - + protected abstract CompletableFuture processCheckTransaction( CheckTransactionStateRequestHeader header, MessageExt messageExt, TransactionData transactionData, CompletableFuture> responseFuture); - + protected abstract CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header); - + protected abstract CompletableFuture processGetConsumerRunningInfo( RemotingCommand command, GetConsumerRunningInfoRequestHeader header, CompletableFuture> responseFuture); - + public CompletableFuture> processGetConsumerRunningInfo( RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { @@ -156,63 +156,63 @@ public CompletableFuture> processGetConsum }); return responseFuture; } - + protected abstract CompletableFuture processConsumeMessageDirectly( RemotingCommand command, ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, CompletableFuture> responseFuture); - + @Override public ChannelConfig config() { return null; } - + @Override public ChannelMetadata metadata() { return null; } - + @Override protected AbstractUnsafe newUnsafe() { return null; } - + @Override protected boolean isCompatible(EventLoop loop) { return false; } - + @Override protected void doBind(SocketAddress localAddress) throws Exception { } - + @Override protected void doDisconnect() throws Exception { } - + @Override protected void doClose() throws Exception { } - + @Override protected void doBeginRead() throws Exception { } - + @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { } - + @Override protected SocketAddress localAddress0() { return this.localSocketAddress; } - + @Override protected SocketAddress remoteAddress0() { return this.remoteSocketAddress; From 572239f3e2302f0d1d2418a211e5ab03d57e7966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E6=A1=89?= <161032298+014-code@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:30:38 +0800 Subject: [PATCH 5/7] Update RemotingProtocolServer.java --- .../remoting/RemotingProtocolServer.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index e67e8162ebb..e224d647f0b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -66,7 +66,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - + protected final MessagingProcessor messagingProcessor; protected final RemotingChannelManager remotingChannelManager; protected final ChannelEventListener clientHousekeepingService; @@ -90,12 +90,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu protected final ScheduledExecutorService timerExecutor; protected final TlsCertificateManager tlsCertificateManager; protected final RemotingTlsReloadHandler tlsReloadHandler; - - + + public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertificateManager tlsCertificateManager) throws Exception { this.messagingProcessor = messagingProcessor; this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService()); - + RequestPipeline pipeline = createRequestPipeline(messagingProcessor); this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor); this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); @@ -197,7 +197,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific this.registerRemotingServer(this.defaultRemotingServer); } - + protected class RemotingTlsReloadHandler implements TlsCertificateManager.TlsContextReloadListener { @Override public void onTlsContextReload() { @@ -207,7 +207,7 @@ public void onTlsContextReload() { } } } - + protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor); remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor); @@ -241,7 +241,7 @@ protected void registerRemotingServer(RemotingServer remotingServer) { remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor); } - + @Override public void shutdown() throws Exception { // Unregister the TLS context reload handler @@ -256,7 +256,7 @@ public void shutdown() throws Exception { this.topicRouteExecutor.shutdown(); this.defaultExecutor.shutdown(); } - + @Override public void start() throws Exception { // Register the TLS context reload handler @@ -265,7 +265,7 @@ public void start() throws Exception { this.remotingChannelManager.start(); this.defaultRemotingServer.start(); } - + @Override public CompletableFuture invokeToClient(Channel channel, RemotingCommand request, long timeoutMillis) { @@ -292,7 +292,7 @@ public void operationFail(Throwable throwable) { } return future; } - + protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProcessor) { RequestPipeline pipeline = (ctx, request, context) -> { }; @@ -305,7 +305,7 @@ protected RequestPipeline createRequestPipeline(MessagingProcessor messagingProc } return pipeline.pipe(new ContextInitPipeline()); } - + protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { private final long maxWaitTimeMillsInQueue; @@ -329,7 +329,7 @@ public boolean needPrintJstack(ThreadPoolExecutor executor, double value) { return value > maxWaitTimeMillsInQueue; } } - + protected long headSlowTimeMills(BlockingQueue q) { try { long slowTimeMills = 0; @@ -349,7 +349,7 @@ protected long headSlowTimeMills(BlockingQueue q) { } return -1; } - + protected void cleanExpireRequest() { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -360,7 +360,7 @@ protected void cleanExpireRequest() { cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue()); cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue()); } - + protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) { while (true) { try { @@ -392,7 +392,7 @@ protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, } } } - + private RequestTask castRunnable(final Runnable runnable) { try { if (runnable instanceof FutureTaskExt) { From e6cd8d0c5700a75939846ec8fad7d3b75a65fe28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E6=A1=89?= <161032298+014-code@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:31:06 +0800 Subject: [PATCH 6/7] Update ClusterProxyRelayService.java --- .../proxy/service/relay/ClusterProxyRelayService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java index d31bae4e5dd..4e1d2bc272d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayService.java @@ -29,18 +29,18 @@ * not implement yet */ public class ClusterProxyRelayService extends AbstractProxyRelayService { - + public ClusterProxyRelayService(TransactionService transactionService) { super(transactionService); } - + @Override public CompletableFuture> processGetConsumerRunningInfo( ProxyContext context, RemotingCommand command, GetConsumerRunningInfoRequestHeader header) { return new CompletableFuture<>(); } - + @Override public CompletableFuture> processConsumeMessageDirectly( ProxyContext context, RemotingCommand command, From 5445b8cd8c2678dd4f32c6b806d209cfcc4aeccb Mon Sep 17 00:00:00 2001 From: 014-code <2402143478@qq.com> Date: Mon, 15 Jun 2026 18:52:48 +0800 Subject: [PATCH 7/7] [ISSUE #9796] Add proxy coverage and stabilize SqlFilterIT - add proxy tests for getConsumerRunningInfo cluster relay flows - cover relay errors, exceptional completion and non-proxy channels - verify remoting processor registration for GET_CONSUMER_RUNNING_INFO - stabilize SqlFilterIT by waiting for broker version metadata before SQL92 pull --- .../remoting/RemotingProtocolServerTest.java | 73 ++++++++++++++++++ .../activity/ConsumerManagerActivityTest.java | 74 ++++++++++++++++++- .../relay/ClusterProxyRelayServiceTest.java | 43 +++++++++++ .../proxy/service/relay/ProxyChannelTest.java | 56 ++++++++++++++ .../client/consumer/filter/SqlFilterIT.java | 18 +++++ 5 files changed, 263 insertions(+), 1 deletion(-) create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java new file mode 100644 index 00000000000..19b1f880b1a --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServerTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import org.apache.rocketmq.proxy.config.InitConfigTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RemotingProtocolServerTest extends InitConfigTest { + @Mock + private MessagingProcessor messagingProcessor; + @Mock + private TlsCertificateManager tlsCertificateManager; + @Mock + private ProxyRelayService proxyRelayService; + private RemotingProtocolServer remotingProtocolServer; + + @Before + public void setUp() throws Exception { + when(messagingProcessor.getProxyRelayService()).thenReturn(proxyRelayService); + remotingProtocolServer = new RemotingProtocolServer(messagingProcessor, tlsCertificateManager); + } + + @After + public void tearDown() throws Exception { + if (remotingProtocolServer != null) { + remotingProtocolServer.shutdown(); + } + } + + @Test + public void testRegisterGetConsumerRunningInfoProcessor() { + RemotingServer remotingServer = mock(RemotingServer.class); + + remotingProtocolServer.registerRemotingServer(remotingServer); + + verify(remotingServer).registerProcessor( + eq(RequestCode.GET_CONSUMER_RUNNING_INFO), + same(remotingProtocolServer.consumerManagerActivity), + same(remotingProtocolServer.defaultExecutor)); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java index 92322708d19..58261bf6482 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivityTest.java @@ -70,6 +70,8 @@ public class ConsumerManagerActivityTest extends InitConfigTest { private ConsumerGroupInfo consumerGroupInfo; @Mock private ProxyRelayService consumerProxyRelayService; + @Mock + private ClientChannelInfo clientChannelInfo; private CompletableFuture> consumerRunningInfoFuture; private ProxyChannel consumerChannel; @Spy @@ -116,7 +118,13 @@ protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribe protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, GetConsumerRunningInfoRequestHeader header, CompletableFuture> responseFuture) { - consumerRunningInfoFuture.thenAccept(responseFuture::complete); + consumerRunningInfoFuture.whenComplete((result, t) -> { + if (t != null) { + responseFuture.completeExceptionally(t); + } else { + responseFuture.complete(result); + } + }); return CompletableFuture.completedFuture(null); } @@ -155,6 +163,53 @@ public void testGetConsumerRunningInfo() throws Exception { assertThat(captor.getValue().getBody()).isEqualTo(runningInfo.encode()); } + @Test + public void testGetConsumerRunningInfoWhenRelayReturnsError() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo proxyClientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(proxyClientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + consumerRunningInfoFuture.complete(new ProxyRelayResult<>(ResponseCode.SYSTEM_ERROR, "failed", null)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(captor.getValue().getRemark()).isEqualTo("failed"); + assertThat(captor.getValue().getBody()).isNull(); + } + + @Test + public void testGetConsumerRunningInfoWhenRelayThrows() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + ClientChannelInfo proxyClientChannelInfo = new ClientChannelInfo(consumerChannel, CLIENT_ID, LanguageCode.JAVA, 0); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(proxyClientChannelInfo); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response).isNull(); + + consumerRunningInfoFuture.completeExceptionally(new RuntimeException("failed")); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RemotingCommand.class); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(captor.getValue().getRemark()).isEqualTo("failed"); + } + @Test public void testGetConsumerRunningInfoWhenConsumerNotOnline() throws Exception { GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); @@ -169,4 +224,21 @@ public void testGetConsumerRunningInfoWhenConsumerNotOnline() throws Exception { assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); assertThat(response.getRemark()).contains(GROUP, CLIENT_ID, "not online"); } + + @Test + public void testGetConsumerRunningInfoWhenConsumerChannelIsNotProxyChannel() throws Exception { + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + header.setConsumerGroup(GROUP); + header.setClientId(CLIENT_ID); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + request.makeCustomHeaderToNet(); + + when(messagingProcessor.getConsumerGroupInfo(any(), eq(GROUP))).thenReturn(consumerGroupInfo); + when(consumerGroupInfo.findChannel(eq(CLIENT_ID))).thenReturn(clientChannelInfo); + when(clientChannelInfo.getChannel()).thenReturn(new SimpleChannel(null, "1", "2")); + + RemotingCommand response = consumerManagerActivity.processRequest0(ctx, request, null); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).contains(GROUP, CLIENT_ID, "not online"); + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java new file mode 100644 index 00000000000..aaff42fe66b --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ClusterProxyRelayServiceTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.relay; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ClusterProxyRelayServiceTest { + @Test + public void testProcessGetConsumerRunningInfoReturnsPendingFuture() { + ClusterProxyRelayService clusterProxyRelayService = new ClusterProxyRelayService(null); + GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header); + + CompletableFuture> future = + clusterProxyRelayService.processGetConsumerRunningInfo(ProxyContext.create(), request, header); + + assertThat(future).isNotNull(); + assertThat(future).isNotDone(); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java index 03be5cdb018..00ddb45fe6e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/relay/ProxyChannelTest.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.NotImplementedException; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; @@ -43,7 +44,9 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -160,4 +163,57 @@ protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribe assertTrue(channel.writeAndFlush(consumerRunningInfoRequest).isSuccess()); assertTrue(channel.writeAndFlush(consumeMessageDirectlyResult).isSuccess()); } + + @Test + public void testProcessGetConsumerRunningInfoWhenProcessFails() { + RuntimeException exception = new RuntimeException("failed"); + MockProxyChannel channel = new MockProxyChannel(this.proxyRelayService, null, "127.0.0.2:8888", "127.0.0.1:10911") { + @Override + protected CompletableFuture processOtherMessage(Object msg) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processCheckTransaction(CheckTransactionStateRequestHeader header, + MessageExt messageExt, TransactionData transactionData, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processGetConsumerRunningInfo(RemotingCommand command, + GetConsumerRunningInfoRequestHeader header, + CompletableFuture> responseFuture) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(exception); + return future; + } + + @Override + protected CompletableFuture processConsumeMessageDirectly(RemotingCommand command, + ConsumeMessageDirectlyResultRequestHeader header, MessageExt messageExt, + CompletableFuture> responseFuture) { + return CompletableFuture.completedFuture(null); + } + + @Override + protected CompletableFuture processNotifyUnsubscribeLite(NotifyUnsubscribeLiteRequestHeader header) { + return CompletableFuture.completedFuture(null); + } + }; + + CompletableFuture> future = + channel.processGetConsumerRunningInfo(mock(RemotingCommand.class), new GetConsumerRunningInfoRequestHeader()); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("Expected the future to complete exceptionally"); + } catch (ExecutionException e) { + assertSame(exception, e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Interrupted while waiting for the future"); + } + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 88afbeef2a0..a1025ea346b 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.test.client.consumer.filter; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +26,8 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -41,6 +44,7 @@ import org.junit.Test; import static com.google.common.truth.Truth.assertThat; +import static org.awaitility.Awaitility.await; public class SqlFilterIT extends BaseConf { private static Logger logger = LoggerFactory.getLogger(SqlFilterIT.class); @@ -98,6 +102,20 @@ public void testFilterPullConsumer() throws Exception { List receivedMessage = new ArrayList<>(2); Set mqs = consumer.fetchSubscribeMessageQueues(topic); + MQClientInstance mqClientInstance = consumer.getDefaultMQPullConsumerImpl() + .getRebalanceImpl().getmQClientFactory(); + await().atMost(Duration.ofSeconds(30)).until(() -> { + mqClientInstance.updateTopicRouteInfoFromNameServer(topic); + mqClientInstance.sendHeartbeatToAllBrokerWithLock(); + for (MessageQueue mq : mqs) { + FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe( + mqClientInstance.getBrokerNameFromMessageQueue(mq), 0, false); + if (brokerResult == null || brokerResult.getBrokerVersion() == 0) { + return false; + } + } + return true; + }); for (MessageQueue mq : mqs) { SINGLE_MQ: while (true) {