From 9901ece865c93e40adb02a51d20d4bffec8b2af1 Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Tue, 14 Apr 2026 23:45:38 +0530 Subject: [PATCH 1/3] [server] Add authorization to databaseExists and tableExists RPC calls (#2007) Currently, the databaseExists and tableExists RPC methods do not enforce authorization, allowing any authenticated user to check if databases or tables exist, which poses a security risk. This commit adds authorization checks to both methods: - databaseExists: Checks DESCRIBE permission on the database - tableExists: Checks DESCRIBE permission on the table For security, both methods return false for unauthorized access, preventing information disclosure. Unauthorized users cannot distinguish between "resource doesn't exist" and "no permission", which prevents reconnaissance attacks to map database structure. Changes: - Modified RpcServiceBase.databaseExists() to catch authorization exceptions - Modified RpcServiceBase.tableExists() to catch authorization exceptions - Added testDatabaseExistsAuthorization() integration test - Added testTableExistsAuthorization() integration test Co-Authored-By: Claude Sonnet 4.5 --- .../acl/FlussAuthorizationITCase.java | 98 +++++++++++++++++++ .../apache/fluss/server/RpcServiceBase.java | 40 +++++++- 2 files changed, 134 insertions(+), 4 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index e606052efd..0ac000359d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1272,6 +1272,104 @@ void testDeleteProducerOffsetsAuthorization() throws Exception { guestAdmin.deleteProducerOffsets(producerId).get(); } + @Test + void testDatabaseExistsAuthorization() throws Exception { + String testDbName = "test_db_exists_auth"; + + // 1. Create database as root (super user can do everything) + rootAdmin.createDatabase(testDbName, DatabaseDescriptor.EMPTY, false).get(); + + // 2. Guest user WITHOUT DESCRIBE permission should get false (hiding existence) + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest without DESCRIBE permission should not see database exists") + .isFalse(); + + // 3. Non-existent database should also return false + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should return false") + .isFalse(); + + // 4. Root user (super user) should see database exists + assertThat(rootAdmin.databaseExists(testDbName).get()) + .as("Root user should always see database exists") + .isTrue(); + + // 5. Grant DESCRIBE permission to guest on the database + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.database(testDbName), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 6. Now guest WITH DESCRIBE permission should see database exists + assertThat(guestAdmin.databaseExists(testDbName).get()) + .as("Guest with DESCRIBE permission should see database exists") + .isTrue(); + + // 7. Verify non-existent database still returns false (not an authorization issue) + assertThat(guestAdmin.databaseExists("non_existent_db").get()) + .as("Non-existent database should still return false even with permissions") + .isFalse(); + + // Cleanup + rootAdmin.dropDatabase(testDbName, true, true).get(); + } + + @Test + void testTableExistsAuthorization() throws Exception { + TablePath testTablePath = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_table_exists_auth"); + + // 1. Create table as root + rootAdmin.createTable(testTablePath, DATA1_TABLE_DESCRIPTOR_PK, false).get(); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady( + rootAdmin.getTableInfo(testTablePath).get().getTableId()); + + // 2. Guest WITHOUT permission should get false + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest without permission should not see table exists") + .isFalse(); + + // 3. Root user should see table exists + assertThat(rootAdmin.tableExists(testTablePath).get()) + .as("Root user should see table exists") + .isTrue(); + + // 4. Grant DESCRIBE permission on the table + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.table(testTablePath), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + // 5. Guest WITH permission should now see table exists + assertThat(guestAdmin.tableExists(testTablePath).get()) + .as("Guest with DESCRIBE permission should see table exists") + .isTrue(); + + // 6. Verify non-existent table returns false + TablePath nonExistentTable = + TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "non_existent_table"); + assertThat(guestAdmin.tableExists(nonExistentTable).get()) + .as("Non-existent table should return false") + .isFalse(); + + // Cleanup + rootAdmin.dropTable(testTablePath, true).get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 20b0b71bef..92f6112e62 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -245,9 +245,25 @@ public CompletableFuture getDatabaseInfo( @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { - // By design: database exists not need to check database authorization. + String databaseName = request.getDatabaseName(); DatabaseExistsResponse response = new DatabaseExistsResponse(); - boolean exists = metadataManager.databaseExists(request.getDatabaseName()); + boolean exists = metadataManager.databaseExists(databaseName); + + // For security, return false for both non-existent and unauthorized databases + if (exists) { + try { + authorizeDatabase(OperationType.DESCRIBE, databaseName); + // If we get here, user is authorized + } catch (Exception e) { + // Not authorized - hide database existence + LOG.debug( + "User {} not authorized to access database '{}', returning false", + currentSession().getPrincipal(), + databaseName); + exists = false; + } + } + response.setExists(exists); return CompletableFuture.completedFuture(response); } @@ -308,9 +324,25 @@ public CompletableFuture getTableSchema(GetTableSchemaRe @Override public CompletableFuture tableExists(TableExistsRequest request) { - // By design: table exists not need to check table authorization. + TablePath tablePath = toTablePath(request.getTablePath()); TableExistsResponse response = new TableExistsResponse(); - boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath())); + boolean exists = metadataManager.tableExists(tablePath); + + // For security, return false for both non-existent and unauthorized tables + if (exists) { + try { + authorizeTable(OperationType.DESCRIBE, tablePath); + // If we get here, user is authorized + } catch (Exception e) { + // Not authorized - hide table existence + LOG.debug( + "User {} not authorized to access table '{}', returning false", + currentSession().getPrincipal(), + tablePath); + exists = false; + } + } + response.setExists(exists); return CompletableFuture.completedFuture(response); } From 43b39c2f479e60ee6bcfdc43bcf085d91ab36be4 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 5 May 2026 14:52:49 +0800 Subject: [PATCH 2/3] address comments --- .../apache/fluss/server/RpcServiceBase.java | 54 +++++++++---------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 92f6112e62..e03747df1b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -247,24 +247,22 @@ public CompletableFuture getDatabaseInfo( public CompletableFuture databaseExists(DatabaseExistsRequest request) { String databaseName = request.getDatabaseName(); DatabaseExistsResponse response = new DatabaseExistsResponse(); - boolean exists = metadataManager.databaseExists(databaseName); - // For security, return false for both non-existent and unauthorized databases - if (exists) { - try { - authorizeDatabase(OperationType.DESCRIBE, databaseName); - // If we get here, user is authorized - } catch (Exception e) { - // Not authorized - hide database existence - LOG.debug( - "User {} not authorized to access database '{}', returning false", - currentSession().getPrincipal(), - databaseName); - exists = false; - } + // Check authorization first for efficiency - avoids unnecessary metadata lookup + if (authorizer != null + && !authorizer.isAuthorized( + currentSession(), + OperationType.DESCRIBE, + Resource.database(databaseName))) { + LOG.debug( + "User {} not authorized to access database '{}', returning false", + currentSession().getPrincipal(), + databaseName); + response.setExists(false); + return CompletableFuture.completedFuture(response); } - response.setExists(exists); + response.setExists(metadataManager.databaseExists(databaseName)); return CompletableFuture.completedFuture(response); } @@ -326,24 +324,20 @@ public CompletableFuture getTableSchema(GetTableSchemaRe public CompletableFuture tableExists(TableExistsRequest request) { TablePath tablePath = toTablePath(request.getTablePath()); TableExistsResponse response = new TableExistsResponse(); - boolean exists = metadataManager.tableExists(tablePath); - // For security, return false for both non-existent and unauthorized tables - if (exists) { - try { - authorizeTable(OperationType.DESCRIBE, tablePath); - // If we get here, user is authorized - } catch (Exception e) { - // Not authorized - hide table existence - LOG.debug( - "User {} not authorized to access table '{}', returning false", - currentSession().getPrincipal(), - tablePath); - exists = false; - } + // Check authorization first for efficiency - avoids unnecessary metadata lookup + if (authorizer != null + && !authorizer.isAuthorized( + currentSession(), OperationType.DESCRIBE, Resource.table(tablePath))) { + LOG.debug( + "User {} not authorized to access table '{}', returning false", + currentSession().getPrincipal(), + tablePath); + response.setExists(false); + return CompletableFuture.completedFuture(response); } - response.setExists(exists); + response.setExists(metadataManager.tableExists(tablePath)); return CompletableFuture.completedFuture(response); } From 224289e732568b8bde066bfe733199136ad877b8 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Tue, 5 May 2026 18:03:03 +0800 Subject: [PATCH 3/3] fix tests --- .../src/main/java/org/apache/fluss/metadata/TablePath.java | 3 +++ .../main/java/org/apache/fluss/server/RpcServiceBase.java | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java index 23cd22aa73..eef9e2adea 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java @@ -39,6 +39,9 @@ public class TablePath implements Serializable { private static final long serialVersionUID = 1L; + // Default database name used when none is specified. + public static final String DEFAULT_DATABASE_NAME = "fluss"; + // database name and table name are used as local folder names in Fluss. The name of such // folders consists of "{database_name}/{table_name}-{table_id}/log-{bucket_id}/xxx.log". Since // a typical folder name can not be over 255 characters long, there will be a limitation on the diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index e03747df1b..dedef6ea8b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static org.apache.fluss.metadata.TablePath.DEFAULT_DATABASE_NAME; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec; import static org.apache.fluss.security.acl.Resource.TABLE_SPLITTER; @@ -249,7 +250,10 @@ public CompletableFuture databaseExists(DatabaseExistsRe DatabaseExistsResponse response = new DatabaseExistsResponse(); // Check authorization first for efficiency - avoids unnecessary metadata lookup - if (authorizer != null + // We skip authorization for the default database for backward compatibilities, as + // FlinkCatalog checks existence for the default database when open(). + if (!DEFAULT_DATABASE_NAME.equals(databaseName) + && authorizer != null && !authorizer.isAuthorized( currentSession(), OperationType.DESCRIBE,