Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.RequestHeaderRegistry;
import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
Expand Down Expand Up @@ -322,6 +324,39 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
}
}
break;
case RequestCode.DELETE_TOPIC_IN_BROKER_LIST:
// Batch APIs carry their target list in the request body, not in an annotated
// CommandCustomHeader, so the annotation-based path in
// RequestHeaderRegistry would otherwise produce an empty context list and let
// the request through without a DELETE permission check. Decode the body and
// emit one DELETE context per topic instead.
DeleteTopicListRequestBody deleteTopicListRequestBody =
DeleteTopicListRequestBody.decode(command.getBody(), DeleteTopicListRequestBody.class);
if (CollectionUtils.isNotEmpty(deleteTopicListRequestBody.getTopicList())) {
for (String topicName : deleteTopicListRequestBody.getTopicList()) {
if (StringUtils.isBlank(topicName)) {
continue;
}
topic = Resource.ofTopic(topicName);
result.add(DefaultAuthorizationContext.of(subject, topic, Action.DELETE, sourceIp));
}
}
break;
case RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST:
// See DELETE_TOPIC_IN_BROKER_LIST: emit one DELETE context per group from the
// request body so authorization can enforce per-group DELETE permission.
DeleteSubscriptionGroupListRequestBody deleteGroupListRequestBody =
DeleteSubscriptionGroupListRequestBody.decode(command.getBody(), DeleteSubscriptionGroupListRequestBody.class);
if (CollectionUtils.isNotEmpty(deleteGroupListRequestBody.getGroupNameList())) {
for (String groupName : deleteGroupListRequestBody.getGroupNameList()) {
if (StringUtils.isBlank(groupName)) {
continue;
}
group = Resource.ofGroup(groupName);
result.add(DefaultAuthorizationContext.of(subject, group, Action.DELETE, sourceIp));
}
}
break;
default:
result = buildContextByAnnotation(subject, command, sourceIp);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.RequestHeaderRegistry;
import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
Expand Down Expand Up @@ -570,6 +572,49 @@ public void buildRemoting() {
Assert.assertEquals("channel-id", getContext(result, ResourceType.TOPIC).getChannelId());
Assert.assertEquals(String.valueOf(RequestCode.UNLOCK_BATCH_MQ), getContext(result, ResourceType.TOPIC).getRpcCode());

// DELETE_TOPIC_IN_BROKER_LIST: body-driven, must yield one DELETE context per topic.
DeleteTopicListRequestBody deleteTopicListBody = new DeleteTopicListRequestBody();
deleteTopicListBody.setTopicList(Arrays.asList("topicA", "topicB", "", " "));

request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER_LIST, null);
request.setBody(JSON.toJSONBytes(deleteTopicListBody));
request.setVersion(441);
request.addExtField("AccessKey", "rocketmq");
request.makeCustomHeaderToNet();

result = builder.build(channelHandlerContext, request);
// Blank entries are filtered, so 2 valid topics produce 2 contexts.
Assert.assertEquals(2, result.size());
for (DefaultAuthorizationContext ctx : result) {
Assert.assertEquals(ResourceType.TOPIC, ctx.getResource().getResourceType());
Assert.assertEquals("User:rocketmq", ctx.getSubject().getSubjectKey());
Assert.assertTrue(ctx.getActions().contains(Action.DELETE));
Assert.assertEquals(String.valueOf(RequestCode.DELETE_TOPIC_IN_BROKER_LIST), ctx.getRpcCode());
}
Assert.assertTrue(result.stream().anyMatch(ctx -> "Topic:topicA".equals(ctx.getResource().getResourceKey())));
Assert.assertTrue(result.stream().anyMatch(ctx -> "Topic:topicB".equals(ctx.getResource().getResourceKey())));

// DELETE_SUBSCRIPTIONGROUP_LIST: body-driven, must yield one DELETE context per group.
DeleteSubscriptionGroupListRequestBody deleteGroupListBody = new DeleteSubscriptionGroupListRequestBody();
deleteGroupListBody.setGroupNameList(Arrays.asList("groupX", "groupY"));

request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST, null);
request.setBody(JSON.toJSONBytes(deleteGroupListBody));
request.setVersion(441);
request.addExtField("AccessKey", "rocketmq");
request.makeCustomHeaderToNet();

result = builder.build(channelHandlerContext, request);
Assert.assertEquals(2, result.size());
for (DefaultAuthorizationContext ctx : result) {
Assert.assertEquals(ResourceType.GROUP, ctx.getResource().getResourceType());
Assert.assertEquals("User:rocketmq", ctx.getSubject().getSubjectKey());
Assert.assertTrue(ctx.getActions().contains(Action.DELETE));
Assert.assertEquals(String.valueOf(RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST), ctx.getRpcCode());
}
Assert.assertTrue(result.stream().anyMatch(ctx -> "Group:groupX".equals(ctx.getResource().getResourceKey())));
Assert.assertTrue(result.stream().anyMatch(ctx -> "Group:groupY".equals(ctx.getResource().getResourceKey())));

}

