From 635c34fa67a5824edf0214fefe202e73689fbd0b Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:24:13 +0800 Subject: [PATCH 1/6] fix: avoid query fail when query timerMsg by realTopic --- .../java/org/apache/rocketmq/client/impl/MQAdminImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 729e917eed4..9dd673676fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -326,11 +326,12 @@ public QueryResult queryMessage(String clusterName, String topic, String key, in public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end, boolean isUniqKey, String indexType, String lastKey) throws MQClientException, InterruptedException { boolean isLmq = MixAll.isLmq(topic); + boolean isSysWheelTimer = topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"); String routeTopic = topic; // if topic is lmq ,then use clusterName as lmq parent topic // Use clusterName or lmq parent topic to get topic route for lmq or rmq_sys_wheel_timer - if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) + if (!StringUtils.isEmpty(topic) && (isLmq || isSysWheelTimer) && !StringUtils.isEmpty(clusterName)) { routeTopic = clusterName; } @@ -344,7 +345,7 @@ public QueryResult queryMessage(String clusterName, String topic, String key, in if (topicRouteData != null) { List brokerAddrs = new LinkedList<>(); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - if (!isLmq && clusterName != null && !clusterName.isEmpty() + if (!isLmq && !isSysWheelTimer && clusterName != null && !clusterName.isEmpty() && !clusterName.equals(brokerData.getCluster())) { continue; } From 7ac12969a63a86af307e85f07d954919ad8594c6 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:53:17 +0800 Subject: [PATCH 2/6] add test --- .../rocketmq/client/impl/MQAdminImplTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index f52aba2dc00..cfbae850d33 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -22,10 +22,12 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -51,6 +53,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -79,6 +82,8 @@ public class MQAdminImplTest { private final long defaultTimeout = 3000L; + private final String sysWheelTimerTopic = TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"; + @Before public void init() throws RemotingException, InterruptedException, MQClientException { when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mQClientAPIImpl); @@ -200,6 +205,33 @@ public void assertQueryMessageByUniqKey() throws InterruptedException, MQClientE assertEquals(defaultTopic, queryResult.getMessageList().get(0).getTopic()); } + @Test + public void testQueryMessageBySysWheelTimerTopicShouldUseClusterNameAsRouteTopic() throws Exception { + String realTopic = "realTopic"; + TopicRouteData routeData = createRouteData(); + + when(mQClientFactory.getAnExistTopicRouteData(sysWheelTimerTopic)).thenReturn(null); + when(mQClientFactory.getAnExistTopicRouteData(realTopic)).thenReturn(routeData); + + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + QueryMessageResponseHeader responseHeader = new QueryMessageResponseHeader(); + responseHeader.setIndexLastUpdatePhyoffset(1L); + responseHeader.setIndexLastUpdateTimestamp(System.currentTimeMillis()); + RemotingCommand response = mock(RemotingCommand.class); + when(response.decodeCommandCustomHeader(QueryMessageResponseHeader.class)).thenReturn(responseHeader); + when(response.getBody()).thenReturn(getMessageResult()); + when(response.getCode()).thenReturn(ResponseCode.SUCCESS); + callback.operationSucceed(response); + return null; + }).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(), any(InvokeCallback.class), any()); + String msgId = buildMsgId(); + QueryResult result = mqAdminImpl.queryMessage(realTopic, sysWheelTimerTopic, msgId, 32, + 0L, Long.MAX_VALUE, true, MessageConst.INDEX_UNIQUE_TYPE, null); + assertNotNull(result); + assertNotNull(result.getMessageList()); + } + private String buildMsgId() { MessageExt msgExt = createMessageExt(); int storeHostIPLength = (msgExt.getFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16; From 798cf5690f29f764b15bb689aeb047b108e636f1 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:53:39 +0800 Subject: [PATCH 3/6] add test --- .../java/org/apache/rocketmq/client/impl/MQAdminImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index cfbae850d33..af408f57648 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; From a38d5bd82769076826977fa01b8a04db01c95024 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 12:03:00 +0800 Subject: [PATCH 4/6] add test --- .../java/org/apache/rocketmq/client/impl/MQAdminImplTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index af408f57648..d01a9a4d390 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -207,10 +207,6 @@ public void assertQueryMessageByUniqKey() throws InterruptedException, MQClientE @Test public void testQueryMessageBySysWheelTimerTopicShouldUseClusterNameAsRouteTopic() throws Exception { String realTopic = "realTopic"; - TopicRouteData routeData = createRouteData(); - - when(mQClientFactory.getAnExistTopicRouteData(sysWheelTimerTopic)).thenReturn(null); - when(mQClientFactory.getAnExistTopicRouteData(realTopic)).thenReturn(routeData); doAnswer(invocation -> { InvokeCallback callback = invocation.getArgument(3); From f487d4b60a4595d0cd4e8d1a77e366a858ebe93b Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 14:21:08 +0800 Subject: [PATCH 5/6] trigger ci From 798e3bbdec5e51029daa7a728434b2e5e6adb78c Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 15:02:09 +0800 Subject: [PATCH 6/6] trigger ci