diff --git a/auth/src/test/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerTest.java b/auth/src/test/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerTest.java index 72504f1cb34..fa5876b1871 100644 --- a/auth/src/test/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerTest.java +++ b/auth/src/test/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.auth.authorization.manager; import java.util.List; +import java.util.Arrays; import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory; import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager; @@ -157,19 +158,34 @@ public void deleteAcl() { "192.168.0.0/24,10.10.0.0/24", Decision.ALLOW); this.authorizationMetadataManager.createAcl(acl1).join(); - this.authorizationMetadataManager.deleteAcl(Subject.of("User:test"), PolicyType.CUSTOM, Resource.ofTopic("abc")).join(); + Acl defaultAcl = Acl.of(Subject.of("User:test"), Arrays.asList( + AuthTestHelper.buildPolicy(PolicyType.CUSTOM, "Topic:test,Group:test", "PUB,SUB", + "192.168.0.0/24,10.10.0.0/24", Decision.ALLOW), + AuthTestHelper.buildPolicy(PolicyType.DEFAULT, "Topic:defaultTopic", "PUB,SUB", + "192.168.0.0/24,10.10.0.0/24", Decision.ALLOW) + )); + this.authorizationMetadataManager.createAcl(AuthTestHelper.buildAcl("User:test", PolicyType.DEFAULT, + "Topic:defaultTopic", "PUB,SUB", "192.168.0.0/24,10.10.0.0/24", Decision.ALLOW)).join(); + Acl aclWithDefault = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); + Assert.assertTrue(AuthTestHelper.isEquals(defaultAcl, aclWithDefault)); + + this.authorizationMetadataManager.deleteAcl(Subject.of("User:test"), PolicyType.DEFAULT, Resource.ofTopic("defaultTopic")).join(); Acl acl2 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); Assert.assertTrue(AuthTestHelper.isEquals(acl1, acl2)); + this.authorizationMetadataManager.deleteAcl(Subject.of("User:test"), PolicyType.CUSTOM, Resource.ofTopic("abc")).join(); + Acl acl3 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); + Assert.assertTrue(AuthTestHelper.isEquals(acl1, acl3)); + this.authorizationMetadataManager.deleteAcl(Subject.of("User:test"), PolicyType.CUSTOM, Resource.ofTopic("test")).join(); - Acl acl3 = AuthTestHelper.buildAcl("User:test", "Group:test", "PUB,SUB", + Acl acl4 = AuthTestHelper.buildAcl("User:test", "Group:test", "PUB,SUB", "192.168.0.0/24,10.10.0.0/24", Decision.ALLOW); - Acl acl4 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); - Assert.assertTrue(AuthTestHelper.isEquals(acl3, acl4)); + Acl acl5 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); + Assert.assertTrue(AuthTestHelper.isEquals(acl4, acl5)); this.authorizationMetadataManager.deleteAcl(Subject.of("User:test")); - Acl acl5 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); - Assert.assertNull(acl5); + Acl acl6 = this.authorizationMetadataManager.getAcl(Subject.of("User:test")).join(); + Assert.assertNull(acl6); Assert.assertThrows(AuthorizationException.class, () -> { try { @@ -287,4 +303,4 @@ private void clearAllAcls() { } acls.forEach(acl -> this.authorizationMetadataManager.deleteAcl(acl.getSubject(), null, null).join()); } -} \ No newline at end of file +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index c342400d141..6c0a82f40a3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.auth.authentication.model.Subject; import org.apache.rocketmq.auth.authentication.model.User; import org.apache.rocketmq.auth.authorization.enums.Decision; +import org.apache.rocketmq.auth.authorization.enums.PolicyType; import org.apache.rocketmq.auth.authorization.manager.AuthorizationMetadataManager; import org.apache.rocketmq.auth.authorization.model.Acl; import org.apache.rocketmq.auth.authorization.model.Environment; @@ -141,6 +142,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @@ -1186,6 +1188,31 @@ public void testDeleteAcl() throws RemotingCommandException { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testDeleteAclWithPolicyType() throws RemotingCommandException { + when(authorizationMetadataManager.deleteAcl(any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + DeleteAclRequestHeader deleteAclRequestHeader = new DeleteAclRequestHeader(); + deleteAclRequestHeader.setSubject("User:abc"); + deleteAclRequestHeader.setPolicyType("Default"); + deleteAclRequestHeader.setResource("Topic:test"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.AUTH_DELETE_ACL, deleteAclRequestHeader); + request.setVersion(441); + request.addExtField("AccessKey", "rocketmq"); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + ArgumentCaptor subjectCaptor = ArgumentCaptor.forClass(Subject.class); + ArgumentCaptor policyTypeCaptor = ArgumentCaptor.forClass(PolicyType.class); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + verify(authorizationMetadataManager).deleteAcl(subjectCaptor.capture(), policyTypeCaptor.capture(), resourceCaptor.capture()); + assertThat(subjectCaptor.getValue().getSubjectKey()).isEqualTo("User:abc"); + assertThat(policyTypeCaptor.getValue()).isEqualTo(PolicyType.DEFAULT); + assertThat(resourceCaptor.getValue()).isEqualTo(Resource.of("Topic:test")); + } + @Test public void testGetAcl() throws RemotingCommandException { Acl aclInfo = Acl.of(User.of("abc"), Arrays.asList(Resource.of("Topic:*")), Arrays.asList(Action.PUB), Environment.of("192.168.0.1"), Decision.ALLOW); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 8294ffd422f..d9e8156f0bc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -3656,7 +3656,11 @@ public void updateAcl(String addr, AclInfo aclInfo, long millis) throws Remoting } public void deleteAcl(String addr, String subject, String resource, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { - DeleteAclRequestHeader requestHeader = new DeleteAclRequestHeader(subject, resource); + this.deleteAcl(addr, subject, null, resource, millis); + } + + public void deleteAcl(String addr, String subject, String policyType, String resource, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + DeleteAclRequestHeader requestHeader = new DeleteAclRequestHeader(subject, policyType, resource); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.AUTH_DELETE_ACL, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis); assert response != null; diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 27b3d685715..7750565a285 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -1900,6 +1900,22 @@ public void testDeleteAcl() throws RemotingException, InterruptedException, MQBr mqClientAPI.deleteAcl(defaultBrokerAddr, "", "", defaultTimeout); } + @Test + public void testDeleteAclWithPolicyType() throws RemotingException, InterruptedException, MQBrokerException { + when(response.getCode()).thenReturn(ResponseCode.SUCCESS); + doAnswer(invocation -> { + RemotingCommand request = invocation.getArgument(1); + request.makeCustomHeaderToNet(); + assertThat(request.getExtFields()).isNotNull(); + assertThat(request.getExtFields().get("subject")).isEqualTo("User:abc"); + assertThat(request.getExtFields().get("policyType")).isEqualTo("Default"); + assertThat(request.getExtFields().get("resource")).isEqualTo("Topic:test"); + return response; + }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong()); + + mqClientAPI.deleteAcl(defaultBrokerAddr, "User:abc", "Default", "Topic:test", defaultTimeout); + } + @Test public void assertGetAcl() throws RemotingException, InterruptedException, MQBrokerException { mockInvokeSync(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteAclRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteAclRequestHeader.java index a1f06a2b8de..bddb01b39f0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteAclRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteAclRequestHeader.java @@ -36,7 +36,12 @@ public DeleteAclRequestHeader() { } public DeleteAclRequestHeader(String subject, String resource) { + this(subject, null, resource); + } + + public DeleteAclRequestHeader(String subject, String policyType, String resource) { this.subject = subject; + this.policyType = policyType; this.resource = resource; } diff --git a/tools/BUILD.bazel b/tools/BUILD.bazel index 023b04757c8..65d19c27fa2 100644 --- a/tools/BUILD.bazel +++ b/tools/BUILD.bazel @@ -21,6 +21,7 @@ java_library( srcs = glob(["src/main/java/**/*.java"]), visibility = ["//visibility:public"], deps = [ + "//auth", "//remoting", "//client", "//common", diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index d29ffad2540..49e0d26ed36 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -978,6 +978,12 @@ public void deleteAcl(String brokerAddr, String subject, defaultMQAdminExtImpl.deleteAcl(brokerAddr, subject, resource); } + @Override + public void deleteAcl(String brokerAddr, String subject, String policyType, String resource) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + defaultMQAdminExtImpl.deleteAcl(brokerAddr, subject, policyType, resource); + } + @Override public AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index d96b4b03bcc..ec45d9ea320 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -2039,7 +2039,13 @@ public void updateAcl(String brokerAddr, @Override public void deleteAcl(String brokerAddr, String subject, String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { - this.mqClientInstance.getMQClientAPIImpl().deleteAcl(brokerAddr, subject, resource, timeoutMillis); + this.deleteAcl(brokerAddr, subject, null, resource); + } + + @Override + public void deleteAcl(String brokerAddr, String subject, String policyType, String resource) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + this.mqClientInstance.getMQClientAPIImpl().deleteAcl(brokerAddr, subject, policyType, resource, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 980ff5acdb4..4f246eb91eb 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -514,6 +514,8 @@ String setCommitLogReadAheadMode(final String brokerAddr, String mode) void deleteAcl(String brokerAddr, String subject, String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; + void deleteAcl(String brokerAddr, String subject, String policyType, String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; + AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/auth/DeleteAclSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/auth/DeleteAclSubCommand.java index a3553e0cb39..c3792a32539 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/auth/DeleteAclSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/auth/DeleteAclSubCommand.java @@ -22,6 +22,7 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.auth.authorization.enums.PolicyType; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -62,6 +63,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("t", "policyType", true, "the policyType of acl to delete"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -82,11 +87,16 @@ public void execute(CommandLine commandLine, Options options, resource = StringUtils.trim(commandLine.getOptionValue("r")); } + String policyType = null; + if (commandLine.hasOption('t')) { + policyType = normalizePolicyType(StringUtils.trim(commandLine.getOptionValue("t"))); + } + if (commandLine.hasOption('b')) { String addr = StringUtils.trim(commandLine.getOptionValue('b')); defaultMQAdminExt.start(); - defaultMQAdminExt.deleteAcl(addr, subject, resource); + defaultMQAdminExt.deleteAcl(addr, subject, policyType, resource); System.out.printf("delete acl to %s success.%n", addr); return; @@ -97,7 +107,7 @@ public void execute(CommandLine commandLine, Options options, Set brokerAddrSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : brokerAddrSet) { - defaultMQAdminExt.deleteAcl(addr, subject, resource); + defaultMQAdminExt.deleteAcl(addr, subject, policyType, resource); System.out.printf("delete acl to %s success.%n", addr); } return; @@ -110,4 +120,15 @@ public void execute(CommandLine commandLine, Options options, defaultMQAdminExt.shutdown(); } } + + private String normalizePolicyType(String policyType) { + if (StringUtils.isBlank(policyType)) { + return null; + } + PolicyType parsedPolicyType = PolicyType.getByName(policyType); + if (parsedPolicyType == null) { + throw new IllegalArgumentException("Invalid policyType: " + policyType); + } + return parsedPolicyType.getName(); + } }