[Enhancement] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes #10489

@qianye1001

Description

Before Creating the Enhancement Request

  • I have confirmed that this should be classified as an enhancement rather than a bug/feature.

Summary

Add BatchChangeInvisibleTime support to the Proxy and Broker, following the existing BatchAck pattern. Currently, when the Proxy needs to change the invisible time of multiple messages (client disconnect, write-back failure, periodic renewal, message filtering), it sends one CHANGE_MESSAGE_INVISIBLETIME request per message to the Broker. For N messages this costs N network round-trips and, on the PopConsumerKVService (RocksDB) path, 2N db.write() calls, each of which forces a RocksDB flush.

Motivation

Several Proxy code paths change the invisible time of many messages at once and would benefit from a batch changeInvisibleTime:

  1. Client disconnect (DefaultReceiptHandleManager.returnHandleGroup): When a client goes offline, the Proxy scans all unacked receipt handles for that client and calls changeInvisibleTime for each one individually. A consumer with many in-flight messages triggers N separate network round-trips.

  2. ReceiveMessage write-back failure (ReceiveMessageResponseStreamWriter): When writing messages back to the gRPC client fails mid-stream, the Proxy nacks all remaining messages one by one.

  3. Periodic renewal (DefaultReceiptHandleManager.scheduleRenewTask): The Proxy periodically renews receipt handles that are about to expire. Each handle is renewed individually via a separate changeInvisibleTime call.

  4. Pop message filtering (ConsumerProcessor.filterPopResult): Each message filtered as TO_RETURN has its invisible time changed by a separate call.

On the Broker side with PopConsumerKVServiceEnable=true, each individual changeInvisibleTime call in PopConsumerService.changeInvisibilityDuration() does:

  • popConsumerStore.writeRecords(Collections.singletonList(ckRecord)) — 1 RocksDB WriteBatch with 1 entry
  • popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)) — 1 RocksDB WriteBatch with 1 entry

For N messages, this results in 2N db.write() calls. Since PopConsumerRocksdbStore uses sync=true on WriteOptions, each write forces a WAL flush, making this a significant bottleneck.

Describe the Solution You'd Like

Add a new BATCH_CHANGE_MESSAGE_INVISIBLETIME request code, following the existing BatchAck (BATCH_ACK_MESSAGE) pattern:

Request format — Each entry carries the same parameters as the single ChangeInvisibleTimeRequestHeader:

  • consumerGroup, topic, queueId, extraInfo, offset, invisibleTime, liteTopic, suspend

Response format — Per-entry results with new receipt handle info (same as ChangeInvisibleTimeResponseHeader):

  • code (SUCCESS or error), popTime, invisibleTime, reviveQid
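
The request and response shapes listed above could be sketched roughly as follows. The field names come from the two lists; the field types, the `ChangeInvisibleTimeResultEntry` class name, the `SUCCESS = 0` placeholder, and the rule that the i-th result matches the i-th entry are assumptions made for illustration, not a settled wire format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes only: field names follow the issue text, types are assumed.
class BatchChangeInvisibleTimeShapes {
    // Placeholder for the SUCCESS response code (assumption for this sketch).
    static final int SUCCESS = 0;

    /** One request entry; mirrors the fields of the single-message request header. */
    static final class ChangeInvisibleTimeRequestEntry {
        final String consumerGroup;
        final String topic;
        final int queueId;
        final String extraInfo;
        final long offset;
        final long invisibleTime;
        final String liteTopic;
        final boolean suspend;

        ChangeInvisibleTimeRequestEntry(String consumerGroup, String topic, int queueId,
                                        String extraInfo, long offset, long invisibleTime,
                                        String liteTopic, boolean suspend) {
            this.consumerGroup = consumerGroup;
            this.topic = topic;
            this.queueId = queueId;
            this.extraInfo = extraInfo;
            this.offset = offset;
            this.invisibleTime = invisibleTime;
            this.liteTopic = liteTopic;
            this.suspend = suspend;
        }
    }

    /** One per-entry result carrying the data needed to build a new receipt handle. */
    static final class ChangeInvisibleTimeResultEntry {
        final int code;
        final long popTime;
        final long invisibleTime;
        final int reviveQid;

        ChangeInvisibleTimeResultEntry(int code, long popTime, long invisibleTime, int reviveQid) {
            this.code = code;
            this.popTime = popTime;
            this.invisibleTime = invisibleTime;
            this.reviveQid = reviveQid;
        }

        boolean isSuccess() {
            return code == SUCCESS;
        }
    }

    /** Request body: a plain list, so every entry can carry its own invisibleTime. */
    static final class BatchChangeInvisibleTimeRequestBody {
        final List<ChangeInvisibleTimeRequestEntry> entries = new ArrayList<>();
    }

    /** Response body: assumed to hold one result per request entry, in the same order. */
    static final class BatchChangeInvisibleTimeResponseBody {
        final List<ChangeInvisibleTimeResultEntry> results = new ArrayList<>();
    }

    public static void main(String[] args) {
        BatchChangeInvisibleTimeRequestBody request = new BatchChangeInvisibleTimeRequestBody();
        request.entries.add(new ChangeInvisibleTimeRequestEntry(
            "group-a", "topic-a", 0, "extra-0", 100L, 30_000L, null, false));
        request.entries.add(new ChangeInvisibleTimeRequestEntry(
            "group-a", "topic-a", 1, "extra-1", 205L, 60_000L, null, false));
        System.out.println("entries in one request: " + request.entries.size());
    }
}
```

Keeping the result list positionally aligned with the request list is one simple way to correlate results with entries; an explicit per-entry key would work as well.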

Layer-by-layer changes:

  1. Remoting: Add RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME, BatchChangeInvisibleTimeRequestBody (with List<ChangeInvisibleTimeRequestEntry>), BatchChangeInvisibleTimeResponseBody (with per-entry results)

  2. Broker — True batch processing:

    • PopConsumerService.batchChangeInvisibilityDuration(): Collect all CK records and ack records, then call writeRecords(allCkRecords) once and deleteRecords(allAckRecords) once. For N messages, this reduces the number of db.write() calls from 2N to 2.
    • ChangeInvisibleTimeProcessor: Handle BATCH_CHANGE_MESSAGE_INVISIBLETIME — on KV path, delegate to the batch method above; on traditional revive path, iterate and call appendCheckPointThenAckOrigin() per entry.
    • BrokerController: Register the new request code to changeInvisibleTimeProcessor.

  3. Client: Add batchChangeInvisibleTimeAsync() to MQClientAPIImpl and MQClientAPIExt.

  4. Proxy Service: Add batchChangeInvisibleTime() to MessageService, ClusterMessageService, and LocalMessageService.

  5. Proxy Processor: Add batchChangeInvisibleTime() to ConsumerProcessor (group by broker, filter expired handles), MessagingProcessor interface, and DefaultMessagingProcessor.

  6. Caller integration:

    • DefaultReceiptHandleManager.returnHandleGroup(): Collect all handles, batch send
    • DefaultReceiptHandleManager.scheduleRenewTask(): Collect handles needing renewal per group key, batch send, update each handle with per-entry result
    • ReceiveMessageResponseStreamWriter: Collect all nack messages, batch send
    • ConsumerProcessor.filterPopResult(): Collect all TO_RETURN messages, batch send

  7. Config: Add enableBatchChangeInvisibleTime flag (default false) for gradual rollout.
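
The "group by broker, filter expired handles" step of the Proxy Processor change could look roughly like the sketch below. `HandleEntry`, its fields, and `groupByBroker` are hypothetical stand-ins, not existing Proxy types; the sketch only shows how a flat list of handles might be split into one batch per broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupByBrokerSketch {

    /** Hypothetical stand-in for a parsed receipt handle. */
    static final class HandleEntry {
        final String brokerName;
        final String messageId;
        final long expireAtMillis;

        HandleEntry(String brokerName, String messageId, long expireAtMillis) {
            this.brokerName = brokerName;
            this.messageId = messageId;
            this.expireAtMillis = expireAtMillis;
        }

        boolean isExpired(long nowMillis) {
            return nowMillis >= expireAtMillis;
        }
    }

    /** Drops expired handles and groups the rest by broker, preserving input order. */
    static Map<String, List<HandleEntry>> groupByBroker(List<HandleEntry> entries, long nowMillis) {
        Map<String, List<HandleEntry>> grouped = new LinkedHashMap<>();
        for (HandleEntry entry : entries) {
            if (entry.isExpired(nowMillis)) {
                continue; // an expired handle can no longer be changed, so it is skipped
            }
            grouped.computeIfAbsent(entry.brokerName, k -> new ArrayList<>()).add(entry);
        }
        return grouped;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        List<HandleEntry> entries = Arrays.asList(
            new HandleEntry("broker-a", "m1", 5_000L),
            new HandleEntry("broker-b", "m2", 5_000L),
            new HandleEntry("broker-a", "m3", 500L),   // already expired at now = 1000
            new HandleEntry("broker-a", "m4", 9_000L));

        // Each map entry would become one batch request to that broker.
        for (Map.Entry<String, List<HandleEntry>> e : groupByBroker(entries, now).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " entries");
        }
    }
}
```

With this grouping, the number of requests is bounded by the number of brokers involved rather than by the number of messages.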

Describe Alternatives You've Considered

  1. Proxy-level batching only (no Broker changes): Group entries by broker at the Proxy, but still send individual CHANGE_MESSAGE_INVISIBLETIME requests. This reduces Proxy-side processing overhead but not the number of network round-trips or RocksDB flushes. Rejected because the Broker-side RocksDB flush reduction is the main performance win.

  2. Reuse BATCH_ACK_MESSAGE with BitSet merging: BatchAck merges multiple offsets sharing the same CK into a BitSet. This doesn't work for ChangeInvisibleTime because each entry requires its own new checkpoint write + old checkpoint ack, and each entry may have a different invisibleTime. A list-based request body is necessary.

  3. Async fire-and-forget (no per-entry response): For the returnHandleGroup scenario, per-entry results aren't strictly needed. However, the renewal scenario (startRenewMessage) requires per-entry new receipt handles to update the cached handle via messageReceiptHandle.updateReceiptHandle(). Supporting per-entry responses makes the batch API general-purpose across all scenarios.
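
To illustrate why the renewal path needs per-entry results, here is a minimal sketch of applying a batch response back to cached handles. `CachedHandle`, `EntryResult`, and `applyResults` are hypothetical; only the `updateReceiptHandle` method name is borrowed from the issue text, and the new handle is simplified to a plain string.

```java
import java.util.Arrays;
import java.util.List;

class RenewResultSketch {

    /** Hypothetical stand-in for the cached message receipt handle. */
    static final class CachedHandle {
        private String receiptHandle;

        CachedHandle(String receiptHandle) {
            this.receiptHandle = receiptHandle;
        }

        void updateReceiptHandle(String newReceiptHandle) {
            this.receiptHandle = newReceiptHandle;
        }

        String getReceiptHandle() {
            return receiptHandle;
        }
    }

    /** Hypothetical per-entry result: success flag plus the renewed handle. */
    static final class EntryResult {
        final boolean success;
        final String newReceiptHandle;

        EntryResult(boolean success, String newReceiptHandle) {
            this.success = success;
            this.newReceiptHandle = newReceiptHandle;
        }
    }

    /** Updates each cached handle whose entry succeeded; returns how many were updated. */
    static int applyResults(List<CachedHandle> handles, List<EntryResult> results) {
        if (handles.size() != results.size()) {
            throw new IllegalArgumentException("one result per entry is assumed");
        }
        int updated = 0;
        for (int i = 0; i < handles.size(); i++) {
            EntryResult result = results.get(i);
            if (result.success) {
                handles.get(i).updateReceiptHandle(result.newReceiptHandle);
                updated++;
            }
            // A failed entry keeps its old handle, so only that entry needs retry handling.
        }
        return updated;
    }

    public static void main(String[] args) {
        List<CachedHandle> handles = Arrays.asList(new CachedHandle("h1"), new CachedHandle("h2"));
        List<EntryResult> results = Arrays.asList(
            new EntryResult(true, "h1-renewed"), new EntryResult(false, null));
        System.out.println("updated: " + applyResults(handles, results));
    }
}
```

A fire-and-forget batch would give the caller nothing to pass to `updateReceiptHandle`, which is the gap the per-entry response closes.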

Additional Context

RocksDB batch optimization detail:

Current (N messages = 2N db.write()):

// PopConsumerService.changeInvisibilityDuration() — called N times
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));   // 1 WriteBatch, 1 entry
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); // 1 WriteBatch, 1 entry

Proposed (N messages = 2 db.write()):

// PopConsumerService.batchChangeInvisibilityDuration() — called once
this.popConsumerStore.writeRecords(allCkRecords);     // 1 WriteBatch, N entries
this.popConsumerStore.deleteRecords(allAckRecords);    // 1 WriteBatch, N entries

PopConsumerRocksdbStore.writeRecords/deleteRecords already accept List<PopConsumerRecord> and use WriteBatch internally, so the infrastructure for batch RocksDB writes is already in place — we just need to stop passing Collections.singletonList().
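
The 2N versus 2 arithmetic can be checked with a small in-memory model. `CountingStore` below is a hypothetical stand-in for the store, not the real PopConsumerRocksdbStore: it assumes that every writeRecords or deleteRecords call maps to exactly one db.write(), and records are simplified to strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BatchWriteCountSketch {

    /** In-memory model: each call counts as one db.write(), whatever the list size. */
    static final class CountingStore {
        private final Set<String> records = new HashSet<>();
        private int writeCalls;

        void writeRecords(List<String> toWrite) {
            writeCalls++;
            records.addAll(toWrite);
        }

        void deleteRecords(List<String> toDelete) {
            writeCalls++;
            records.removeAll(toDelete);
        }

        int writeCalls() {
            return writeCalls;
        }
    }

    /** Current behaviour: one write and one delete per message. */
    static int perMessage(CountingStore store, List<String> ckRecords, List<String> ackRecords) {
        for (int i = 0; i < ckRecords.size(); i++) {
            store.writeRecords(Collections.singletonList(ckRecords.get(i)));
            store.deleteRecords(Collections.singletonList(ackRecords.get(i)));
        }
        return store.writeCalls();
    }

    /** Proposed behaviour: one write and one delete for the whole batch. */
    static int batched(CountingStore store, List<String> ckRecords, List<String> ackRecords) {
        store.writeRecords(ckRecords);
        store.deleteRecords(ackRecords);
        return store.writeCalls();
    }

    public static void main(String[] args) {
        int n = 100;
        List<String> ckRecords = new ArrayList<>();
        List<String> ackRecords = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ckRecords.add("new-ck-" + i);
            ackRecords.add("old-ck-" + i);
        }
        System.out.println("per-message: " + perMessage(new CountingStore(), ckRecords, ackRecords)); // 200
        System.out.println("batched:     " + batched(new CountingStore(), ckRecords, ackRecords));    // 2
    }
}
```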
