Before Creating the Enhancement Request
Summary
Add BatchChangeInvisibleTime support in the Proxy and Broker, following the existing BatchAck pattern. Currently, when the Proxy needs to change invisible time for multiple messages (client disconnect, write-back failure, periodic renewal, message filtering), it sends individual CHANGE_MESSAGE_INVISIBLETIME requests to the Broker one by one. This causes excessive network round-trips and, on the PopConsumerKVService (RocksDB) path, excessive RocksDB flushes (2N db.write() calls for N messages).
Motivation
Multiple scenarios in the Proxy require batch changeInvisibleTime:
- Client disconnect (DefaultReceiptHandleManager.returnHandleGroup): When a client goes offline, the Proxy scans all unacked receipt handles for that client and calls changeInvisibleTime for each one individually. A consumer with N in-flight messages triggers N separate network round-trips.
- ReceiveMessage write-back failure (ReceiveMessageResponseStreamWriter): When writing messages back to the gRPC client fails mid-stream, the Proxy nacks all remaining messages one by one.
- Periodic renewal (DefaultReceiptHandleManager.scheduleRenewTask): The Proxy periodically renews receipt handles that are about to expire. Each handle is renewed individually via a separate changeInvisibleTime call.
- Pop message filtering (ConsumerProcessor.filterPopResult): Each message filtered as TO_RETURN has its invisible time changed individually.
On the Broker side with PopConsumerKVServiceEnable=true, each individual changeInvisibleTime call in PopConsumerService.changeInvisibilityDuration() does:
popConsumerStore.writeRecords(Collections.singletonList(ckRecord)) — 1 RocksDB WriteBatch with 1 entry
popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)) — 1 RocksDB WriteBatch with 1 entry
For N messages, this results in 2N db.write() calls. Since PopConsumerRocksdbStore uses sync=true on WriteOptions, each write forces a WAL flush, making this a significant bottleneck.
Describe the Solution You'd Like
Add a new BATCH_CHANGE_MESSAGE_INVISIBLETIME request code, following the existing BatchAck (BATCH_ACK_MESSAGE) pattern:
Request format — Each entry carries the same parameters as the single ChangeInvisibleTimeRequestHeader:
consumerGroup, topic, queueId, extraInfo, offset, invisibleTime, liteTopic, suspend
Response format — Per-entry results with new receipt handle info (same as ChangeInvisibleTimeResponseHeader):
code (SUCCESS or error), popTime, invisibleTime, reviveQid
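As a rough illustration of the two bodies, the sketch below uses the field names listed above. The Java types, the ChangeInvisibleTimeResultEntry class name, the BatchBodies helper, and the convention that results are matched to entries by list position are all assumptions made for this example, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// One request entry; field names come from the list above, types are assumed.
class ChangeInvisibleTimeRequestEntry {
    String consumerGroup;
    String topic;
    int queueId;
    String extraInfo;
    long offset;
    long invisibleTime;
    String liteTopic;
    boolean suspend;
}

// One per-entry result (hypothetical class name), mirroring the response fields above.
class ChangeInvisibleTimeResultEntry {
    int code;            // SUCCESS or an error code
    long popTime;
    long invisibleTime;
    int reviveQid;
}

class BatchChangeInvisibleTimeRequestBody {
    final List<ChangeInvisibleTimeRequestEntry> entries = new ArrayList<>();
}

class BatchChangeInvisibleTimeResponseBody {
    final List<ChangeInvisibleTimeResultEntry> results = new ArrayList<>();
}

// Hypothetical helper: assumes result i answers entry i.
class BatchBodies {
    static boolean isAligned(BatchChangeInvisibleTimeRequestBody request,
                             BatchChangeInvisibleTimeResponseBody response) {
        return request.entries.size() == response.results.size();
    }
}
```

Matching by position keeps the response small. An explicit per-entry key would be an alternative if the Broker may reorder or drop entries.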
Layer-by-layer changes:
- Remoting: Add RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, BatchChangeInvisibleTimeRequestBody (with List<ChangeInvisibleTimeRequestEntry>), and BatchChangeInvisibleTimeResponseBody (with per-entry results).
- Broker (true batch processing):
  - PopConsumerService.batchChangeInvisibilityDuration(): Collect all CK records and ack records, then call writeRecords(allCkRecords) once and deleteRecords(allAckRecords) once. This reduces the cost for N messages from 2N db.write() calls to 2.
  - ChangeInvisibleTimeProcessor: Handle BATCH_CHANGE_MESSAGE_INVISIBLETIME. On the KV path, delegate to the batch method above; on the traditional revive path, iterate and call appendCheckPointThenAckOrigin() per entry.
  - BrokerController: Register the new request code to changeInvisibleTimeProcessor.
- Client: Add batchChangeInvisibleTimeAsync() to MQClientAPIImpl and MQClientAPIExt.
- Proxy Service: Add batchChangeInvisibleTime() to MessageService, ClusterMessageService, and LocalMessageService.
- Proxy Processor: Add batchChangeInvisibleTime() to ConsumerProcessor (group by broker, filter expired handles), the MessagingProcessor interface, and DefaultMessagingProcessor.
- Caller integration:
  - DefaultReceiptHandleManager.returnHandleGroup(): Collect all handles, then batch send.
  - DefaultReceiptHandleManager.scheduleRenewTask(): Collect the handles needing renewal per group key, batch send, then update each handle with its per-entry result.
  - ReceiveMessageResponseStreamWriter: Collect all nack messages, then batch send.
  - ConsumerProcessor.filterPopResult(): Collect all TO_RETURN messages, then batch send.
- Config: Add an enableBatchChangeInvisibleTime flag (default false) for gradual rollout.
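The "group by broker, filter expired handles" step in the Proxy Processor could look roughly like the sketch below. HandleRef, BrokerGrouping, and the expireAtMillis field are hypothetical stand-ins for illustration; the real receipt handle types in the Proxy carry more state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified view of a receipt handle held by the Proxy.
class HandleRef {
    final String brokerName;
    final String receiptHandle;
    final long expireAtMillis;

    HandleRef(String brokerName, String receiptHandle, long expireAtMillis) {
        this.brokerName = brokerName;
        this.receiptHandle = receiptHandle;
        this.expireAtMillis = expireAtMillis;
    }
}

class BrokerGrouping {
    // Drops handles that have already expired, then buckets the rest by broker
    // so that each broker can receive a single batch request.
    static Map<String, List<HandleRef>> groupByBroker(List<HandleRef> handles, long nowMillis) {
        Map<String, List<HandleRef>> grouped = new LinkedHashMap<>();
        for (HandleRef handle : handles) {
            if (handle.expireAtMillis <= nowMillis) {
                continue; // expired handles can no longer be changed
            }
            grouped.computeIfAbsent(handle.brokerName, k -> new ArrayList<>()).add(handle);
        }
        return grouped;
    }
}
```

Each map value would then become the entry list of one batch request to that broker. A caller could fall back to the existing per-message path when enableBatchChangeInvisibleTime is false.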
Describe Alternatives You've Considered
- Proxy-level batching only (no Broker changes): Group entries by broker at the Proxy, but still send individual CHANGE_MESSAGE_INVISIBLETIME requests. This reduces Proxy-side processing overhead but not the number of network round-trips or RocksDB flushes. Rejected because the Broker-side RocksDB flush reduction is the main performance win.
- Reuse BATCH_ACK_MESSAGE with BitSet merging: BatchAck merges multiple offsets sharing the same CK into a BitSet. This doesn't work for ChangeInvisibleTime because each entry requires its own new checkpoint write + old checkpoint ack, and each entry may have a different invisibleTime. A list-based request body is necessary.
- Async fire-and-forget (no per-entry response): For the returnHandleGroup scenario, per-entry results aren't strictly needed. However, the renewal scenario (startRenewMessage) requires per-entry new receipt handles to update the cached handle via messageReceiptHandle.updateReceiptHandle(). Supporting per-entry responses makes the batch API general-purpose across all scenarios.
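To show why the renewal scenario needs per-entry results, here is a minimal sketch of applying a batch response to cached handles. RenewResult, CachedHandle, and RenewalApplier are hypothetical names, and the sketch assumes results arrive in the same order as the request entries. In the Proxy the update would go through messageReceiptHandle.updateReceiptHandle().

```java
import java.util.List;

// Hypothetical per-entry outcome of a batch renewal.
class RenewResult {
    final boolean success;
    final long popTime;
    final long invisibleTime;
    final int reviveQid;

    RenewResult(boolean success, long popTime, long invisibleTime, int reviveQid) {
        this.success = success;
        this.popTime = popTime;
        this.invisibleTime = invisibleTime;
        this.reviveQid = reviveQid;
    }
}

// Hypothetical cached handle holding the fields that change on renewal.
class CachedHandle {
    long popTime;
    long invisibleTime;
    int reviveQid;

    CachedHandle(long popTime, long invisibleTime, int reviveQid) {
        this.popTime = popTime;
        this.invisibleTime = invisibleTime;
        this.reviveQid = reviveQid;
    }
}

class RenewalApplier {
    // Updates only the handles whose entry succeeded; returns how many were updated.
    static int apply(List<CachedHandle> handles, List<RenewResult> results) {
        if (handles.size() != results.size()) {
            throw new IllegalArgumentException("results must align with handles by index");
        }
        int updated = 0;
        for (int i = 0; i < handles.size(); i++) {
            RenewResult result = results.get(i);
            if (!result.success) {
                continue; // keep the old handle; the caller may retry this entry
            }
            CachedHandle handle = handles.get(i);
            handle.popTime = result.popTime;
            handle.invisibleTime = result.invisibleTime;
            handle.reviveQid = result.reviveQid;
            updated++;
        }
        return updated;
    }
}
```

With a fire-and-forget API the loop above would have nothing to read, so the cached handles would go stale after the first renewal.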
Additional Context
RocksDB batch optimization detail:
Current (N messages = 2N db.write()):
// PopConsumerService.changeInvisibilityDuration() — called N times
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord)); // 1 WriteBatch, 1 entry
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); // 1 WriteBatch, 1 entry
Proposed (N messages = 2 db.write()):
// PopConsumerService.batchChangeInvisibilityDuration() — called once
this.popConsumerStore.writeRecords(allCkRecords); // 1 WriteBatch, N entries
this.popConsumerStore.deleteRecords(allAckRecords); // 1 WriteBatch, N entries
PopConsumerRocksdbStore.writeRecords/deleteRecords already accept List<PopConsumerRecord> and use WriteBatch internally, so the infrastructure for batch RocksDB writes is already in place — we just need to stop passing Collections.singletonList().
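The difference in write counts can be checked with a small in-memory stand-in. CountingStore below is a hypothetical substitute for PopConsumerRocksdbStore that only counts calls and tracks keys as strings; it does not use RocksDB, and each call models one db.write().

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the store: every call models one db.write().
class CountingStore {
    int writeCalls = 0;
    final Set<String> keys = new HashSet<>();

    void writeRecords(List<String> records) {
        writeCalls++;
        keys.addAll(records);
    }

    void deleteRecords(List<String> records) {
        writeCalls++;
        keys.removeAll(records);
    }
}

class BatchWriteSketch {
    // Current behaviour: one write and one delete per message (2N calls).
    static void perMessage(CountingStore store, List<String> newCks, List<String> oldCks) {
        for (int i = 0; i < newCks.size(); i++) {
            store.writeRecords(Collections.singletonList(newCks.get(i)));
            store.deleteRecords(Collections.singletonList(oldCks.get(i)));
        }
    }

    // Proposed behaviour: one write and one delete for the whole batch (2 calls).
    static void batched(CountingStore store, List<String> newCks, List<String> oldCks) {
        if (newCks.isEmpty()) {
            return; // nothing to do, avoid empty writes
        }
        store.writeRecords(newCks);
        store.deleteRecords(oldCks);
    }
}
```

Both paths leave the store with the same keys. Only the number of synchronous writes differs.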