Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ public long revive(AtomicLong currentTime, int maxCount) {
long nextInvisibleTime = record.getInvisibleTime() + backoffInterval;
PopConsumerRecord retryRecord = new PopConsumerRecord(System.currentTimeMillis(),
record.getGroupId(), record.getTopicId(), record.getQueueId(),
record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId());
record.getRetryFlag(), nextInvisibleTime, record.getOffset(), record.getAttemptId(),
record.isSuspend());
retryRecord.setAttemptTimes(record.getAttemptTimes() + 1);
failureList.add(retryRecord);
log.warn("PopConsumerService revive backoff retry, record={}", retryRecord);
Expand Down Expand Up @@ -768,6 +769,7 @@ public synchronized void transferToFsStore() {
ck.setBrokerName(brokerConfig.getBrokerName());
ck.addDiff(0);
ck.setRePutTimes(String.valueOf(record.getAttemptTimes()));
ck.setSuspend(record.isSuspend());
int reviveQueueId = (int) record.getOffset() % brokerConfig.getReviveQueueNum();
MessageExtBrokerInner ckMsg =
brokerController.getPopMessageProcessor().buildCkMsg(ck, reviveQueueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
newCk.setBrokerName(oldCK.getBrokerName());
newCk.addDiff(0);
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap
newCk.setSuspend(oldCK.isSuspend());
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time
int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -421,6 +422,7 @@ public void reviveBackoffRetryTest() {
record.setGroupId("group");
record.setQueueId(0);
record.setOffset(0);
record.setSuspend(true);
consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record));

Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class), "", false)))
Expand All @@ -436,7 +438,11 @@ public void reviveBackoffRetryTest() {
// should be invisible now
Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(0, visibleTimestamp, 1).size());
// will be visible again in 10 seconds
Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1).size());
List<PopConsumerRecord> retryRecords = consumerService.getPopConsumerStore()
.scanExpiredRecords(visibleTimestamp, System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1);
Assert.assertEquals(1, retryRecords.size());
Assert.assertTrue(retryRecords.get(0).isSuspend());
Assert.assertEquals(1, retryRecords.get(0).getAttemptTimes());

consumerService.shutdown();
}
Expand All @@ -453,11 +459,13 @@ public void transferToFsStoreTest() {
temp.setGroupId("group");
temp.setQueueId(2);
temp.setOffset(i);
temp.setSuspend(true);
return temp;
})
.collect(Collectors.toList());

Mockito.when(brokerController.getPopMessageProcessor().buildCkMsg(any(), anyInt()))
ArgumentCaptor<PopCheckPoint> ckCaptor = ArgumentCaptor.forClass(PopCheckPoint.class);
Mockito.when(brokerController.getPopMessageProcessor().buildCkMsg(ckCaptor.capture(), anyInt()))
.thenReturn(new MessageExtBrokerInner());
Mockito.when(brokerController.getMessageStore()).thenReturn(Mockito.mock(MessageStore.class));
Mockito.when(brokerController.getMessageStore().asyncPutMessage(any()))
Expand All @@ -467,6 +475,10 @@ public void transferToFsStoreTest() {
consumerService.start();
consumerService.getPopConsumerStore().writeRecords(consumerRecordList);
consumerService.transferToFsStore();
Assert.assertEquals(3, ckCaptor.getAllValues().size());
for (PopCheckPoint ck : ckCaptor.getAllValues()) {
Assert.assertTrue(ck.isSuspend());
}
consumerService.shutdown();
}

Expand Down Expand Up @@ -743,4 +755,4 @@ public void testReviveRetryWithSuspendFalseMultipleTimes() {
messageExt.setReconsumeTimes(capturedMessage.getReconsumeTimes());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,14 @@ public void testReviveMsgFromCk_messageFound_writeRetryOK() throws Throwable {
@Test
public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws Throwable {
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setSuspend(true);
PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
reviveObj.map.put("", ck);
reviveObj.endTime = System.currentTimeMillis();
StringBuilder actualRetryTopic = new StringBuilder();
StringBuilder actualReviveTopic = new StringBuilder();
AtomicLong actualInvisibleTime = new AtomicLong(0L);
List<Boolean> actualSuspend = new ArrayList<>();

when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false)));
Expand All @@ -285,6 +287,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws
actualReviveTopic.append(msg.getTopic());
PopCheckPoint rewriteCK = JSON.parseObject(msg.getBody(), PopCheckPoint.class);
actualInvisibleTime.set(rewriteCK.getReviveTime());
actualSuspend.add(rewriteCK.isSuspend());
return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK));
});

Expand All @@ -296,6 +299,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws
Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString());
Assert.assertEquals(REVIVE_TOPIC, actualReviveTopic.toString());
Assert.assertEquals(INVISIBLE_TIME + 10 * 1000L, actualInvisibleTime.get()); // first interval is 10s
Assert.assertEquals(Arrays.asList(true), actualSuspend);
verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry
verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}
Expand Down
Loading