private DefaultAuthorizationContext getContext(List<DefaultAuthorizationContext> contexts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -128,6 +130,8 @@
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.DeleteSubscriptionGroupListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.DeleteTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
Expand Down Expand Up @@ -270,6 +274,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.updateAndCreateTopicList(ctx, request);
case RequestCode.DELETE_TOPIC_IN_BROKER:
return this.deleteTopic(ctx, request);
case RequestCode.DELETE_TOPIC_IN_BROKER_LIST:
return this.deleteTopicList(ctx, request);
case RequestCode.GET_ALL_TOPIC_CONFIG:
return this.getAllTopicConfig(ctx, request);
case RequestCode.GET_TIMER_CHECK_POINT:
Expand Down Expand Up @@ -310,6 +316,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.getAllSubscriptionGroup(ctx, request);
case RequestCode.DELETE_SUBSCRIPTIONGROUP:
return this.deleteSubscriptionGroup(ctx, request);
case RequestCode.DELETE_SUBSCRIPTIONGROUP_LIST:
return this.deleteSubscriptionGroupList(ctx, request);
case RequestCode.GET_TOPIC_STATS_INFO:
return this.getTopicStatsInfo(ctx, request);
case RequestCode.GET_CONSUMER_CONNECTION_LIST:
Expand Down Expand Up @@ -787,17 +795,7 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
topicsToClean.add(topic);

if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
for (String group : groups) {
final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) {
topicsToClean.add(popRetryTopicV2);
}
final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
topicsToClean.add(popRetryTopicV1);
}
}
collectPopRetryTopics(topic, topicsToClean);
}

try {
Expand All @@ -816,6 +814,85 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
return response;
}

private synchronized RemotingCommand deleteTopicList(ChannelHandlerContext ctx,
RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

DeleteTopicListRequestBody requestBody = DeleteTopicListRequestBody.decode(
request.getBody(), DeleteTopicListRequestBody.class);
List<String> topicList = requestBody == null ? null : requestBody.getTopicList();

if (CollectionUtils.isEmpty(topicList)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("The specified topic list is blank.");
return response;
}

LOGGER.info("AdminBrokerProcessor#deleteTopicList: broker receive request to delete topics={}, caller={}",
topicList, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

boolean validateSystemTopic = brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic();
// dedup while preserving the input order
Set<String> topicsToClean = new LinkedHashSet<>();
for (String topic : topicList) {
if (UtilAll.isBlank(topic)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("The specified topic is blank.");
return response;
}
if (validateSystemTopic && TopicValidator.isSystemTopic(topic)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("The topic[" + topic + "] is conflict with system topic.");
return response;
}
topicsToClean.add(topic);
}

if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
// snapshot the inputs before mutating the set, so retry topics for already-added retry topics are not collected
for (String topic : new ArrayList<>(topicsToClean)) {
collectPopRetryTopics(topic, topicsToClean);
}
}

try {
for (String topic : topicsToClean) {
if (LiteMetadataUtil.isLiteMessageType(topic, brokerController)) {
brokerController.getLiteLifecycleManager().cleanByParentTopic(topic);
}
}
for (String topic : topicsToClean) {
this.brokerController.getTopicQueueMappingManager().delete(topic);
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
}
this.brokerController.getMessageStore().deleteTopics(new HashSet<>(topicsToClean));
// delete topic configs last so that a retry after partial failure can still
// rediscover derived POP retry topics via collectPopRetryTopics → selectTopicConfig
this.brokerController.getTopicConfigManager().deleteTopicConfigList(new ArrayList<>(topicsToClean));
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

private void collectPopRetryTopics(String topic, Collection<String> topicsToClean) {
final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
for (String group : groups) {
final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) {
topicsToClean.add(popRetryTopicV2);
}
final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
topicsToClean.add(popRetryTopicV1);
}
}
}

private void deleteTopicInBroker(String topic) {
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getTopicQueueMappingManager().delete(topic);
Expand Down Expand Up @@ -1714,6 +1791,58 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
return response;
}

private RemotingCommand deleteSubscriptionGroupList(ChannelHandlerContext ctx,
RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

DeleteSubscriptionGroupListRequestBody requestBody = DeleteSubscriptionGroupListRequestBody.decode(
request.getBody(), DeleteSubscriptionGroupListRequestBody.class);
List<String> groupNameList = requestBody == null ? null : requestBody.getGroupNameList();

if (CollectionUtils.isEmpty(groupNameList)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("The specified group name list is blank.");
return response;
}

// dedup while preserving the input order
Set<String> groupNames = new LinkedHashSet<>();
for (String groupName : groupNameList) {
if (UtilAll.isBlank(groupName)) {
response.setCode(ResponseCode.INVALID_PARAMETER);
response.setRemark("The specified group name is blank.");
return response;
}
groupNames.add(groupName);
}

LOGGER.info("AdminBrokerProcessor#deleteSubscriptionGroupList: groupNames={}, caller={}",
groupNames, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

try {
// batch delete subscription group config in one persist
this.brokerController.getSubscriptionGroupManager()
.deleteSubscriptionGroupConfigList(new ArrayList<>(groupNames));

boolean cleanOffset = requestBody.isCleanOffset();
boolean autoDeleteUnusedStats = this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats();
for (String groupName : groupNames) {
if (cleanOffset || LiteMetadataUtil.isLiteGroupType(groupName, this.brokerController)) {
this.brokerController.getConsumerOffsetManager().removeOffset(groupName);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByGroupName(groupName);
}
if (autoDeleteUnusedStats) {
this.brokerController.getBrokerStatsManager().onGroupDeleted(groupName);
}
}
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,15 @@ public void deleteSubscriptionGroupConfig(final String groupName) {
}
}

public void deleteSubscriptionGroupConfigList(final List<String> groupNames) {
if (groupNames == null || groupNames.isEmpty()) {
return;
}
for (String groupName : groupNames) {
deleteSubscriptionGroupConfig(groupName);
}
}


public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
this.subscriptionGroupTable = subscriptionGroupTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,15 @@ public void deleteTopicConfig(final String topic) {
}
}

public void deleteTopicConfigList(final List<String> topics) {
if (topics == null || topics.isEmpty()) {
return;
}
for (String topic : topics) {
deleteTopicConfig(topic);
}
}

public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
Expand Down
Loading
Loading