Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,104 @@ void testDeleteProducerOffsetsAuthorization() throws Exception {
guestAdmin.deleteProducerOffsets(producerId).get();
}

@Test
void testDatabaseExistsAuthorization() throws Exception {
String testDbName = "test_db_exists_auth";

// 1. Create database as root (super user can do everything)
rootAdmin.createDatabase(testDbName, DatabaseDescriptor.EMPTY, false).get();

// 2. Guest user WITHOUT DESCRIBE permission should get false (hiding existence)
assertThat(guestAdmin.databaseExists(testDbName).get())
.as("Guest without DESCRIBE permission should not see database exists")
.isFalse();

// 3. Non-existent database should also return false
assertThat(guestAdmin.databaseExists("non_existent_db").get())
.as("Non-existent database should return false")
.isFalse();

// 4. Root user (super user) should see database exists
assertThat(rootAdmin.databaseExists(testDbName).get())
.as("Root user should always see database exists")
.isTrue();

// 5. Grant DESCRIBE permission to guest on the database
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.database(testDbName),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

// 6. Now guest WITH DESCRIBE permission should see database exists
assertThat(guestAdmin.databaseExists(testDbName).get())
.as("Guest with DESCRIBE permission should see database exists")
.isTrue();

// 7. Verify non-existent database still returns false (not an authorization issue)
assertThat(guestAdmin.databaseExists("non_existent_db").get())
.as("Non-existent database should still return false even with permissions")
.isFalse();

// Cleanup
rootAdmin.dropDatabase(testDbName, true, true).get();
}

@Test
void testTableExistsAuthorization() throws Exception {
TablePath testTablePath =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_table_exists_auth");

// 1. Create table as root
rootAdmin.createTable(testTablePath, DATA1_TABLE_DESCRIPTOR_PK, false).get();
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(
rootAdmin.getTableInfo(testTablePath).get().getTableId());

// 2. Guest WITHOUT permission should get false
assertThat(guestAdmin.tableExists(testTablePath).get())
.as("Guest without permission should not see table exists")
.isFalse();

// 3. Root user should see table exists
assertThat(rootAdmin.tableExists(testTablePath).get())
.as("Root user should see table exists")
.isTrue();

// 4. Grant DESCRIBE permission on the table
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.table(testTablePath),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.DESCRIBE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

// 5. Guest WITH permission should now see table exists
assertThat(guestAdmin.tableExists(testTablePath).get())
.as("Guest with DESCRIBE permission should see table exists")
.isTrue();

// 6. Verify non-existent table returns false
TablePath nonExistentTable =
TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "non_existent_table");
assertThat(guestAdmin.tableExists(nonExistentTable).get())
.as("Non-existent table should return false")
.isFalse();

// Cleanup
rootAdmin.dropTable(testTablePath, true).get();
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
public class TablePath implements Serializable {
private static final long serialVersionUID = 1L;

// Default database name used when none is specified.
public static final String DEFAULT_DATABASE_NAME = "fluss";

// database name and table name are used as local folder names in Fluss. The name of such
// folders consists of "{database_name}/{table_name}-{table_id}/log-{bucket_id}/xxx.log". Since
// a typical folder name can not be over 255 characters long, there will be a limitation on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static org.apache.fluss.metadata.TablePath.DEFAULT_DATABASE_NAME;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
import static org.apache.fluss.security.acl.Resource.TABLE_SPLITTER;
Expand Down Expand Up @@ -245,10 +246,27 @@ public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(

@Override
public CompletableFuture<DatabaseExistsResponse> databaseExists(DatabaseExistsRequest request) {
// By design: database exists not need to check database authorization.
String databaseName = request.getDatabaseName();
DatabaseExistsResponse response = new DatabaseExistsResponse();
boolean exists = metadataManager.databaseExists(request.getDatabaseName());
response.setExists(exists);

// Check authorization first for efficiency - avoids unnecessary metadata lookup
// We skip authorization for the default database for backward compatibilities, as
// FlinkCatalog checks existence for the default database when open().
if (!DEFAULT_DATABASE_NAME.equals(databaseName)
&& authorizer != null
&& !authorizer.isAuthorized(
currentSession(),
OperationType.DESCRIBE,
Resource.database(databaseName))) {
LOG.debug(
"User {} not authorized to access database '{}', returning false",
currentSession().getPrincipal(),
databaseName);
response.setExists(false);
return CompletableFuture.completedFuture(response);
}

response.setExists(metadataManager.databaseExists(databaseName));
return CompletableFuture.completedFuture(response);
}

Expand Down Expand Up @@ -308,10 +326,22 @@ public CompletableFuture<GetTableSchemaResponse> getTableSchema(GetTableSchemaRe

@Override
public CompletableFuture<TableExistsResponse> tableExists(TableExistsRequest request) {
// By design: table exists not need to check table authorization.
TablePath tablePath = toTablePath(request.getTablePath());
TableExistsResponse response = new TableExistsResponse();
boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath()));
response.setExists(exists);

// Check authorization first for efficiency - avoids unnecessary metadata lookup
if (authorizer != null
&& !authorizer.isAuthorized(
currentSession(), OperationType.DESCRIBE, Resource.table(tablePath))) {
LOG.debug(
"User {} not authorized to access table '{}', returning false",
currentSession().getPrincipal(),
tablePath);
response.setExists(false);
return CompletableFuture.completedFuture(response);
}

response.setExists(metadataManager.tableExists(tablePath));
return CompletableFuture.completedFuture(response);
}

Expand Down