diff --git a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java index f462aabc0df..95a720da236 100644 --- a/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java +++ b/auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java @@ -68,6 +68,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.RequestHeaderRegistry; +import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader; @@ -322,6 +324,39 @@ public List build(ChannelHandlerContext context, Re } } break; + case RequestCode.DELETE_TOPIC_IN_BROKER_LIST: + // Batch APIs carry their target list in the request body, not in an annotated + // CommandCustomHeader, so the annotation-based path in + // RequestHeaderRegistry would otherwise produce an empty context list and let + // the request through without a DELETE permission check. Decode the body and + // emit one DELETE context per topic instead. + DeleteTopicListRequestBody deleteTopicListRequestBody = + DeleteTopicListRequestBody.decode(command.getBody(), DeleteTopicListRequestBody.class); + if (CollectionUtils.isNotEmpty(deleteTopicListRequestBody.getTopicList())) { + for (String topicName : deleteTopicListRequestBody.getTopicList()) { + if (StringUtils.isBlank(topicName)) { + continue; + } + topic = Resource.ofTopic(topicName); + result.add(DefaultAuthorizationContext.of(subject, topic, Action.DELETE, sourceIp)); + } + } + break; + case RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST: + // See DELETE_TOPIC_IN_BROKER_LIST: emit one DELETE context per group from the + // request body so authorization can enforce per-group DELETE permission. + DeleteSubscriptionGroupListRequestBody deleteGroupListRequestBody = + DeleteSubscriptionGroupListRequestBody.decode(command.getBody(), DeleteSubscriptionGroupListRequestBody.class); + if (CollectionUtils.isNotEmpty(deleteGroupListRequestBody.getGroupNameList())) { + for (String groupName : deleteGroupListRequestBody.getGroupNameList()) { + if (StringUtils.isBlank(groupName)) { + continue; + } + group = Resource.ofGroup(groupName); + result.add(DefaultAuthorizationContext.of(subject, group, Action.DELETE, sourceIp)); + } + } + break; default: result = buildContextByAnnotation(subject, command, sourceIp); break; diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java index 920a0e12009..a39e4e7cf32 100644 --- a/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java @@ -57,6 +57,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.RequestHeaderRegistry; +import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -570,6 +572,49 @@ public void buildRemoting() { Assert.assertEquals("channel-id", getContext(result, ResourceType.TOPIC).getChannelId()); Assert.assertEquals(String.valueOf(RequestCode.UNLOCK_BATCH_MQ), getContext(result, ResourceType.TOPIC).getRpcCode()); + // DELETE_TOPIC_IN_BROKER_LIST: body-driven, must yield one DELETE context per topic. + DeleteTopicListRequestBody deleteTopicListBody = new DeleteTopicListRequestBody(); + deleteTopicListBody.setTopicList(Arrays.asList("topicA", "topicB", "", " ")); + + request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER_LIST, null); + request.setBody(JSON.toJSONBytes(deleteTopicListBody)); + request.setVersion(441); + request.addExtField("AccessKey", "rocketmq"); + request.makeCustomHeaderToNet(); + + result = builder.build(channelHandlerContext, request); + // Blank entries are filtered, so 2 valid topics produce 2 contexts. + Assert.assertEquals(2, result.size()); + for (DefaultAuthorizationContext ctx : result) { + Assert.assertEquals(ResourceType.TOPIC, ctx.getResource().getResourceType()); + Assert.assertEquals("User:rocketmq", ctx.getSubject().getSubjectKey()); + Assert.assertTrue(ctx.getActions().contains(Action.DELETE)); + Assert.assertEquals(String.valueOf(RequestCode.DELETE_TOPIC_IN_BROKER_LIST), ctx.getRpcCode()); + } + Assert.assertTrue(result.stream().anyMatch(ctx -> "Topic:topicA".equals(ctx.getResource().getResourceKey()))); + Assert.assertTrue(result.stream().anyMatch(ctx -> "Topic:topicB".equals(ctx.getResource().getResourceKey()))); + + // DELETE_SUBSCRIPTIONGROUP_LIST: body-driven, must yield one DELETE context per group. + DeleteSubscriptionGroupListRequestBody deleteGroupListBody = new DeleteSubscriptionGroupListRequestBody(); + deleteGroupListBody.setGroupNameList(Arrays.asList("groupX", "groupY")); + + request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST, null); + request.setBody(JSON.toJSONBytes(deleteGroupListBody)); + request.setVersion(441); + request.addExtField("AccessKey", "rocketmq"); + request.makeCustomHeaderToNet(); + + result = builder.build(channelHandlerContext, request); + Assert.assertEquals(2, result.size()); + for (DefaultAuthorizationContext ctx : result) { + Assert.assertEquals(ResourceType.GROUP, ctx.getResource().getResourceType()); + Assert.assertEquals("User:rocketmq", ctx.getSubject().getSubjectKey()); + Assert.assertTrue(ctx.getActions().contains(Action.DELETE)); + Assert.assertEquals(String.valueOf(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST), ctx.getRpcCode()); + } + Assert.assertTrue(result.stream().anyMatch(ctx -> "Group:groupX".equals(ctx.getResource().getResourceKey()))); + Assert.assertTrue(result.stream().anyMatch(ctx -> "Group:groupY".equals(ctx.getResource().getResourceKey()))); + } private DefaultAuthorizationContext getContext(List contexts, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c88d4e5ad2f..5e26815e793 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -28,9 +28,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -128,6 +130,8 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -270,6 +274,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.updateAndCreateTopicList(ctx, request); case RequestCode.DELETE_TOPIC_IN_BROKER: return this.deleteTopic(ctx, request); + case RequestCode.DELETE_TOPIC_IN_BROKER_LIST: + return this.deleteTopicList(ctx, request); case RequestCode.GET_ALL_TOPIC_CONFIG: return this.getAllTopicConfig(ctx, request); case RequestCode.GET_TIMER_CHECK_POINT: @@ -310,6 +316,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.getAllSubscriptionGroup(ctx, request); case RequestCode.DELETE_SUBSCRIPTIONGROUP: return this.deleteSubscriptionGroup(ctx, request); + case RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST: + return this.deleteSubscriptionGroupList(ctx, request); case RequestCode.GET_TOPIC_STATS_INFO: return this.getTopicStatsInfo(ctx, request); case RequestCode.GET_CONSUMER_CONNECTION_LIST: @@ -787,17 +795,7 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, topicsToClean.add(topic); if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) { - final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); - for (String group : groups) { - final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true); - if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) { - topicsToClean.add(popRetryTopicV2); - } - final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); - if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { - topicsToClean.add(popRetryTopicV1); - } - } + collectPopRetryTopics(topic, topicsToClean); } try { @@ -816,6 +814,85 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, return response; } + private synchronized RemotingCommand deleteTopicList(ChannelHandlerContext ctx, + RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + DeleteTopicListRequestBody requestBody = DeleteTopicListRequestBody.decode( + request.getBody(), DeleteTopicListRequestBody.class); + List topicList = requestBody == null ? null : requestBody.getTopicList(); + + if (CollectionUtils.isEmpty(topicList)) { + response.setCode(ResponseCode.INVALID_PARAMETER); + response.setRemark("The specified topic list is blank."); + return response; + } + + LOGGER.info("AdminBrokerProcessor#deleteTopicList: broker receive request to delete topics={}, caller={}", + topicList, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + boolean validateSystemTopic = brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic(); + // dedup while preserving the input order + Set topicsToClean = new LinkedHashSet<>(); + for (String topic : topicList) { + if (UtilAll.isBlank(topic)) { + response.setCode(ResponseCode.INVALID_PARAMETER); + response.setRemark("The specified topic is blank."); + return response; + } + if (validateSystemTopic && TopicValidator.isSystemTopic(topic)) { + response.setCode(ResponseCode.INVALID_PARAMETER); + response.setRemark("The topic[" + topic + "] is conflict with system topic."); + return response; + } + topicsToClean.add(topic); + } + + if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) { + // snapshot the inputs before mutating the set, so retry topics for already-added retry topics are not collected + for (String topic : new ArrayList<>(topicsToClean)) { + collectPopRetryTopics(topic, topicsToClean); + } + } + + try { + for (String topic : topicsToClean) { + if (LiteMetadataUtil.isLiteMessageType(topic, brokerController)) { + brokerController.getLiteLifecycleManager().cleanByParentTopic(topic); + } + } + for (String topic : topicsToClean) { + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); + this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); + this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic); + } + this.brokerController.getMessageStore().deleteTopics(new HashSet<>(topicsToClean)); + // delete topic configs last so that a retry after partial failure can still + // rediscover derived POP retry topics via collectPopRetryTopics → selectTopicConfig + this.brokerController.getTopicConfigManager().deleteTopicConfigList(new ArrayList<>(topicsToClean)); + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private void collectPopRetryTopics(String topic, Collection topicsToClean) { + final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); + for (String group : groups) { + final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) { + topicsToClean.add(popRetryTopicV2); + } + final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) { + topicsToClean.add(popRetryTopicV1); + } + } + } + private void deleteTopicInBroker(String topic) { this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); this.brokerController.getTopicQueueMappingManager().delete(topic); @@ -1714,6 +1791,58 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, return response; } + private RemotingCommand deleteSubscriptionGroupList(ChannelHandlerContext ctx, + RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + DeleteSubscriptionGroupListRequestBody requestBody = DeleteSubscriptionGroupListRequestBody.decode( + request.getBody(), DeleteSubscriptionGroupListRequestBody.class); + List groupNameList = requestBody == null ? null : requestBody.getGroupNameList(); + + if (CollectionUtils.isEmpty(groupNameList)) { + response.setCode(ResponseCode.INVALID_PARAMETER); + response.setRemark("The specified group name list is blank."); + return response; + } + + // dedup while preserving the input order + Set groupNames = new LinkedHashSet<>(); + for (String groupName : groupNameList) { + if (UtilAll.isBlank(groupName)) { + response.setCode(ResponseCode.INVALID_PARAMETER); + response.setRemark("The specified group name is blank."); + return response; + } + groupNames.add(groupName); + } + + LOGGER.info("AdminBrokerProcessor#deleteSubscriptionGroupList: groupNames={}, caller={}", + groupNames, RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + try { + // batch delete subscription group config in one persist + this.brokerController.getSubscriptionGroupManager() + .deleteSubscriptionGroupConfigList(new ArrayList<>(groupNames)); + + boolean cleanOffset = requestBody.isCleanOffset(); + boolean autoDeleteUnusedStats = this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats(); + for (String groupName : groupNames) { + if (cleanOffset || LiteMetadataUtil.isLiteGroupType(groupName, this.brokerController)) { + this.brokerController.getConsumerOffsetManager().removeOffset(groupName); + this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByGroupName(groupName); + } + if (autoDeleteUnusedStats) { + this.brokerController.getBrokerStatsManager().onGroupDeleted(groupName); + } + } + } catch (Throwable t) { + return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); + } + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 5850309e8cd..e6f633ca90b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -396,6 +396,15 @@ public void deleteSubscriptionGroupConfig(final String groupName) { } } + public void deleteSubscriptionGroupConfigList(final List groupNames) { + if (groupNames == null || groupNames.isEmpty()) { + return; + } + for (String groupName : groupNames) { + deleteSubscriptionGroupConfig(groupName); + } + } + public void setSubscriptionGroupTable(ConcurrentMap subscriptionGroupTable) { this.subscriptionGroupTable = subscriptionGroupTable; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index b481242b121..1436d4cde90 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -626,6 +626,15 @@ public void deleteTopicConfig(final String topic) { } } + public void deleteTopicConfigList(final List topics) { + if (topics == null || topics.isEmpty()) { + return; + } + for (String topic : topics) { + deleteTopicConfig(topic); + } + } + public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index c342400d141..d5fe99c720f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -76,6 +76,8 @@ import org.apache.rocketmq.remoting.protocol.body.AclInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; @@ -475,6 +477,64 @@ public void testDeleteWithPopRetryTopic() throws Exception { verify(messageStore, times(2)).deleteTopics(anySet()); } + @Test + public void testDeleteTopicListInBroker() throws Exception { + // empty list should fail with INVALID_PARAMETER + RemotingCommand emptyRequest = buildDeleteTopicListRequest(new ArrayList<>()); + RemotingCommand emptyResponse = adminBrokerProcessor.processRequest(handlerContext, emptyRequest); + assertThat(emptyResponse.getCode()).isEqualTo(ResponseCode.INVALID_PARAMETER); + + // system topic in batch should be rejected + for (String sysTopic : systemTopicSet) { + RemotingCommand request = buildDeleteTopicListRequest(Arrays.asList("TEST_DELETE_TOPIC_BATCH_OK", sysTopic)); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.INVALID_PARAMETER); + assertThat(response.getRemark()).isEqualTo("The topic[" + sysTopic + "] is conflict with system topic."); + } + + // happy path + List topicList = Arrays.asList("TEST_DELETE_TOPIC_BATCH_1", "TEST_DELETE_TOPIC_BATCH_2"); + RemotingCommand request = buildDeleteTopicListRequest(topicList); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testDeleteTopicListBatchPersist() throws Exception { + topicConfigManager = mock(TopicConfigManager.class); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(null); + + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(consumerOffsetManager.whichGroupByTopic(anyString())).thenReturn(new HashSet<>()); + + List topicList = Arrays.asList("TEST_DELETE_TOPIC_BATCH_A", "TEST_DELETE_TOPIC_BATCH_B", "TEST_DELETE_TOPIC_BATCH_A"); + RemotingCommand request = buildDeleteTopicListRequest(topicList); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + // expect deleteTopicConfigList to be called once with deduplicated list (single persist call) + verify(topicConfigManager, times(1)).deleteTopicConfigList(any(List.class)); + // singular deleteTopicConfig should not be invoked for batch path + verify(topicConfigManager, times(0)).deleteTopicConfig(anyString()); + // physical message store delete should be invoked once for the batch + verify(messageStore, times(1)).deleteTopics(anySet()); + } + + @Test + public void testDeleteSubscriptionGroupList() throws Exception { + // empty list should fail with INVALID_PARAMETER + RemotingCommand emptyRequest = buildDeleteSubscriptionGroupListRequest(new ArrayList<>(), false); + RemotingCommand emptyResponse = adminBrokerProcessor.processRequest(handlerContext, emptyRequest); + assertThat(emptyResponse.getCode()).isEqualTo(ResponseCode.INVALID_PARAMETER); + + // happy path + List groupList = Arrays.asList("GID-Group-Name-1", "GID-Group-Name-2"); + RemotingCommand request = buildDeleteSubscriptionGroupListRequest(groupList, true); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + @Test public void testGetAllTopicConfigInRocksdb() throws Exception { if (notToBeExecuted()) { @@ -1691,6 +1751,20 @@ private RemotingCommand buildDeleteTopicRequest(String topic) { return request; } + private RemotingCommand buildDeleteTopicListRequest(List topicList) { + DeleteTopicListRequestBody requestBody = new DeleteTopicListRequestBody(topicList); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER_LIST, null); + request.setBody(requestBody.encode()); + return request; + } + + private RemotingCommand buildDeleteSubscriptionGroupListRequest(List groupNameList, boolean cleanOffset) { + DeleteSubscriptionGroupListRequestBody requestBody = new DeleteSubscriptionGroupListRequestBody(groupNameList, cleanOffset); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST, null); + request.setBody(requestBody.encode()); + return request; + } + private MessageExt createDefaultMessageExt() { MessageExt messageExt = new MessageExt(); messageExt.setMsgId("12345678"); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 8294ffd422f..759c11cca8b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -125,6 +125,8 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetBrokerLiteInfoResponseBody; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; @@ -2197,6 +2199,27 @@ public void deleteTopicInBroker(final String addr, final String topic, final lon throw new MQClientException(response.getCode(), response.getRemark()); } + public void deleteTopicInBrokerList(final String addr, final List topicList, final long timeoutMillis) + throws RemotingException, InterruptedException, MQClientException { + DeleteTopicListRequestBody requestBody = new DeleteTopicListRequestBody(topicList); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER_LIST, null); + request.setBody(requestBody.encode()); + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + if (response == null) { + throw new MQClientException(ResponseCode.SYSTEM_ERROR, "response is null when deleting topic list in broker"); + } + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException { DeleteTopicFromNamesrvRequestHeader requestHeader = new DeleteTopicFromNamesrvRequestHeader(); @@ -2257,6 +2280,29 @@ public void deleteSubscriptionGroup(final String addr, final String groupName, f throw new MQClientException(response.getCode(), response.getRemark()); } + public void deleteSubscriptionGroupList(final String addr, final List groupNameList, final boolean cleanOffset, + final long timeoutMillis) + throws RemotingException, InterruptedException, MQClientException { + DeleteSubscriptionGroupListRequestBody requestBody = new DeleteSubscriptionGroupListRequestBody(groupNameList, cleanOffset); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST, null); + request.setBody(requestBody.encode()); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + if (response == null) { + throw new MQClientException(ResponseCode.SYSTEM_ERROR, "response is null when deleting subscription group list"); + } + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index b32dbbc87ea..7d2b2ec526a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -311,4 +311,7 @@ public class RequestCode { public static final int AUTH_LIST_ACL = 3010; public static final int SWITCH_TIMER_ENGINE = 5001; + + public static final int DELETE_TOPIC_IN_BROKER_LIST = 5002; + public static final int DELETE_SUBSCRIPTIONGROUP_LIST = 5003; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteSubscriptionGroupListRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteSubscriptionGroupListRequestBody.java new file mode 100644 index 00000000000..7c78fb69af0 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteSubscriptionGroupListRequestBody.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import java.util.List; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class DeleteSubscriptionGroupListRequestBody extends RemotingSerializable { + @CFNotNull + private List groupNameList; + + private boolean cleanOffset = false; + + public DeleteSubscriptionGroupListRequestBody() { + } + + public DeleteSubscriptionGroupListRequestBody(List groupNameList) { + this.groupNameList = groupNameList; + } + + public DeleteSubscriptionGroupListRequestBody(List groupNameList, boolean cleanOffset) { + this.groupNameList = groupNameList; + this.cleanOffset = cleanOffset; + } + + public List getGroupNameList() { + return groupNameList; + } + + public void setGroupNameList(List groupNameList) { + this.groupNameList = groupNameList; + } + + public boolean isCleanOffset() { + return cleanOffset; + } + + public void setCleanOffset(boolean cleanOffset) { + this.cleanOffset = cleanOffset; + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteTopicListRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteTopicListRequestBody.java new file mode 100644 index 00000000000..8677e3b338d --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DeleteTopicListRequestBody.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.body; + +import java.util.List; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class DeleteTopicListRequestBody extends RemotingSerializable { + @CFNotNull + private List topicList; + + public DeleteTopicListRequestBody() { + } + + public DeleteTopicListRequestBody(List topicList) { + this.topicList = topicList; + } + + public List getTopicList() { + return topicList; + } + + public void setTopicList(List topicList) { + this.topicList = topicList; + } +}