diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index e606052efd..0ac000359d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1272,6 +1272,104 @@ void testDeleteProducerOffsetsAuthorization() throws Exception { guestAdmin.deleteProducerOffsets(producerId).get(); } + @Test + void testDatabaseExistsAuthorization() throws Exception { + String testDbName = "test_db_exists_auth"; + + // 1. Create database as root (super user can do everything) + rootAdmin.createDatabase(testDbName, DatabaseDescriptor.EMPTY, false).get(); + + // 2. Guest user WITHOUT DESCRIBE permission should get false (hiding existence) + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest without DESCRIBE permission should not see database exists") + .isFalse(); + + // 3. Non-existent database should also return false + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should return false") + .isFalse(); + + // 4. Root user (super user) should see database exists + assertThat(rootAdmin.databaseExists(testDbName).get()) + .as("Root user should always see database exists") + .isTrue(); + + // 5. Grant DESCRIBE permission to guest on the database + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.database(testDbName), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 6. Now guest WITH DESCRIBE permission should see database exists + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest with DESCRIBE permission should see database exists") + .isTrue(); + + // 7. Verify non-existent database still returns false (not an authorization issue) + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should still return false even with permissions") + .isFalse(); + + // Cleanup + rootAdmin.dropDatabase(testDbName, true, true).get(); + } + + @Test + void testTableExistsAuthorization() throws Exception { + TablePath testTablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_table_exists_auth"); + + // 1. Create table as root + rootAdmin.createTable(testTablePath, DATA1_TABLE_DESCRIPTOR_PK, false).get(); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady( + rootAdmin.getTableInfo(testTablePath).get().getTableId()); + + // 2. Guest WITHOUT permission should get false + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest without permission should not see table exists") + .isFalse(); + + // 3. Root user should see table exists + assertThat(rootAdmin.tableExists(testTablePath).get()) + .as("Root user should see table exists") + .isTrue(); + + // 4. Grant DESCRIBE permission on the table + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.table(testTablePath), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 5. Guest WITH permission should now see table exists + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest with DESCRIBE permission should see table exists") + .isTrue(); + + // 6. Verify non-existent table returns false + TablePath nonExistentTable = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "non_existent_table"); + assertThat(guestAdmin.tableExists(nonExistentTable).get()) + .as("Non-existent table should return false") + .isFalse(); + + // Cleanup + rootAdmin.dropTable(testTablePath, true).get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java index 23cd22aa73..eef9e2adea 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java @@ -39,6 +39,9 @@ public class TablePath implements Serializable { private static final long serialVersionUID = 1L; + // Default database name used when none is specified. + public static final String DEFAULT_DATABASE_NAME = "fluss"; + // database name and table name are used as local folder names in Fluss. The name of such // folders consists of "{database_name}/{table_name}-{table_id}/log-{bucket_id}/xxx.log". Since // a typical folder name can not be over 255 characters long, there will be a limitation on the diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 20b0b71bef..dedef6ea8b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static org.apache.fluss.metadata.TablePath.DEFAULT_DATABASE_NAME; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec; import static org.apache.fluss.security.acl.Resource.TABLE_SPLITTER; @@ -245,10 +246,27 @@ public CompletableFuture getDatabaseInfo( @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { - // By design: database exists not need to check database authorization. + String databaseName = request.getDatabaseName(); DatabaseExistsResponse response = new DatabaseExistsResponse(); - boolean exists = metadataManager.databaseExists(request.getDatabaseName()); - response.setExists(exists); + + // Check authorization first for efficiency - avoids unnecessary metadata lookup + // We skip authorization for the default database for backward compatibilities, as + // FlinkCatalog checks existence for the default database when open(). + if (!DEFAULT_DATABASE_NAME.equals(databaseName) + && authorizer != null + && !authorizer.isAuthorized( + currentSession(), + OperationType.DESCRIBE, + Resource.database(databaseName))) { + LOG.debug( + "User {} not authorized to access database '{}', returning false", + currentSession().getPrincipal(), + databaseName); + response.setExists(false); + return CompletableFuture.completedFuture(response); + } + + response.setExists(metadataManager.databaseExists(databaseName)); return CompletableFuture.completedFuture(response); } @@ -308,10 +326,22 @@ public CompletableFuture getTableSchema(GetTableSchemaRe @Override public CompletableFuture tableExists(TableExistsRequest request) { - // By design: table exists not need to check table authorization. + TablePath tablePath = toTablePath(request.getTablePath()); TableExistsResponse response = new TableExistsResponse(); - boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath())); - response.setExists(exists); + + // Check authorization first for efficiency - avoids unnecessary metadata lookup + if (authorizer != null + && !authorizer.isAuthorized( + currentSession(), OperationType.DESCRIBE, Resource.table(tablePath))) { + LOG.debug( + "User {} not authorized to access table '{}', returning false", + currentSession().getPrincipal(), + tablePath); + response.setExists(false); + return CompletableFuture.completedFuture(response); + } + + response.setExists(metadataManager.tableExists(tablePath)); return CompletableFuture.completedFuture(response); }