From ec31424c7c0dcf20c5f07bee908763960eff5bba Mon Sep 17 00:00:00 2001 From: yx9o Date: Wed, 10 Jun 2026 21:52:03 +0800 Subject: [PATCH] [ISSUE #10465] Preserve suspend flag when rewriting POP revive retry records --- .../broker/pop/PopConsumerService.java | 4 +++- .../broker/processor/PopReviveService.java | 1 + .../broker/pop/PopConsumerServiceTest.java | 18 +++++++++++++++--- .../broker/processor/PopReviveServiceTest.java | 4 ++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 9ab5eb651be..c9e48a6db4f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -603,7 +603,8 @@ public long revive(AtomicLong currentTime, int maxCount) { long nextInvisibleTime = record.getInvisibleTime() + backoffInterval; PopConsumerRecord retryRecord = new PopConsumerRecord(System.currentTimeMillis(), record.getGroupId(), record.getTopicId(), record.getQueueId(), - record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId()); + record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId(), + record.isSuspend()); retryRecord.setAttemptTimes(record.getAttemptTimes() + 1); failureList.add(retryRecord); log.warn("PopConsumerService revive backoff retry, record={}", retryRecord); @@ -768,6 +769,7 @@ public synchronized void transferToFsStore() { ck.setBrokerName(brokerConfig.getBrokerName()); ck.addDiff(0); ck.setRePutTimes(String.valueOf(record.getAttemptTimes())); + ck.setSuspend(record.isSuspend()); int reviveQueueId = (int) record.getOffset() % brokerConfig.getReviveQueueNum(); MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(ck, reviveQueueId); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 07f16e98965..1ab90a9d2a3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -625,6 +625,7 @@ private void rePutCK(PopCheckPoint oldCK, Pair pair) { newCk.setBrokerName(oldCK.getBrokerName()); newCk.addDiff(0); newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap + newCk.setSuspend(oldCK.isSuspend()); if (oldCK.getReviveTime() <= System.currentTimeMillis()) { // never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 089d2c1f22b..77f0b77a67a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.pop.PopCheckPoint; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; import org.junit.Assert; @@ -421,6 +422,7 @@ public void reviveBackoffRetryTest() { record.setGroupId("group"); record.setQueueId(0); record.setOffset(0); + record.setSuspend(true); consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record)); Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class), "", false))) @@ -436,7 +438,11 @@ public void reviveBackoffRetryTest() { // should be invisible now Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(0, visibleTimestamp, 1).size()); // will be visible again in 10 seconds - Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1).size()); + List retryRecords = consumerService.getPopConsumerStore() + .scanExpiredRecords(visibleTimestamp, System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1); + Assert.assertEquals(1, retryRecords.size()); + Assert.assertTrue(retryRecords.get(0).isSuspend()); + Assert.assertEquals(1, retryRecords.get(0).getAttemptTimes()); consumerService.shutdown(); } @@ -453,11 +459,13 @@ public void transferToFsStoreTest() { temp.setGroupId("group"); temp.setQueueId(2); temp.setOffset(i); + temp.setSuspend(true); return temp; }) .collect(Collectors.toList()); - Mockito.when(brokerController.getPopMessageProcessor().buildCkMsg(any(), anyInt())) + ArgumentCaptor ckCaptor = ArgumentCaptor.forClass(PopCheckPoint.class); + Mockito.when(brokerController.getPopMessageProcessor().buildCkMsg(ckCaptor.capture(), anyInt())) .thenReturn(new MessageExtBrokerInner()); Mockito.when(brokerController.getMessageStore()).thenReturn(Mockito.mock(MessageStore.class)); Mockito.when(brokerController.getMessageStore().asyncPutMessage(any())) @@ -467,6 +475,10 @@ public void transferToFsStoreTest() { consumerService.start(); consumerService.getPopConsumerStore().writeRecords(consumerRecordList); consumerService.transferToFsStore(); + Assert.assertEquals(3, ckCaptor.getAllValues().size()); + for (PopCheckPoint ck : ckCaptor.getAllValues()) { + Assert.assertTrue(ck.isSuspend()); + } consumerService.shutdown(); } @@ -743,4 +755,4 @@ public void testReviveRetryWithSuspendFalseMultipleTimes() { messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes()); } } -} \ No newline at end of file +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index fa7e9982e1f..4c148d21866 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -266,12 +266,14 @@ public void testReviveMsgFromCk_messageFound_writeRetryOK() throws Throwable { @Test public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws Throwable { PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + ck.setSuspend(true); PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); reviveObj.map.put("", ck); reviveObj.endTime = System.currentTimeMillis(); StringBuilder actualRetryTopic = new StringBuilder(); StringBuilder actualReviveTopic = new StringBuilder(); AtomicLong actualInvisibleTime = new AtomicLong(0L); + List actualSuspend = new ArrayList<>(); when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false))); @@ -285,6 +287,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws actualReviveTopic.append(msg.getTopic()); PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), PopCheckPoint.class); actualInvisibleTime.set(rewriteCK.getReviveTime()); + actualSuspend.add(rewriteCK.isSuspend()); return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)); }); @@ -296,6 +299,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString()); Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString()); Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, actualInvisibleTime.get()); // first interval is 10s + Assert.assertEquals(Arrays.asList(true), actualSuspend); verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK }