Skip to content

[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509

Open
Zhengcy05 wants to merge 1 commit into
apache:developfrom
Zhengcy05:enhance/proxy-grpc-offset-rpc
Open

[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509
Zhengcy05 wants to merge 1 commit into
apache:developfrom
Zhengcy05:enhance/proxy-grpc-offset-rpc

Conversation

@Zhengcy05

Copy link
Copy Markdown

Which Issue(s) This PR Fixes

Fixes #10434

  • Fixes #issue_id

Brief Description

This PR implements the v5 gRPC QueryOffset and GetOffset RPCs in the proxy.
Previously, these RPCs were declared by the proto service but were not overridden by the proxy gRPC service implementation, so clients received UNIMPLEMENTED.
The change adds a new OffsetActivity, wires it through the gRPC activity/application layers, and maps:

  • GetOffset to consumer offset query
  • QueryOffset(BEGINNING) to min offset
  • QueryOffset(END) to max offset
  • QueryOffset(TIMESTAMP) to timestamp-based offset search
    It also exposes searchOffset through the proxy processor/message service layers so the timestamp policy can reuse the existing broker remoting capability.

How Did You Test This Change?

env JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home mvn -pl proxy -Dspotbugs.skip=true -Dtest=OffsetActivityTest,GrpcMessagingApplicationTest -DfailIfNoTests=false test

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) to allow clients to query consumer group offsets via the Proxy layer. Adds new OffsetActivity, GrpcOffsetConverter, and processor methods across 12 files.

Findings

  • [Warning] OffsetActivity.java:56getOffset() uses messagingProcessor.getOffset() inside CompletableFuture.supplyAsync() with a cached thread pool. If messagingProcessor.getOffset() is a blocking call (e.g., involves network I/O to broker), ensure the thread pool has bounded size to prevent resource exhaustion under load. Consider using a dedicated executor for offset queries.
  • [Warning] GrpcOffsetConverter.java — The converter maps between gRPC GetOffsetRequest/QueryOffsetRequest and internal request objects. Verify that all required fields are validated — particularly resource.name() (consumer group) which if empty could cause unexpected behavior downstream.
  • [Info] DefaultGrpcMessagingActivity.java — New OffsetActivity is correctly initialized and wired into the lifecycle (startAsync/awaitRunning/stopAsync). The pattern follows existing activities consistently.
  • [Info] Proxy gRPC service definition — Adding getOffset and queryOffset to the messaging service extends the Proxy API surface. Ensure the proto definitions are also updated in the rocketmq-clients repo for cross-language support.

Suggestions

  • Cross-repo: This PR adds new gRPC RPCs to the Proxy. The apache/rocketmq-clients repo may need corresponding proto updates and client-side implementations for GetOffset/QueryOffset. Consider coordinating with client SDK maintainers.
  • Add integration tests for the new offset query endpoints, especially edge cases: non-existent consumer group, empty group name, broker unreachable.
  • Consider adding rate limiting or caching for offset queries if they're expected to be high-frequency, as they may trigger broker-side lookups.

Compatibility Note

This extends the gRPC API. Existing clients will not be affected since these are new RPCs. However, clients that don't know about these RPCs will get UNIMPLEMENTED if they try to call an older Proxy version.

Solid feature implementation with good structural consistency. 🚀


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Proxy v5 gRPC: implement QueryOffset / GetOffset so clients can read consumer offsets

2 participants