diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java index 88099207b93..581c474d2b6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java @@ -24,12 +24,16 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.NotifyClientTerminationResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.RecallMessageRequest; @@ -53,6 +57,7 @@ import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; import org.apache.rocketmq.proxy.grpc.v2.consumer.AckMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.consumer.ChangeInvisibleDurationActivity; +import org.apache.rocketmq.proxy.grpc.v2.consumer.OffsetActivity; import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.ForwardMessageToDLQActivity; import org.apache.rocketmq.proxy.grpc.v2.producer.RecallMessageActivity; @@ -69,6 +74,7 @@ public class DefaultGrpcMessagingActivity extends AbstractStartAndShutdown imple protected ReceiveMessageActivity receiveMessageActivity; protected AckMessageActivity ackMessageActivity; protected ChangeInvisibleDurationActivity changeInvisibleDurationActivity; + protected OffsetActivity offsetActivity; protected SendMessageActivity sendMessageActivity; protected RecallMessageActivity recallMessageActivity; protected ForwardMessageToDLQActivity forwardMessageToDLQActivity; @@ -87,6 +93,7 @@ protected void init(MessagingProcessor messagingProcessor) { this.receiveMessageActivity = new ReceiveMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.ackMessageActivity = new AckMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.changeInvisibleDurationActivity = new ChangeInvisibleDurationActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.offsetActivity = new OffsetActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.recallMessageActivity = new RecallMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.forwardMessageToDLQActivity = new ForwardMessageToDLQActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); @@ -118,6 +125,16 @@ public CompletableFuture queryAssignment(ProxyContext c return this.routeActivity.queryAssignment(ctx, request); } + @Override + public CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request) { + return this.offsetActivity.queryOffset(ctx, request); + } + + @Override + public CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request) { + return this.offsetActivity.getOffset(ctx, request); + } + @Override public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, StreamObserver responseObserver) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java index de68f0f8e50..e4317bda8de 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java @@ -25,12 +25,16 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.NotifyClientTerminationResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.RecallMessageRequest; @@ -57,6 +61,10 @@ public interface GrpcMessagingActivity extends StartAndShutdown { CompletableFuture queryAssignment(ProxyContext ctx, QueryAssignmentRequest request); + CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request); + + CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request); + void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, StreamObserver responseObserver); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 3429ad54e27..ecca3324fb5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -26,6 +26,8 @@ import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest; import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.HeartbeatResponse; import apache.rocketmq.v2.MessagingServiceGrpc; @@ -33,6 +35,8 @@ import apache.rocketmq.v2.NotifyClientTerminationResponse; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.RecallMessageRequest; @@ -303,6 +307,40 @@ public void ackMessage(AckMessageRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> QueryOffsetResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessagingActivity.queryOffset(context, request) + .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + + @Override + public void getOffset(GetOffsetRequest request, StreamObserver responseObserver) { + Function statusResponseCreator = status -> GetOffsetResponse.newBuilder().setStatus(status).build(); + ProxyContext context = createContext(); + try { + this.addExecutor(this.consumerThreadPoolExecutor, + context, + request, + () -> grpcMessagingActivity.getOffset(context, request) + .whenComplete((response, throwable) -> writeResponse(context, request, response, responseObserver, throwable, statusResponseCreator)), + responseObserver, + statusResponseCreator); + } catch (Throwable t) { + writeResponse(context, request, null, responseObserver, t, statusResponseCreator); + } + } + @Override public void forwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest request, StreamObserver responseObserver) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivity.java new file mode 100644 index 00000000000..9217ef06635 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivity.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.grpc.v2.consumer; + +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; +import apache.rocketmq.v2.QueryOffsetPolicy; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; +import com.google.protobuf.util.Timestamps; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; +import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +public class OffsetActivity extends AbstractMessagingActivity { + + public OffsetActivity(MessagingProcessor messagingProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + public CompletableFuture getOffset(ProxyContext ctx, GetOffsetRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + validateTopicAndConsumerGroup(request.getMessageQueue().getTopic(), request.getGroup()); + org.apache.rocketmq.common.message.MessageQueue messageQueue = convertMessageQueue(request.getMessageQueue()); + return this.messagingProcessor.queryConsumerOffset( + ctx, + messageQueue, + request.getGroup().getName(), + MessagingProcessor.DEFAULT_TIMEOUT_MILLS + ).thenApply(this::convertToGetOffsetResponse); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + + public CompletableFuture queryOffset(ProxyContext ctx, QueryOffsetRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + validateTopic(request.getMessageQueue().getTopic()); + return queryOffset0(ctx, request).thenApply(this::convertToQueryOffsetResponse); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; + } + + protected CompletableFuture queryOffset0(ProxyContext ctx, QueryOffsetRequest request) { + QueryOffsetPolicy queryOffsetPolicy = request.getQueryOffsetPolicy(); + org.apache.rocketmq.common.message.MessageQueue messageQueue = convertMessageQueue(request.getMessageQueue()); + switch (queryOffsetPolicy) { + case BEGINNING: + return this.messagingProcessor.getMinOffset( + ctx, + messageQueue, + MessagingProcessor.DEFAULT_TIMEOUT_MILLS + ); + case END: + return this.messagingProcessor.getMaxOffset( + ctx, + messageQueue, + MessagingProcessor.DEFAULT_TIMEOUT_MILLS + ); + case TIMESTAMP: + if (!request.hasTimestamp()) { + throw new GrpcProxyException(Code.BAD_REQUEST, "timestamp is required for TIMESTAMP query offset policy"); + } + return this.messagingProcessor.searchOffset( + ctx, + messageQueue, + Timestamps.toMillis(request.getTimestamp()), + MessagingProcessor.DEFAULT_TIMEOUT_MILLS + ); + default: + throw new GrpcProxyException(Code.BAD_REQUEST, "unsupported query offset policy: " + queryOffsetPolicy); + } + } + + protected org.apache.rocketmq.common.message.MessageQueue convertMessageQueue( + apache.rocketmq.v2.MessageQueue messageQueue) { + return new org.apache.rocketmq.common.message.MessageQueue( + messageQueue.getTopic().getName(), + messageQueue.getBroker().getName(), + messageQueue.getId() + ); + } + + protected GetOffsetResponse convertToGetOffsetResponse(long offset) { + return GetOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(offset) + .build(); + } + + protected QueryOffsetResponse convertToQueryOffsetResponse(long offset) { + return QueryOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(offset) + .build(); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index b66d57c62a6..b17038587fc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -62,6 +62,7 @@ import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -619,6 +620,23 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture searchOffset(ProxyContext ctx, MessageQueue messageQueue, long timestamp, + long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + try { + AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() + .buildAddressableMessageQueue(ctx, messageQueue); + SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); + requestHeader.setTopic(addressableMessageQueue.getTopic()); + requestHeader.setQueueId(addressableMessageQueue.getQueueId()); + requestHeader.setTimestamp(timestamp); + future = serviceManager.getMessageService().searchOffset(ctx, addressableMessageQueue, requestHeader, timeoutMillis); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + protected Set buildAddressableSet(ProxyContext ctx, Set mqSet) { Set addressableMessageQueueSet = new HashSet<>(mqSet.size()); for (MessageQueue mq : mqSet) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 3e7a8894859..164b455d2ec 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -288,6 +288,12 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, MessageQueue messageQueue, long timestamp, + long timeoutMillis) { + return this.consumerProcessor.searchOffset(ctx, messageQueue, timestamp, timeoutMillis); + } + @Override public CompletableFuture recallMessage(ProxyContext ctx, String topic, String recallHandle, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index a1500dbdedd..7920813c15c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -345,6 +345,13 @@ CompletableFuture getMinOffset( long timeoutMillis ); + CompletableFuture searchOffset( + ProxyContext ctx, + MessageQueue messageQueue, + long timestamp, + long timeoutMillis + ); + CompletableFuture recallMessage( ProxyContext ctx, String topic, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index 77c4ef60f14..c5e6866dee9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -249,6 +250,16 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage ); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().searchOffset( + messageQueue.getBrokerAddr(), + requestHeader, + timeoutMillis + ); + } + @Override public CompletableFuture recallMessage(ProxyContext ctx, String brokerName, RecallMessageRequestHeader requestHeader, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index c93fa93983c..f0ad81e614e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -74,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -479,6 +480,12 @@ public CompletableFuture getMinOffset(ProxyContext ctx, AddressableMessage throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService"); } + @Override + public CompletableFuture searchOffset(ProxyContext ctx, AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, long timeoutMillis) { + throw new NotImplementedException("searchOffset is not implemented in LocalMessageService"); + } + @Override public CompletableFuture recallMessage(ProxyContext ctx, String brokerName, RecallMessageRequestHeader requestHeader, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 1e828c36fd9..75b4749bf3c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; @@ -164,6 +165,13 @@ CompletableFuture getMinOffset( long timeoutMillis ); + CompletableFuture searchOffset( + ProxyContext ctx, + AddressableMessageQueue messageQueue, + SearchOffsetRequestHeader requestHeader, + long timeoutMillis + ); + CompletableFuture recallMessage( ProxyContext ctx, String brokerName, diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java index 3ce701cfb4a..b8bcb6903a6 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java @@ -21,7 +21,12 @@ import apache.rocketmq.v2.AddressScheme; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.Endpoints; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; import apache.rocketmq.v2.MessageQueue; +import apache.rocketmq.v2.QueryOffsetPolicy; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; import apache.rocketmq.v2.Resource; @@ -57,6 +62,10 @@ public class GrpcMessagingApplicationTest extends InitConfigTest { @Mock StreamObserver queryRouteResponseStreamObserver; @Mock + StreamObserver queryOffsetResponseStreamObserver; + @Mock + StreamObserver getOffsetResponseStreamObserver; + @Mock GrpcMessagingActivity grpcMessagingActivity; GrpcMessagingApplication grpcMessagingApplication; @@ -106,6 +115,70 @@ public void testQueryRoute() { }); } + @Test + public void testQueryOffset() { + Metadata metadata = new Metadata(); + metadata.put(GrpcConstants.CLIENT_ID, CLIENT_ID); + metadata.put(GrpcConstants.LANGUAGE, JAVA); + metadata.put(GrpcConstants.REMOTE_ADDRESS, REMOTE_ADDR); + metadata.put(GrpcConstants.LOCAL_ADDRESS, LOCAL_ADDR); + + Assert.assertNotNull(Context.current() + .withValue(GrpcConstants.METADATA, metadata) + .attach()); + + CompletableFuture future = new CompletableFuture<>(); + QueryOffsetRequest request = QueryOffsetRequest.newBuilder() + .setMessageQueue(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .build()) + .setQueryOffsetPolicy(QueryOffsetPolicy.END) + .build(); + Mockito.when(grpcMessagingActivity.queryOffset(Mockito.any(ProxyContext.class), Mockito.eq(request))) + .thenReturn(future); + QueryOffsetResponse response = QueryOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(1) + .build(); + grpcMessagingApplication.queryOffset(request, queryOffsetResponseStreamObserver); + future.complete(response); + await().untilAsserted(() -> { + Mockito.verify(queryOffsetResponseStreamObserver, Mockito.times(1)).onNext(Mockito.same(response)); + }); + } + + @Test + public void testGetOffset() { + Metadata metadata = new Metadata(); + metadata.put(GrpcConstants.CLIENT_ID, CLIENT_ID); + metadata.put(GrpcConstants.LANGUAGE, JAVA); + metadata.put(GrpcConstants.REMOTE_ADDRESS, REMOTE_ADDR); + metadata.put(GrpcConstants.LOCAL_ADDRESS, LOCAL_ADDR); + + Assert.assertNotNull(Context.current() + .withValue(GrpcConstants.METADATA, metadata) + .attach()); + + CompletableFuture future = new CompletableFuture<>(); + GetOffsetRequest request = GetOffsetRequest.newBuilder() + .setGroup(Resource.newBuilder().setName("group").build()) + .setMessageQueue(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .build()) + .build(); + Mockito.when(grpcMessagingActivity.getOffset(Mockito.any(ProxyContext.class), Mockito.eq(request))) + .thenReturn(future); + GetOffsetResponse response = GetOffsetResponse.newBuilder() + .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) + .setOffset(1) + .build(); + grpcMessagingApplication.getOffset(request, getOffsetResponseStreamObserver); + future.complete(response); + await().untilAsserted(() -> { + Mockito.verify(getOffsetResponseStreamObserver, Mockito.times(1)).onNext(Mockito.same(response)); + }); + } + @Test public void testQueryRouteWithBadClientID() { Metadata metadata = new Metadata(); @@ -130,4 +203,4 @@ public void testQueryRouteWithBadClientID() { assertEquals(Code.CLIENT_ID_REQUIRED, responseArgumentCaptor.getValue().getStatus().getCode()); } -} \ No newline at end of file +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivityTest.java new file mode 100644 index 00000000000..1be558fb06f --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/OffsetActivityTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.consumer; + +import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.GetOffsetRequest; +import apache.rocketmq.v2.GetOffsetResponse; +import apache.rocketmq.v2.MessageQueue; +import apache.rocketmq.v2.QueryOffsetPolicy; +import apache.rocketmq.v2.QueryOffsetRequest; +import apache.rocketmq.v2.QueryOffsetResponse; +import apache.rocketmq.v2.Resource; +import com.google.protobuf.util.Timestamps; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +public class OffsetActivityTest extends BaseActivityTest { + + private static final String TOPIC = "topic"; + private static final String GROUP = "group"; + private static final String BROKER_NAME = "brokerName"; + private static final long OFFSET = 123L; + private static final long TIMESTAMP = 1000L; + + private OffsetActivity offsetActivity; + private MessageQueue messageQueue; + private org.apache.rocketmq.common.message.MessageQueue commonMessageQueue; + + @Before + public void before() throws Throwable { + super.before(); + this.offsetActivity = new OffsetActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.messageQueue = MessageQueue.newBuilder() + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .setBroker(Broker.newBuilder().setName(BROKER_NAME).build()) + .setId(0) + .build(); + this.commonMessageQueue = new org.apache.rocketmq.common.message.MessageQueue(TOPIC, BROKER_NAME, 0); + } + + @Test + public void testGetOffset() throws Throwable { + ArgumentCaptor messageQueueCaptor = + ArgumentCaptor.forClass(org.apache.rocketmq.common.message.MessageQueue.class); + when(this.messagingProcessor.queryConsumerOffset( + any(), + messageQueueCaptor.capture(), + eq(GROUP), + eq(MessagingProcessor.DEFAULT_TIMEOUT_MILLS) + )).thenReturn(CompletableFuture.completedFuture(OFFSET)); + + GetOffsetResponse response = this.offsetActivity.getOffset( + createContext(), + GetOffsetRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(GROUP).build()) + .setMessageQueue(messageQueue) + .build() + ).get(); + + assertEquals(Code.OK, response.getStatus().getCode()); + assertEquals(OFFSET, response.getOffset()); + assertEquals(commonMessageQueue, messageQueueCaptor.getValue()); + } + + @Test + public void testQueryOffsetBeginning() throws Throwable { + when(this.messagingProcessor.getMinOffset(any(), eq(commonMessageQueue), anyLong())) + .thenReturn(CompletableFuture.completedFuture(OFFSET)); + + QueryOffsetResponse response = this.offsetActivity.queryOffset( + createContext(), + QueryOffsetRequest.newBuilder() + .setMessageQueue(messageQueue) + .setQueryOffsetPolicy(QueryOffsetPolicy.BEGINNING) + .build() + ).get(); + + assertEquals(Code.OK, response.getStatus().getCode()); + assertEquals(OFFSET, response.getOffset()); + } + + @Test + public void testQueryOffsetEnd() throws Throwable { + when(this.messagingProcessor.getMaxOffset(any(), eq(commonMessageQueue), anyLong())) + .thenReturn(CompletableFuture.completedFuture(OFFSET)); + + QueryOffsetResponse response = this.offsetActivity.queryOffset( + createContext(), + QueryOffsetRequest.newBuilder() + .setMessageQueue(messageQueue) + .setQueryOffsetPolicy(QueryOffsetPolicy.END) + .build() + ).get(); + + assertEquals(Code.OK, response.getStatus().getCode()); + assertEquals(OFFSET, response.getOffset()); + } + + @Test + public void testQueryOffsetTimestamp() throws Throwable { + ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.searchOffset( + any(), + eq(commonMessageQueue), + timestampCaptor.capture(), + eq(MessagingProcessor.DEFAULT_TIMEOUT_MILLS) + )).thenReturn(CompletableFuture.completedFuture(OFFSET)); + + QueryOffsetResponse response = this.offsetActivity.queryOffset( + createContext(), + QueryOffsetRequest.newBuilder() + .setMessageQueue(messageQueue) + .setQueryOffsetPolicy(QueryOffsetPolicy.TIMESTAMP) + .setTimestamp(Timestamps.fromMillis(TIMESTAMP)) + .build() + ).get(); + + assertEquals(Code.OK, response.getStatus().getCode()); + assertEquals(OFFSET, response.getOffset()); + assertEquals(TIMESTAMP, timestampCaptor.getValue().longValue()); + } + + @Test + public void testQueryOffsetTimestampWithoutTimestamp() throws Throwable { + try { + this.offsetActivity.queryOffset( + createContext(), + QueryOffsetRequest.newBuilder() + .setMessageQueue(messageQueue) + .setQueryOffsetPolicy(QueryOffsetPolicy.TIMESTAMP) + .build() + ).get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof GrpcProxyException); + GrpcProxyException grpcProxyException = (GrpcProxyException) e.getCause(); + assertEquals(Code.BAD_REQUEST, grpcProxyException.getCode()); + return; + } + throw new AssertionError("Expected GrpcProxyException"); + } +}