[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509
Open
Zhengcy05 wants to merge 1 commit into
Open
Conversation
RockteMQ-AI
reviewed
Jun 15, 2026
RockteMQ-AI
left a comment
Contributor
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) to allow clients to query consumer group offsets via the Proxy layer. Adds new OffsetActivity, GrpcOffsetConverter, and processor methods across 12 files.
Findings
- [Warning]
OffsetActivity.java:56—getOffset()usesmessagingProcessor.getOffset()insideCompletableFuture.supplyAsync()with a cached thread pool. IfmessagingProcessor.getOffset()is a blocking call (e.g., involves network I/O to broker), ensure the thread pool has bounded size to prevent resource exhaustion under load. Consider using a dedicated executor for offset queries. - [Warning]
GrpcOffsetConverter.java— The converter maps between gRPCGetOffsetRequest/QueryOffsetRequestand internal request objects. Verify that all required fields are validated — particularlyresource.name()(consumer group) which if empty could cause unexpected behavior downstream. - [Info]
DefaultGrpcMessagingActivity.java— NewOffsetActivityis correctly initialized and wired into the lifecycle (startAsync/awaitRunning/stopAsync). The pattern follows existing activities consistently. - [Info]
Proxy gRPC service definition— AddinggetOffsetandqueryOffsetto the messaging service extends the Proxy API surface. Ensure the proto definitions are also updated in therocketmq-clientsrepo for cross-language support.
Suggestions
- Cross-repo: This PR adds new gRPC RPCs to the Proxy. The
apache/rocketmq-clientsrepo may need corresponding proto updates and client-side implementations forGetOffset/QueryOffset. Consider coordinating with client SDK maintainers. - Add integration tests for the new offset query endpoints, especially edge cases: non-existent consumer group, empty group name, broker unreachable.
- Consider adding rate limiting or caching for offset queries if they're expected to be high-frequency, as they may trigger broker-side lookups.
Compatibility Note
This extends the gRPC API. Existing clients will not be affected since these are new RPCs. However, clients that don't know about these RPCs will get UNIMPLEMENTED if they try to call an older Proxy version.
Solid feature implementation with good structural consistency. 🚀
Automated review by github-manager-bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which Issue(s) This PR Fixes
Fixes #10434
Brief Description
This PR implements the v5 gRPC QueryOffset and GetOffset RPCs in the proxy.
Previously, these RPCs were declared by the proto service but were not overridden by the proxy gRPC service implementation, so clients received UNIMPLEMENTED.
The change adds a new OffsetActivity, wires it through the gRPC activity/application layers, and maps:
It also exposes searchOffset through the proxy processor/message service layers so the timestamp policy can reuse the existing broker remoting capability.
How Did You Test This Change?
env JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home mvn -pl proxy -Dspotbugs.skip=true -Dtest=OffsetActivityTest,GrpcMessagingApplicationTest -DfailIfNoTests=false test