diff --git a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java index e5eaa41c3e..95f3256048 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java @@ -34,6 +34,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.GatewayClientProxy; @@ -100,10 +101,14 @@ public Admin getAdmin() { @Override public Table getTable(TablePath tablePath) { - // force to update the table info from server to avoid stale data in cache. - metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); Admin admin = getOrCreateAdmin(); - return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join()); + TableInfo tableInfo = admin.getTableInfo(tablePath).join(); + if (!tableInfo.isSystemView()) { + // System views have no buckets/partitions, skip TabletServer metadata update. + // Regular tables need fresh bucket leader info. 
+ metadataUpdater.updateTableOrPartitionMetadata(tablePath, null); + } + return new FlussTable(this, tablePath, tableInfo); } public MetadataUpdater getMetadataUpdater() { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index a1d429c99a..9bf6dc3e23 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -41,6 +41,7 @@ import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaInfo; +import org.apache.fluss.metadata.SystemTableConstants; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; @@ -315,7 +316,10 @@ public CompletableFuture getTableInfo(TablePath tablePath) { // clusters do not include the remote data dir r.hasRemoteDataDir() ? r.getRemoteDataDir() : null, r.getCreatedTime(), - r.getModifiedTime())); + r.getModifiedTime(), + r.hasTableKind() + ? 
r.getTableKind() + : SystemTableConstants.TABLE_KIND_TABLE)); } @Override diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java index 8532f2a856..eafc6f14ca 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java @@ -23,6 +23,7 @@ import org.apache.fluss.client.lookup.TableLookup; import org.apache.fluss.client.metadata.ClientSchemaGetter; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.SystemViewScan; import org.apache.fluss.client.table.scanner.TableScan; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.TableAppend; @@ -64,17 +65,22 @@ public TableInfo getTableInfo() { @Override public Scan newScan() { + if (tableInfo.isSystemView()) { + return new SystemViewScan(tableInfo, conn); + } return new TableScan(conn, tableInfo, schemaGetter); } @Override public Lookup newLookup() { + checkState(!tableInfo.isSystemView(), "System view %s doesn't support lookups.", tablePath); return new TableLookup( tableInfo, schemaGetter, conn.getMetadataUpdater(), conn.getOrCreateLookupClient()); } @Override public Append newAppend() { + checkState(!tableInfo.isSystemView(), "System view %s doesn't support writes.", tablePath); checkState( !hasPrimaryKey, "Table %s is not a Log Table and doesn't support AppendWriter.", @@ -84,6 +90,7 @@ public Append newAppend() { @Override public Upsert newUpsert() { + checkState(!tableInfo.isSystemView(), "System view %s doesn't support writes.", tablePath); checkState( hasPrimaryKey, "Table %s is not a Primary Key Table and doesn't support UpsertWriter.", diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SystemViewScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SystemViewScan.java new 
file mode 100644 index 0000000000..2df9676a29 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SystemViewScan.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.client.FlussConnection; +import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.client.table.scanner.batch.SystemViewBatchScanner; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.TypedLogScanner; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * A {@link Scan} implementation for system views. + * + *

System views are virtual tables with no independent storage, no buckets, and no partitions. + * They only support bounded batch scans via {@link #createBatchScanner()}. Streaming scans and + * bucket-based batch scans are not supported. + */ +public class SystemViewScan implements Scan { + + private final TableInfo tableInfo; + private final FlussConnection conn; + @Nullable private final int[] projectedColumns; + @Nullable private final Predicate filterPredicate; + + public SystemViewScan(TableInfo tableInfo, FlussConnection conn) { + this(tableInfo, conn, null, null); + } + + private SystemViewScan( + TableInfo tableInfo, + FlussConnection conn, + @Nullable int[] projectedColumns, + @Nullable Predicate filterPredicate) { + this.tableInfo = tableInfo; + this.conn = conn; + this.projectedColumns = projectedColumns; + this.filterPredicate = filterPredicate; + } + + @Override + public Scan project(@Nullable int[] projectedColumns) { + return new SystemViewScan(tableInfo, conn, projectedColumns, filterPredicate); + } + + @Override + public Scan project(List projectedColumnNames) { + int[] columnIndexes = new int[projectedColumnNames.size()]; + RowType rowType = tableInfo.getRowType(); + for (int i = 0; i < projectedColumnNames.size(); i++) { + int index = rowType.getFieldIndex(projectedColumnNames.get(i)); + if (index < 0) { + throw new IllegalArgumentException( + String.format( + "Field '%s' not found in system view schema. " + + "Available fields: %s, View: %s", + projectedColumnNames.get(i), + rowType.getFieldNames(), + tableInfo.getTablePath())); + } + columnIndexes[i] = index; + } + return new SystemViewScan(tableInfo, conn, columnIndexes, filterPredicate); + } + + @Override + public Scan limit(int rowNumber) { + throw new UnsupportedOperationException( + "System views do not support limit pushdown. 
View: " + tableInfo.getTablePath()); + } + + @Override + public Scan filter(@Nullable Predicate predicate) { + return new SystemViewScan(tableInfo, conn, projectedColumns, predicate); + } + + @Override + public LogScanner createLogScanner() { + throw new UnsupportedOperationException( + "System views do not support streaming log scan. " + + "Use createBatchScanner() instead. View: " + + tableInfo.getTablePath()); + } + + @Override + public TypedLogScanner createTypedLogScanner(Class pojoClass) { + throw new UnsupportedOperationException( + "System views do not support streaming log scan. " + + "Use createBatchScanner() instead. View: " + + tableInfo.getTablePath()); + } + + @Override + public BatchScanner createBatchScanner(TableBucket tableBucket) { + throw new UnsupportedOperationException( + "System views do not support bucket-based batch scan. " + + "Use createBatchScanner() (no args) instead. View: " + + tableInfo.getTablePath()); + } + + @Override + public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) { + throw new UnsupportedOperationException( + "System views do not support snapshot batch scan. " + + "Use createBatchScanner() (no args) instead. View: " + + tableInfo.getTablePath()); + } + + @Override + public BatchScanner createBatchScanner() { + return new SystemViewBatchScanner( + tableInfo, conn.getMetadataUpdater(), projectedColumns, filterPredicate); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScanner.java new file mode 100644 index 0000000000..8b054fec34 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScanner.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordBatch; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.RowDecoder; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; +import org.apache.fluss.rpc.util.PredicateMessageUtils; +import org.apache.fluss.types.DataField; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + 
* A {@link BatchScanner} that reads data from a system view by sending a {@code + * ScanSystemViewRequest} to the coordinator server. + * + *

System views have no buckets or partitions — the coordinator returns all matching rows in a + * single response. This scanner is always bounded (one-shot request/response). + */ +public class SystemViewBatchScanner implements BatchScanner { + + private final CompletableFuture scanFuture; + private final RowType responseRowType; + private boolean endOfInput; + + public SystemViewBatchScanner( + TableInfo tableInfo, + MetadataUpdater metadataUpdater, + @Nullable int[] projectedFields, + @Nullable Predicate filterPredicate) { + this.endOfInput = false; + + TablePath tablePath = tableInfo.getTablePath(); + RowType rowType = tableInfo.getRowType(); + + // Build RowType with sequential field IDs for predicate serialization + List fieldsWithIds = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField f = rowType.getFields().get(i); + fieldsWithIds.add(new DataField(f.getName(), f.getType(), i)); + } + RowType fullRowType = new RowType(rowType.isNullable(), fieldsWithIds); + + // Determine the row type of the response (projected if applicable) + this.responseRowType = + projectedFields != null ? 
fullRowType.project(projectedFields) : fullRowType; + + ScanSystemViewRequest request = new ScanSystemViewRequest(); + request.setDatabaseName(tablePath.getDatabaseName()); + request.setViewName(tablePath.getTableName()); + request.setSchemaId(String.valueOf(tableInfo.getSchemaId())); + + if (projectedFields != null) { + request.setProjectedFields(projectedFields); + } + + if (filterPredicate != null) { + request.setFilterPredicate( + PredicateMessageUtils.toPbPredicate(filterPredicate, fullRowType)); + } + + CoordinatorGateway coordinatorGateway = metadataUpdater.newCoordinatorServerClient(); + this.scanFuture = coordinatorGateway.scanSystemView(request); + } + + @Nullable + @Override + public CloseableIterator pollBatch(Duration timeout) throws IOException { + if (endOfInput) { + return null; + } + try { + ScanSystemViewResponse response = + scanFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (response.hasErrorCode()) { + throw new FlussRuntimeException( + "Failed to scan system view: " + response.getErrorMessage()); + } + List rows = decodeResponse(response); + endOfInput = true; + return CloseableIterator.wrap(rows.iterator()); + } catch (TimeoutException e) { + return CloseableIterator.emptyIterator(); + } catch (FlussRuntimeException e) { + throw new IOException(e); + } catch (Exception e) { + throw new IOException(e); + } + } + + private List decodeResponse(ScanSystemViewResponse response) { + if (!response.hasRecords()) { + return new ArrayList<>(); + } + + byte[] recordBytes = response.getRecords(); + DataType[] fieldTypes = responseRowType.getFieldTypes().toArray(new DataType[0]); + RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, fieldTypes); + ValueRecordBatch.ReadContext readContext = schemaId -> decoder; + + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(recordBytes); + + List rows = new ArrayList<>(); + for (ValueRecord record : batch.records(readContext)) { + InternalRow row = record.getRow(); + if (row != 
null) { + rows.add(row); + } + } + return rows; + } + + @Override + public void close() throws IOException { + scanFuture.cancel(true); + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 4b1dc28ac7..79e5e35a81 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -977,7 +977,7 @@ void testListDatabasesAndTables() throws Exception { admin.createDatabase("db2", DatabaseDescriptor.EMPTY, true).get(); admin.createDatabase("db3", DatabaseDescriptor.EMPTY, true).get(); assertThat(admin.listDatabases().get()) - .containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss"); + .containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss", "sys"); Map databaseSummaries = admin.listDatabaseSummaries().get().stream() .collect( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index e606052efd..8aaea1afce 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -91,6 +91,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; @@ -348,7 +349,8 @@ void testListDatabases() throws ExecutionException, InterruptedException { 
.containsExactlyInAnyOrderElementsOf(Collections.emptyList()); assertThat(rootAdmin.listDatabases().get()) .containsExactlyInAnyOrderElementsOf( - Lists.newArrayList("fluss", DATA1_TABLE_PATH_PK.getDatabaseName())); + Lists.newArrayList( + "fluss", DATA1_TABLE_PATH_PK.getDatabaseName(), SYSTEM_DATABASE)); List aclBindings = Collections.singletonList( @@ -361,7 +363,7 @@ void testListDatabases() throws ExecutionException, InterruptedException { PermissionType.ALLOW))); rootAdmin.createAcls(aclBindings).all().get(); FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); - assertThat(guestAdmin.listDatabases().get()).isEqualTo(Collections.singletonList("fluss")); + assertThat(guestAdmin.listDatabases().get()).containsExactlyInAnyOrder("fluss"); aclBindings = Collections.singletonList( @@ -376,7 +378,8 @@ void testListDatabases() throws ExecutionException, InterruptedException { FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); assertThat(guestAdmin.listDatabases().get()) .containsExactlyInAnyOrderElementsOf( - Lists.newArrayList("fluss", DATA1_TABLE_PATH_PK.getDatabaseName())); + Lists.newArrayList( + "fluss", DATA1_TABLE_PATH_PK.getDatabaseName(), SYSTEM_DATABASE)); } @Test diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScannerITCase.java new file mode 100644 index 0000000000..2e1a79476f --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/SystemViewBatchScannerITCase.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.metadata.SystemTableConstants; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.DataField; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for reading system views via the client scanner API. 
*/ +class SystemViewBatchScannerITCase { + + private static final int NUM_TABLET_SERVERS = 3; + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER = + FlussClusterExtension.builder().setNumOfTabletServers(NUM_TABLET_SERVERS).build(); + + @Test + void testScanSystemViewReturnsServers() throws Exception { + TablePath serversPath = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, + SystemTableConstants.TABLET_SERVERS_VIEW); + + try (Connection connection = + ConnectionFactory.createConnection(FLUSS_CLUSTER.getClientConfig()); + Table table = connection.getTable(serversPath); + BatchScanner scanner = table.newScan().createBatchScanner()) { + TableInfo tableInfo = table.getTableInfo(); + assertThat(tableInfo.isSystemView()).isTrue(); + + RowType rowType = tableInfo.getRowType(); + List fields = rowType.getFields(); + // Schema should have at least one column + assertThat(fields).isNotEmpty(); + + List rows = BatchScanUtils.collectRows(scanner); + assertThat(rows).hasSize(NUM_TABLET_SERVERS); + } + } + + @Test + void testScanSystemViewWithProjection() throws Exception { + TablePath serversPath = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, + SystemTableConstants.TABLET_SERVERS_VIEW); + + try (Connection connection = + ConnectionFactory.createConnection(FLUSS_CLUSTER.getClientConfig()); + Table table = connection.getTable(serversPath)) { + RowType rowType = table.getTableInfo().getRowType(); + int fieldCount = rowType.getFieldCount(); + assertThat(fieldCount).isGreaterThanOrEqualTo(2); + + // Project the first and last columns + int[] projection = new int[] {0, fieldCount - 1}; + + try (BatchScanner scanner = table.newScan().project(projection).createBatchScanner()) { + List rows = BatchScanUtils.collectRows(scanner); + assertThat(rows).hasSize(NUM_TABLET_SERVERS); + + // Verify each row has exactly 2 fields (projected) + // and all rows are distinct (different server data) + for (InternalRow row : rows) { + 
assertThat(row.isNullAt(0)).isFalse(); + } + + // Verify rows are not all identical (projection bug would cause this) + assertThat(rows.stream().map(r -> r.toString()).distinct().count()) + .as("Projected rows should not all be identical") + .isEqualTo(NUM_TABLET_SERVERS); + } + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/SystemTableConstants.java b/fluss-common/src/main/java/org/apache/fluss/metadata/SystemTableConstants.java new file mode 100644 index 0000000000..8e8d0ebc72 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/SystemTableConstants.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metadata; + +import org.apache.fluss.annotation.Internal; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Constants and utility methods for system tables and system views. + * + *

System tables are managed by the system, reflecting cluster internal state and information. + * They all reside under the fixed {@code sys} database. Currently only system views are supported: + * + *

+ * + *

Table names use plain identifiers without any prefix. + */ +@Internal +public final class SystemTableConstants { + + /** The name of the system database that contains all system tables and views. */ + public static final String SYSTEM_DATABASE = "sys"; + + /** Name of the tablet servers system view. */ + public static final String TABLET_SERVERS_VIEW = "tablet_servers"; + + /** Name of the table buckets system view. */ + public static final String TABLE_BUCKETS_VIEW = "table_buckets"; + + /** All known system view names. */ + private static final Set SYSTEM_VIEW_NAMES = + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(TABLET_SERVERS_VIEW, TABLE_BUCKETS_VIEW))); + + // ---- table_kind values used in GetTableInfoResponse ---- + + /** Table kind constant for regular user tables. */ + public static final int TABLE_KIND_TABLE = 0; + + /** Table kind constant for system views (virtual, no independent storage). */ + public static final int TABLE_KIND_SYSTEM_VIEW = 1; + + private SystemTableConstants() {} + + /** Returns {@code true} if the given database name is the system database. */ + public static boolean isSystemDatabase(String databaseName) { + return SYSTEM_DATABASE.equals(databaseName); + } + + /** + * Returns {@code true} if the given table is a known system table or view in the system + * database. + * + *

This checks both that the database is {@code sys} and that the table name is a recognized + * system view. + */ + public static boolean isSystemTableName(String databaseName, String tableName) { + if (databaseName == null || tableName == null) { + return false; + } + return isSystemView(databaseName, tableName); + } + + /** + * Returns {@code true} if the given table path is a known system table or view in the system + * database. + */ + public static boolean isSystemTablePath(TablePath tablePath) { + if (tablePath == null) { + return false; + } + return isSystemTableName(tablePath.getDatabaseName(), tablePath.getTableName()); + } + + /** + * Returns {@code true} if the given table in the given database is a known system view. System + * views are virtual tables that do not have independent storage. + */ + public static boolean isSystemView(String databaseName, String tableName) { + if (databaseName == null || tableName == null) { + return false; + } + return isSystemDatabase(databaseName) && SYSTEM_VIEW_NAMES.contains(tableName); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index 00f58b81f0..a900ff8f22 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -68,6 +68,7 @@ public final class TableInfo { private final long createdTime; private final long modifiedTime; + private final int tableKind; private int[] cachedStatsIndexMapping = null; @@ -85,6 +86,38 @@ public TableInfo( @Nullable String comment, long createdTime, long modifiedTime) { + this( + tablePath, + tableId, + schemaId, + schema, + bucketKeys, + partitionKeys, + numBuckets, + properties, + customProperties, + remoteDataDir, + comment, + createdTime, + modifiedTime, + SystemTableConstants.TABLE_KIND_TABLE); + } + + public TableInfo( + TablePath tablePath, + long tableId, + int schemaId, + 
Schema schema, + List bucketKeys, + List partitionKeys, + int numBuckets, + Configuration properties, + Configuration customProperties, + @Nullable String remoteDataDir, + @Nullable String comment, + long createdTime, + long modifiedTime, + int tableKind) { this.tablePath = tablePath; this.tableId = tableId; this.schemaId = schemaId; @@ -102,6 +135,7 @@ public TableInfo( this.comment = comment; this.createdTime = createdTime; this.modifiedTime = modifiedTime; + this.tableKind = tableKind; } /** @@ -392,6 +426,19 @@ public long getModifiedTime() { return modifiedTime; } + /** + * Returns the table kind. See {@link SystemTableConstants#TABLE_KIND_TABLE} and {@link + * SystemTableConstants#TABLE_KIND_SYSTEM_VIEW}. + */ + public int getTableKind() { + return tableKind; + } + + /** Returns {@code true} if this table is a system view (virtual, no independent storage). */ + public boolean isSystemView() { + return tableKind == SystemTableConstants.TABLE_KIND_SYSTEM_VIEW; + } + /** * Converts this table info to a {@link TableDescriptor}. * @@ -420,15 +467,40 @@ public static TableInfo of( String remoteDataDir, long createdTime, long modifiedTime) { + return of( + tablePath, + tableId, + schemaId, + tableDescriptor, + remoteDataDir, + createdTime, + modifiedTime, + SystemTableConstants.TABLE_KIND_TABLE); + } + + /** Utility to create a {@link TableInfo} with an explicit table kind. 
*/ + public static TableInfo of( + TablePath tablePath, + long tableId, + int schemaId, + TableDescriptor tableDescriptor, + String remoteDataDir, + long createdTime, + long modifiedTime, + int tableKind) { Schema schema = tableDescriptor.getSchema(); - int numBuckets = - tableDescriptor - .getTableDistribution() - .flatMap(TableDescriptor.TableDistribution::getBucketCount) - .orElseThrow( - () -> - new IllegalArgumentException( - "Bucket count is required for creating table info.")); + int numBuckets = 0; + if (tableKind != SystemTableConstants.TABLE_KIND_SYSTEM_VIEW) { + numBuckets = + tableDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount) + .orElseThrow( + () -> + new IllegalArgumentException( + "Bucket count is required for creating table info.")); + } + return new TableInfo( tablePath, tableId, @@ -442,7 +514,8 @@ public static TableInfo of( remoteDataDir, tableDescriptor.getComment().orElse(null), createdTime, - modifiedTime); + modifiedTime, + tableKind); } @Override @@ -455,6 +528,7 @@ public boolean equals(Object o) { return tableId == that.tableId && schemaId == that.schemaId && numBuckets == that.numBuckets + && tableKind == that.tableKind && Objects.equals(tablePath, that.tablePath) && Objects.equals(rowType, that.rowType) && Objects.equals(primaryKeys, that.primaryKeys) @@ -483,7 +557,8 @@ public int hashCode() { properties, customProperties, remoteDataDir, - comment); + comment, + tableKind); } @Override @@ -518,6 +593,8 @@ public String toString() { + createdTime + ", modifiedTime=" + modifiedTime + + ", tableKind=" + + tableKind + '}'; } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java index 23cd22aa73..66cf98ff91 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TablePath.java @@ -85,6 +85,16 @@ public boolean 
isValid() { return detectInvalidName(databaseName) == null && detectInvalidName(tableName) == null; } + /** + * Returns {@code true} if this table path belongs to the system database. + * + *

System paths use the reserved {@code sys} database and are managed internally by Fluss. + * They bypass the normal name validation that rejects the {@code __} prefix. + */ + public boolean isSystemTablePath() { + return SystemTableConstants.isSystemTablePath(this); + } + /** * Validate the table path. A table path is valid if both the database and table names are * valid. diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/SystemTableConstantsTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/SystemTableConstantsTest.java new file mode 100644 index 0000000000..d2987ad5ee --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/SystemTableConstantsTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metadata; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SystemTableConstants}. 
*/ +class SystemTableConstantsTest { + + @Test + void testSystemDatabaseName() { + assertThat(SystemTableConstants.SYSTEM_DATABASE).isEqualTo("sys"); + } + + @Test + void testIsSystemDatabase() { + assertThat(SystemTableConstants.isSystemDatabase("sys")).isTrue(); + assertThat(SystemTableConstants.isSystemDatabase("fluss")).isFalse(); + assertThat(SystemTableConstants.isSystemDatabase(null)).isFalse(); + } + + @Test + void testIsSystemTableName() { + // Known system view + assertThat(SystemTableConstants.isSystemTableName("sys", "tablet_servers")).isTrue(); + // Unknown table in sys database should return false + assertThat(SystemTableConstants.isSystemTableName("sys", "unknown_table")).isFalse(); + // User database + assertThat(SystemTableConstants.isSystemTableName("fluss", "my_table")).isFalse(); + // Null inputs + assertThat(SystemTableConstants.isSystemTableName(null, null)).isFalse(); + assertThat(SystemTableConstants.isSystemTableName("sys", null)).isFalse(); + assertThat(SystemTableConstants.isSystemTableName(null, "tablet_servers")).isFalse(); + } + + @Test + void testIsSystemTablePath() { + TablePath viewPath = TablePath.of("sys", "tablet_servers"); + TablePath userPath = TablePath.of("fluss", "my_table"); + TablePath unknownSysPath = TablePath.of("sys", "user_table"); + + assertThat(SystemTableConstants.isSystemTablePath(viewPath)).isTrue(); + assertThat(SystemTableConstants.isSystemTablePath(userPath)).isFalse(); + // Unknown table in sys database should return false + assertThat(SystemTableConstants.isSystemTablePath(unknownSysPath)).isFalse(); + assertThat(SystemTableConstants.isSystemTablePath(null)).isFalse(); + } + + @Test + void testTabletServersViewConstant() { + assertThat(SystemTableConstants.TABLET_SERVERS_VIEW).isEqualTo("tablet_servers"); + } + + @Test + void testIsSystemView() { + assertThat(SystemTableConstants.isSystemView("sys", "tablet_servers")).isTrue(); + assertThat(SystemTableConstants.isSystemView("sys", "user_table")).isFalse(); + 
assertThat(SystemTableConstants.isSystemView("fluss", "tablet_servers")).isFalse(); + assertThat(SystemTableConstants.isSystemView(null, null)).isFalse(); + } +} diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118SystemViewScanITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118SystemViewScanITCase.java new file mode 100644 index 0000000000..b08f3a5192 --- /dev/null +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118SystemViewScanITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for system view scan in Flink 1.18. 
*/ +public class Flink118SystemViewScanITCase extends SystemViewScanITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119SystemViewScanITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119SystemViewScanITCase.java new file mode 100644 index 0000000000..7dc6423aee --- /dev/null +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119SystemViewScanITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for system view scan in Flink 1.19. 
*/ +public class Flink119SystemViewScanITCase extends SystemViewScanITCase {} diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120SystemViewScanITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120SystemViewScanITCase.java new file mode 100644 index 0000000000..4f1876b6bc --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120SystemViewScanITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for system view scan in Flink 1.20. 
*/ +public class Flink120SystemViewScanITCase extends SystemViewScanITCase {} diff --git a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22SystemViewScanITCase.java b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22SystemViewScanITCase.java new file mode 100644 index 0000000000..63f5375c63 --- /dev/null +++ b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22SystemViewScanITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for system view scan in Flink 2.2. 
*/ +public class Flink22SystemViewScanITCase extends SystemViewScanITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index c7a3b44c28..bab6907f9b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -228,6 +228,13 @@ public class FlinkConnectorOptions { BUCKET_KEY.key(), BOOTSTRAP_SERVERS.key()); + // ------------------------------------------------------------------------------------------- + // Only used internally to identify system views + // ------------------------------------------------------------------------------------------- + + /** Internal option key to mark a table as a system view. */ + public static final String SYSTEM_VIEW_OPTION_KEY = "fluss.system-view"; + // ------------------------------------------------------------------------------------------- // Only used internally to support materialized table // ------------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 3cb46aa78e..16e4ab6490 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -412,6 +412,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath) newOptions.put(key, lakePropertyEntry.getValue()); } } + // mark system views using tableKind from server response + if (tableInfo.isSystemView()) { + newOptions.put(FlinkConnectorOptions.SYSTEM_VIEW_OPTION_KEY, "true"); + } if 
(CatalogBaseTable.TableKind.TABLE == catalogBaseTable.getTableKind()) { CatalogTable table = ((CatalogTable) catalogBaseTable).copy(newOptions); if (supportIndex()) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 408a703058..b15457c6dd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -28,6 +28,7 @@ import org.apache.fluss.flink.source.BinlogFlinkTableSource; import org.apache.fluss.flink.source.ChangelogFlinkTableSource; import org.apache.fluss.flink.source.FlinkTableSource; +import org.apache.fluss.flink.source.SystemViewTableSource; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.metadata.MergeEngineType; @@ -104,6 +105,16 @@ public DynamicTableSource createDynamicTableSource(Context context) { return createBinlogTableSource(context, tableIdentifier, tableName); } + // Check if this is a system view + Map options = context.getCatalogTable().getOptions(); + if ("true".equals(options.get(FlinkConnectorOptions.SYSTEM_VIEW_OPTION_KEY))) { + RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); + return new SystemViewTableSource( + toFlussTablePath(context.getObjectIdentifier()), + toFlussClientConfig(options, context.getConfiguration()), + tableOutputType); + } + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); validateSourceOptions(tableOptions); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/SystemViewTableSource.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/SystemViewTableSource.java new file mode 100644 index 0000000000..c2af86d045 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/SystemViewTableSource.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.batch.BatchScanUtils; +import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate; + +/** + * A Flink table source for system views. 
+ * + *

System views are virtual tables that only support bounded batch scans. This source fetches all + * rows from the coordinator server via the client {@link BatchScanner} API and emits them as a + * bounded collection. + */ +public class SystemViewTableSource + implements ScanTableSource, SupportsProjectionPushDown, SupportsFilterPushDown { + + private final TablePath tablePath; + private final Configuration flussConfig; + private final org.apache.flink.table.types.logical.RowType tableOutputType; + + @Nullable private int[] projectedFields; + private LogicalType producedDataType; + @Nullable private Predicate filterPredicate; + + public SystemViewTableSource( + TablePath tablePath, + Configuration flussConfig, + org.apache.flink.table.types.logical.RowType tableOutputType) { + this.tablePath = tablePath; + this.flussConfig = flussConfig; + this.tableOutputType = tableOutputType; + this.producedDataType = tableOutputType; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + Collection results = scanSystemView(); + TypeInformation resultTypeInfo = + scanContext.createTypeInformation(producedDataType); + return new DataStreamScanProvider() { + @Override + public DataStream produceDataStream( + ProviderContext providerContext, StreamExecutionEnvironment execEnv) { + return execEnv.fromCollection(results, resultTypeInfo); + } + + @Override + public boolean isBounded() { + return true; + } + }; + } + + private Collection scanSystemView() { + try (Connection connection = ConnectionFactory.createConnection(flussConfig); + Table table = connection.getTable(tablePath); + BatchScanner batchScanner = + table.newScan() + .project(projectedFields) + .filter(filterPredicate) + .createBatchScanner()) { + List scannedRows = BatchScanUtils.collectRows(batchScanner); + + RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType); 
+ RowType converterRowType = + projectedFields != null ? flussRowType.project(projectedFields) : flussRowType; + FlussRowToFlinkRowConverter converter = + new FlussRowToFlinkRowConverter(converterRowType); + + List results = new ArrayList<>(); + for (InternalRow row : scannedRows) { + results.add(converter.toFlinkRowData(row)); + } + return results; + } catch (Exception e) { + throw new FlussRuntimeException("Failed to scan system view '" + tablePath + "'.", e); + } + } + + @Override + public boolean supportsNestedProjection() { + return false; + } + + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray(); + this.producedDataType = producedDataType.getLogicalType(); + } + + @Override + public Result applyFilters(List filters) { + List converted = new ArrayList<>(); + List acceptedFilters = new ArrayList<>(); + for (ResolvedExpression filter : filters) { + Optional predicateOpt = convertToFlussPredicate(tableOutputType, filter); + if (predicateOpt.isPresent()) { + converted.add(predicateOpt.get()); + acceptedFilters.add(filter); + } + } + filterPredicate = converted.isEmpty() ? 
null : PredicateBuilder.and(converted); + return Result.of(acceptedFilters, filters); + } + + @Override + public DynamicTableSource copy() { + SystemViewTableSource copy = + new SystemViewTableSource(tablePath, flussConfig, tableOutputType); + copy.projectedFields = projectedFields; + copy.producedDataType = producedDataType; + copy.filterPredicate = filterPredicate; + return copy; + } + + @Override + public String asSummaryString() { + return "FlussSystemViewTableSource"; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index eebe586112..f167c9af73 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -74,6 +74,7 @@ import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY; import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -741,10 +742,14 @@ void testCreateDatabase() { assertThat(databases.stream().map(Row::toString).collect(Collectors.toList())) .containsExactlyInAnyOrderElementsOf( - Arrays.asList(String.format("+I[%s]", DEFAULT_DB), "+I[test_db]")); + Arrays.asList( + String.format("+I[%s]", DEFAULT_DB), + String.format("+I[%s]", SYSTEM_DATABASE), + "+I[test_db]")); tEnv.executeSql("drop database test_db"); databases = CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect()); - assertThat(databases.toString()).isEqualTo(String.format("[+I[%s]]", DEFAULT_DB)); + 
assertThat(databases) + .containsExactlyInAnyOrder(Row.of(DEFAULT_DB), Row.of(SYSTEM_DATABASE)); } @Test @@ -908,7 +913,7 @@ void testAuthentication() throws Exception { Collections::emptyMap); authenticateCatalog.open(); assertThat(authenticateCatalog.listDatabases()) - .containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB)); + .containsExactlyInAnyOrder(DEFAULT_DB, SYSTEM_DATABASE); } finally { if (authenticateCatalog != null) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 0c8d919180..90097e6d83 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -84,6 +84,7 @@ import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema; import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema; import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -611,7 +612,8 @@ void testDatabase() throws Exception { CatalogTable expectedTable = addOptions(table, addedOptions); checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable); assertThat(catalog.listTables("db1")).isEqualTo(Collections.singletonList("t1")); - assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db1", "db2", DEFAULT_DB)); + assertThat(catalog.listDatabases()) + .containsExactlyInAnyOrder("db1", "db2", DEFAULT_DB, SYSTEM_DATABASE); // test drop db1; // should throw exception since db1 is not empty and we set cascade = false assertThatThrownBy(() -> catalog.dropDatabase("db1", 
false, false)) @@ -627,10 +629,11 @@ void testDatabase() throws Exception { // should be ok since we set ignoreIfNotExists = true catalog.dropDatabase("db1", true, true); // test list db - assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db2", DEFAULT_DB)); + assertThat(catalog.listDatabases()) + .containsExactlyInAnyOrder("db2", DEFAULT_DB, SYSTEM_DATABASE); catalog.dropDatabase("db2", false, true); // should be empty - assertThat(catalog.listDatabases()).isEqualTo(Collections.singletonList(DEFAULT_DB)); + assertThat(catalog.listDatabases()).containsExactlyInAnyOrder(DEFAULT_DB, SYSTEM_DATABASE); // should throw exception since the db is not exist and we set ignoreIfNotExists = false assertThatThrownBy(() -> catalog.listTables("unknown")) .isInstanceOf(DatabaseNotExistException.class) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java index 2fd8899891..f3b92bde5f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java @@ -57,6 +57,7 @@ import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder; import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; import static org.apache.fluss.security.acl.OperationType.CREATE; import static org.apache.fluss.security.acl.OperationType.DESCRIBE; import static org.apache.fluss.security.acl.OperationType.DROP; @@ -159,7 +160,8 @@ void testCreateAndDropDatabase() throws Exception { addAcl(Resource.cluster(), CREATE); tEnv.executeSql(createDatabaseDDL).await(); 
assertThat(CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect())) - .containsExactlyInAnyOrder(Row.of(DEFAULT_DB), Row.of(databaseName)); + .containsExactlyInAnyOrder( + Row.of(DEFAULT_DB), Row.of(SYSTEM_DATABASE), Row.of(databaseName)); // test drop database String dropDatabaseDDL = "drop database " + databaseName; @@ -173,7 +175,7 @@ void testCreateAndDropDatabase() throws Exception { addAcl(Resource.database(databaseName), DROP); tEnv.executeSql(dropDatabaseDDL).await(); assertThat(CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect())) - .containsExactlyInAnyOrder(Row.of(DEFAULT_DB)); + .containsExactlyInAnyOrder(Row.of(DEFAULT_DB), Row.of(SYSTEM_DATABASE)); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/SystemViewScanITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/SystemViewScanITCase.java new file mode 100644 index 0000000000..4e84848362 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/SystemViewScanITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.metadata.SystemTableConstants; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for reading system views via Flink SQL. */ +abstract class SystemViewScanITCase extends FlinkTestBase { + + private static final String CATALOG_NAME = "testcatalog"; + private static final int NUM_TABLET_SERVERS = 3; + + private StreamTableEnvironment tEnv; + + @BeforeEach + void before() { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inBatchMode()); + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.executeSql("use catalog " + CATALOG_NAME); + } + + @Test + void testScanServersView() throws Exception { + tEnv.useDatabase(SystemTableConstants.SYSTEM_DATABASE); + CloseableIterator collected = + tEnv.executeSql("SELECT * FROM `tablet_servers`").collect(); + List results = collectRowsWithTimeout(collected, NUM_TABLET_SERVERS); + + assertThat(results).hasSize(NUM_TABLET_SERVERS); + // Each row should contain server_id, endpoints, rack, register_timestamp + for (String row : results) { + // Row format: 
+I[server_id, endpoints, rack, register_timestamp] + assertThat(row).startsWith("+I["); + } + } + + @Test + void testScanServersViewWithProjection() throws Exception { + tEnv.useDatabase(SystemTableConstants.SYSTEM_DATABASE); + CloseableIterator collected = + tEnv.executeSql("SELECT server_id, rack FROM `tablet_servers`").collect(); + List results = collectRowsWithTimeout(collected, NUM_TABLET_SERVERS); + + assertThat(results).hasSize(NUM_TABLET_SERVERS); + + List expectedResults = new ArrayList<>(); + expectedResults.add("+I[0, rack-0]"); + expectedResults.add("+I[1, rack-1]"); + expectedResults.add("+I[2, rack-2]"); + + assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults); + } + + @Test + void testScanServersViewWithFilter() throws Exception { + tEnv.useDatabase(SystemTableConstants.SYSTEM_DATABASE); + CloseableIterator collected = + tEnv.executeSql("SELECT * FROM `tablet_servers` WHERE server_id = 0").collect(); + List results = collectRowsWithTimeout(collected, 1); + + assertThat(results).hasSize(1); + // The returned row should have server_id = 0 + assertThat(results.get(0)).startsWith("+I[0, "); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java index 272d1b4a11..44d47d69b7 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java @@ -46,6 +46,8 @@ import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiKeys; @@ 
-192,4 +194,15 @@ CompletableFuture listPartitionInfos( @RPC(api = ApiKeys.DESCRIBE_CLUSTER_CONFIGS) CompletableFuture describeClusterConfigs( DescribeClusterConfigsRequest request); + + // ------ system views ------ + + /** + * Scan a system view and return its records. + * + * @param request the scan system view request specifying database and view name + * @return a future returns the scan system view response containing records + */ + @RPC(api = ApiKeys.SCAN_SYSTEM_VIEW) + CompletableFuture scanSystemView(ScanSystemViewRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 92a4680eef..fd4eeed8ec 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -103,7 +103,8 @@ public enum ApiKeys { DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC), GET_TABLE_STATS(1059, 0, 0, PUBLIC), ALTER_DATABASE(1060, 0, 0, PUBLIC), - SCAN_KV(1061, 0, 0, PUBLIC); + SCAN_KV(1061, 0, 0, PUBLIC), + SCAN_SYSTEM_VIEW(1062, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index b0f56cffbc..699d281b30 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -145,12 +145,13 @@ message GetTableInfoRequest { } message GetTableInfoResponse { - required int64 table_id = 1; - required int32 schema_id = 2; - required bytes table_json = 3; - required int64 created_time = 4; - required int64 modified_time = 5; - optional string remote_data_dir = 6; + required int64 table_id = 1; + required int32 schema_id = 2; + required bytes table_json = 3; + required int64 created_time = 4; + required int64 modified_time = 5; + optional string remote_data_dir = 6; + optional int32 table_kind = 7; // New: TABLE=1, SYSTEM_VIEW=2 } // 
list tables request and response @@ -299,7 +300,6 @@ message LimitScanResponse{ optional bytes records = 4; } - // Full KV scan request and response. // A new scan is initiated with bucket_scan_req; subsequent batches use scanner_id. message PbScanReqForBucket { @@ -336,7 +336,6 @@ message ScanKvResponse { optional int64 log_offset = 6; } - // Get table statistics request and response. // Sent to TabletServer to get per-bucket statistics. message GetTableStatsRequest { @@ -1366,4 +1365,20 @@ message PbLiteralValue { optional int64 timestamp_millis_value = 11; // Epoch millis optional int32 timestamp_nano_of_millis_value = 12; // Nano of millis optional bytes decimal_bytes = 13; // Serialized decimal (non-compact mode) +} + +// scan system view request and response +message ScanSystemViewRequest { + required string database_name = 1; + required string view_name = 2; + required int32 schema_id = 3; + // Column indices to project; if empty, all columns are returned. + repeated int32 projected_fields = 4 [packed = true]; + optional PbPredicate filter_predicate = 5; +} + +message ScanSystemViewResponse { + optional int32 error_code = 1; + optional string error_message = 2; + optional bytes records = 3; } \ No newline at end of file diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 3517680932..006d009c69 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -75,6 +75,8 @@ import org.apache.fluss.rpc.messages.PutKvResponse; import org.apache.fluss.rpc.messages.ScanKvRequest; import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import
org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -260,6 +262,11 @@ public CompletableFuture describeClusterConfigs( return null; } + @Override + public CompletableFuture scanSystemView(ScanSystemViewRequest request) { + return null; + } + @Override public CompletableFuture scanKv(ScanKvRequest request) { return null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 20b0b71bef..b36de235d7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -20,6 +20,7 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.DatabaseNotExistException; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; @@ -34,6 +35,7 @@ import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.SchemaInfo; +import org.apache.fluss.metadata.SystemTableConstants; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; @@ -71,6 +73,8 @@ import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbApiVersion; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.netty.server.Session; @@ -83,6 +87,10 @@ import 
org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.coordinator.CoordinatorService; import org.apache.fluss.server.coordinator.MetadataManager; +import org.apache.fluss.server.coordinator.system.SystemTableResolver; +import org.apache.fluss.server.coordinator.system.SystemViewDefinition; +import org.apache.fluss.server.coordinator.system.TableBucketsViewProvider; +import org.apache.fluss.server.coordinator.system.TabletServersViewProvider; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.metadata.MetadataProvider; import org.apache.fluss.server.metadata.PartitionMetadata; @@ -94,6 +102,7 @@ import org.apache.fluss.server.zk.data.BucketSnapshot; import org.apache.fluss.server.zk.data.PartitionRegistration; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.utils.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,6 +120,7 @@ import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static org.apache.fluss.metadata.SystemTableConstants.TABLE_KIND_SYSTEM_VIEW; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec; import static org.apache.fluss.security.acl.Resource.TABLE_SPLITTER; @@ -143,6 +153,8 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR protected final @Nullable Authorizer authorizer; protected final DynamicConfigManager dynamicConfigManager; + protected final SystemTableResolver systemTableResolver; + private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; @@ -164,8 +176,37 @@ public RpcServiceBase( this.authorizer = authorizer; this.dynamicConfigManager = dynamicConfigManager; this.ioExecutor = ioExecutor; + this.systemTableResolver = createSystemTableResolver(); + } + + /** Returns the system table resolver 
initialized for this server. */ + public SystemTableResolver getSystemTableResolver() { + return systemTableResolver; + } + + /** + * Creates and initializes the {@link SystemTableResolver} for this server. + * + *

The base implementation registers common system views (e.g., {@code tablet_servers}) that + * are shared across all server types, then delegates to {@link + * #registerServerSpecificSystemViews(SystemTableResolver)} for server-specific registration. + */ + private SystemTableResolver createSystemTableResolver() { + SystemTableResolver resolver = new SystemTableResolver(); + // Register common system view providers for all server types. + // it is registered as a full provider (definition + data serving). + resolver.registerViewProvider(new TabletServersViewProvider(zkClient)); + resolver.registerViewProvider(new TableBucketsViewProvider(zkClient)); + registerServerSpecificSystemViews(resolver); + return resolver; } + /** + * Override by subclasses (TabletService or CoordinatorService) to register server-specific + * system views. + */ + protected void registerServerSpecificSystemViews(SystemTableResolver resolver) {} + @Override public ServerType providerType() { return provider; @@ -205,7 +246,12 @@ public CompletableFuture apiVersions(ApiVersionsRequest req @Override public CompletableFuture listDatabases(ListDatabasesRequest request) { ListDatabasesResponse response = new ListDatabasesResponse(); - Collection databaseNames = metadataManager.listDatabases(); + Collection databaseNames = new ArrayList<>(metadataManager.listDatabases()); + + // Ensure the system database is always listed + if (!databaseNames.contains(SystemTableConstants.SYSTEM_DATABASE)) { + databaseNames.add(SystemTableConstants.SYSTEM_DATABASE); + } if (authorizer != null) { Collection authorizedDatabase = @@ -247,7 +293,11 @@ public CompletableFuture getDatabaseInfo( public CompletableFuture databaseExists(DatabaseExistsRequest request) { // By design: database exists not need to check database authorization. 
DatabaseExistsResponse response = new DatabaseExistsResponse(); - boolean exists = metadataManager.databaseExists(request.getDatabaseName()); + String databaseName = request.getDatabaseName(); + // The system database always exists when system views are registered + boolean exists = + metadataManager.databaseExists(databaseName) + || SystemTableConstants.isSystemDatabase(databaseName); response.setExists(exists); return CompletableFuture.completedFuture(response); } @@ -255,7 +305,34 @@ public CompletableFuture databaseExists(DatabaseExistsRe @Override public CompletableFuture listTables(ListTablesRequest request) { ListTablesResponse response = new ListTablesResponse(); - List tableNames = metadataManager.listTables(request.getDatabaseName()); + String databaseName = request.getDatabaseName(); + boolean isSystemDb = SystemTableConstants.isSystemDatabase(databaseName); + + // For the system database, the ZK database may not exist. + // We still need to list system views even if the ZK database is absent. + List tableNames; + try { + tableNames = metadataManager.listTables(databaseName); + } catch (DatabaseNotExistException e) { + if (isSystemDb) { + tableNames = new ArrayList<>(); + } else { + throw e; + } + } + + // Collect system view names for the system database (bypass authorization). + // System views are always listed because their definitions are available in-memory + // on every server. 
+ List systemNames = new ArrayList<>(); + if (isSystemDb) { + for (String name : systemTableResolver.getAllSystemViewNames()) { + if (!tableNames.contains(name)) { + systemNames.add(name); + } + } + } + if (authorizer != null) { List resources = tableNames.stream() @@ -270,6 +347,12 @@ public CompletableFuture listTables(ListTablesRequest reques .collect(Collectors.toList()); } + // Add system names after authorization filtering (system tables/views bypass auth) + if (!systemNames.isEmpty()) { + tableNames = new ArrayList<>(tableNames); + tableNames.addAll(systemNames); + } + response.addAllTableNames(tableNames); return CompletableFuture.completedFuture(response); } @@ -277,6 +360,24 @@ public CompletableFuture listTables(ListTablesRequest reques @Override public CompletableFuture getTableInfo(GetTableInfoRequest request) { TablePath tablePath = toTablePath(request.getTablePath()); + + // Check if this is a system view -- bypass authorization for views + if (systemTableResolver.isSystemView(tablePath)) { + Optional viewDefinitionOpt = + systemTableResolver.getViewDefinition(tablePath); + if (viewDefinitionOpt.isPresent()) { + SystemViewDefinition viewDefinition = viewDefinitionOpt.get(); + GetTableInfoResponse response = new GetTableInfoResponse(); + response.setTableJson(viewDefinition.tableDescriptor().toJsonBytes()) + .setSchemaId(viewDefinition.schemaId()) + .setTableId(TableInfo.UNKNOWN_TABLE_ID) + .setCreatedTime(0) + .setModifiedTime(0) + .setTableKind(TABLE_KIND_SYSTEM_VIEW); + return CompletableFuture.completedFuture(response); + } + } + authorizeTable(OperationType.DESCRIBE, tablePath); GetTableInfoResponse response = new GetTableInfoResponse(); @@ -310,7 +411,10 @@ public CompletableFuture getTableSchema(GetTableSchemaRe public CompletableFuture tableExists(TableExistsRequest request) { // By design: table exists not need to check table authorization. 
TableExistsResponse response = new TableExistsResponse(); - boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath())); + TablePath tablePath = toTablePath(request.getTablePath()); + boolean exists = + metadataManager.tableExists(tablePath) + || systemTableResolver.isSystemView(tablePath); response.setExists(exists); return CompletableFuture.completedFuture(response); } @@ -543,6 +647,13 @@ public CompletableFuture describeClusterConfigs( new DescribeClusterConfigsResponse().addAllConfigs(toPbConfigEntries(configs))); } + @Override + public CompletableFuture scanSystemView(ScanSystemViewRequest request) { + return FutureUtils.completedExceptionally( + new FlussRuntimeException( + "scanSystemView is only supported on the CoordinatorServer.")); + } + protected MetadataResponse processMetadataRequest( MetadataRequest request, String listenerName, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 10ba95c05d..9bf1757be7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; /** * Coordinator server implementation. 
The coordinator server is responsible to: @@ -243,6 +244,7 @@ protected void initCoordinatorStandby() throws Exception { new LakeTieringMetricGroup(metricRegistry, serverMetricGroup)); this.metadataManager = new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); + this.ioExecutor = Executors.newFixedThreadPool( conf.get(ConfigOptions.SERVER_IO_POOL_SIZE), @@ -326,6 +328,7 @@ protected void initCoordinatorLeader() throws Exception { kvSnapshotLeaseManager); coordinatorEventProcessor.startup(); + createSystemDatabase(); createDefaultDatabase(); } } @@ -352,6 +355,7 @@ protected void cleanupCoordinatorLeader() { } // Clean up leader-specific resources in reverse order of initialization + try { if (coordinatorEventProcessor != null) { coordinatorEventProcessor.shutdown(); @@ -490,18 +494,26 @@ private interface ThrowingRunnable { void run() throws Exception; } + private void createSystemDatabase() { + MetadataManager metadataManager = + new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); + if (!metadataManager.databaseExists(SYSTEM_DATABASE)) { + metadataManager.createDatabase(SYSTEM_DATABASE, DatabaseDescriptor.EMPTY, true); + LOG.info("Created system database '{}'.", SYSTEM_DATABASE); + } + } + private void createDefaultDatabase() { MetadataManager metadataManager = new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); - List databases = metadataManager.listDatabases(); - if (databases.isEmpty()) { + if (!metadataManager.databaseExists(DEFAULT_DATABASE)) { metadataManager.createDatabase(DEFAULT_DATABASE, DatabaseDescriptor.EMPTY, true); - LOG.info("Created default database '{}' because no database exists.", DEFAULT_DATABASE); + LOG.info("Created default database '{}'.", DEFAULT_DATABASE); } // create Kafka default database if Kafka is enabled. 
if (conf.get(ConfigOptions.KAFKA_ENABLED)) { String kafkaDB = conf.get(ConfigOptions.KAFKA_DATABASE); - if (!databases.contains(kafkaDB)) { + if (!metadataManager.databaseExists(kafkaDB)) { metadataManager.createDatabase(kafkaDB, DatabaseDescriptor.EMPTY, true); LOG.info("Created default database '{}' for Kafka protocol.", kafkaDB); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index b5cec6a0e2..58cbfb507b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -55,6 +55,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest; import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse; @@ -125,9 +126,12 @@ import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseResponse; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.netty.server.Session; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.rpc.util.PredicateMessageUtils; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; import org.apache.fluss.security.acl.FlussPrincipal; @@ -154,6 +158,7 @@ import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import 
org.apache.fluss.server.coordinator.producer.ProducerOffsetsManager; import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.system.SystemViewProvider; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.DatabasePropertyChanges; import org.apache.fluss.server.entity.LakeTieringTableInfo; @@ -424,6 +429,7 @@ private DatabasePropertyChanges toDatabasePropertyChanges( @Override public CompletableFuture dropDatabase(DropDatabaseRequest request) { authorizeDatabase(OperationType.DROP, request.getDatabaseName()); + DropDatabaseResponse response = new DropDatabaseResponse(); metadataManager.dropDatabase( request.getDatabaseName(), request.isIgnoreIfNotExists(), request.isCascade()); @@ -1302,6 +1308,57 @@ public TableDescriptor getExpectedTable() { } } + // ================================================================================== + // System View Scan API + // ================================================================================== + + @Override + public CompletableFuture scanSystemView(ScanSystemViewRequest request) { + TablePath viewPath = TablePath.of(request.getDatabaseName(), request.getViewName()); + + Optional viewProvider = systemTableResolver.getViewProvider(viewPath); + if (!viewProvider.isPresent()) { + ScanSystemViewResponse response = new ScanSystemViewResponse(); + response.setErrorCode(Errors.TABLE_NOT_EXIST.code()); + response.setErrorMessage( + "System view provider for '" + + viewPath + + "' does not exist in " + + providerType().name()); + return CompletableFuture.completedFuture(response); + } + + try { + SystemViewProvider provider = viewProvider.get(); + + // Deserialize filter predicate if present + Predicate filterPredicate = null; + if (request.hasFilterPredicate()) { + filterPredicate = + PredicateMessageUtils.toPredicate( + request.getFilterPredicate(), provider.schema().getRowType()); + } + + // Extract projected fields if present
+ int[] projectedFields = null; + if (request.getProjectedFieldsCount() > 0) { + projectedFields = request.getProjectedFields(); + } + + byte[] records = provider.scanRows(projectedFields, filterPredicate); + + ScanSystemViewResponse response = new ScanSystemViewResponse(); + response.setRecords(records); + return CompletableFuture.completedFuture(response); + } catch (Exception e) { + ScanSystemViewResponse response = new ScanSystemViewResponse(); + response.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()); + response.setErrorMessage( + "Failed to scan system view '" + viewPath + "': " + e.getMessage()); + return CompletableFuture.completedFuture(response); + } + } + // ================================================================================== // Producer Offset Management APIs (for Exactly-Once Semantics) // ================================================================================== diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemTableResolver.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemTableResolver.java new file mode 100644 index 0000000000..74c3239002 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemTableResolver.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.TablePath; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Registry of all system view definitions. + * + *

The resolver maintains two separate registries for system views: + * + *

+ */ +@Internal +public class SystemTableResolver { + + /** Registered system view definitions (metadata only), keyed by table path. */ + private final Map viewDefinitions; + + /** Registered system view providers (data serving), keyed by table path. */ + private final Map viewProviders; + + public SystemTableResolver() { + this.viewDefinitions = new HashMap<>(); + this.viewProviders = new HashMap<>(); + } + + // ------------------------------------------------------------------------- + // Registration + // ------------------------------------------------------------------------- + + /** + * Registers a system view definition (metadata only). + * + *

This makes the view visible in metadata operations ({@code listTables}, {@code + * tableExists}, {@code getTableInfo}) without requiring the server to produce data for the + * view. All servers should register definitions for all known views. + */ + public void registerViewDefinition(SystemViewDefinition definition) { + viewDefinitions.put(definition.viewPath(), definition); + } + + /** + * Registers a system view provider (data serving). + * + *

A provider is a {@link SystemViewDefinition} that can also produce data. This method + * registers both the definition and the provider. Only servers that can serve the view's data + * should call this method. + */ + public void registerViewProvider(SystemViewProvider provider) { + viewDefinitions.put(provider.viewPath(), provider); + viewProviders.put(provider.viewPath(), provider); + } + + // ------------------------------------------------------------------------- + // System view definition accessors + // ------------------------------------------------------------------------- + + /** Returns the system view definition for the given path, if registered. */ + public Optional getViewDefinition(TablePath tablePath) { + return Optional.ofNullable(viewDefinitions.get(tablePath)); + } + + /** Returns all registered system view definitions. */ + public List getAllViewDefinitions() { + return new ArrayList<>(viewDefinitions.values()); + } + + /** Checks whether the given path is a registered system view. */ + public boolean isSystemView(TablePath tablePath) { + return viewDefinitions.containsKey(tablePath); + } + + // ------------------------------------------------------------------------- + // System view provider accessors + // ------------------------------------------------------------------------- + + /** Returns the system view provider for the given path, if registered. */ + public Optional getViewProvider(TablePath tablePath) { + return Optional.ofNullable(viewProviders.get(tablePath)); + } + + /** Returns all registered system view providers. */ + public List getAllViewProviders() { + return new ArrayList<>(viewProviders.values()); + } + + // ------------------------------------------------------------------------- + // Combined accessors + // ------------------------------------------------------------------------- + + /** + * Returns all system view names for the system database. + * + *

System views are always listed because their definitions are available in-memory on every + * server. + */ + public List getAllSystemViewNames() { + List names = new ArrayList<>(); + for (TablePath path : viewDefinitions.keySet()) { + names.add(path.getTableName()); + } + return names; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewDefinition.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewDefinition.java new file mode 100644 index 0000000000..2c9069f2eb --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewDefinition.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; + +/** + * Lightweight metadata definition of a system view. + * + *

A system view definition describes the view's path, schema, and schema version without + * providing the ability to produce data. This allows all servers (coordinator and tablet) to be + * aware of which system views exist for metadata operations such as {@code listTables}, {@code + * tableExists}, and {@code getTableInfo}, regardless of whether the server can actually serve the + * view's data. + * + *

To produce data, a {@link SystemViewProvider} implementation is needed, which extends this + * interface with a {@code scanRows} method. + * + * @see SystemViewProvider + */ +@Internal +public interface SystemViewDefinition { + + /** Returns the table path for this system view (e.g., {@code sys.tablet_servers}). */ + TablePath viewPath(); + + /** Returns the schema for this system view. */ + Schema schema(); + + /** + * Returns the schema version for this system view. + * + *

Schema evolution is append-only: new columns may be added at the end. The version starts + * at 0 and increments with each schema change. Clients using {@code KvFormat.INDEXED} can + * safely decode records with trailing fields they don't know about. + * + * @return the schema version, defaults to 0 + */ + default int schemaId() { + return 0; + } + + /** Returns the table descriptor for this system view. */ + default TableDescriptor tableDescriptor() { + return TableDescriptor.builder() + .schema(schema()) + .comment("System view: " + viewPath().getTableName()) + .distributedBy(1) + .build(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewProvider.java new file mode 100644 index 0000000000..9a9efb6c0d --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/SystemViewProvider.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.serializer.RowSerializer; +import org.apache.fluss.types.DataType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Provider for a system view — a virtual table with no independent storage. + * + *

A provider extends {@link SystemViewDefinition} with the ability to actually produce data via + * {@link #scanRows(int[], Predicate)}. Providers are registered only on servers that have access to + * the required data sources (e.g., the coordinator server for coordinator-specific views). + * + *

For servers that only need to know a view exists (for metadata queries like {@code listTables} + * or {@code getTableInfo}), the {@link SystemViewDefinition} is sufficient. + * + * @see SystemViewDefinition + */ +@Internal +public interface SystemViewProvider extends SystemViewDefinition { + + /** + * Fetches rows for this system view, applies an optional projection and filter, and returns + * them as a serialized byte array in {@link DefaultValueRecordBatch} format. + * + * @param projectedFields optional column indices to project, or {@code null} for all columns + * @param filterPredicate optional server-side filter, or {@code null} for no filtering + * @return serialized record batch bytes + * @throws Exception if fetching or serializing data fails + */ + byte[] scanRows(@Nullable int[] projectedFields, @Nullable Predicate filterPredicate) + throws Exception; + + /** + * Utility method to serialize a list of {@link InternalRow} into a byte array using {@link + * DefaultValueRecordBatch} format. + * + *

Implementations can call this from {@link #scanRows(int[], Predicate)} after fetching and + * filtering rows. + * + * @param rows the rows to serialize + * @param schema the schema of the rows + * @param schemaVersion the schema version to stamp on each record + * @return serialized record batch bytes + */ + static byte[] serializeRows(List rows, Schema schema, int schemaVersion) { + DataType[] fieldTypes = schema.getRowType().getFieldTypes().toArray(new DataType[0]); + + RowSerializer rowSerializer = + new RowSerializer(fieldTypes, BinaryRow.BinaryRowFormat.INDEXED); + + try { + DefaultValueRecordBatch.Builder batchBuilder = DefaultValueRecordBatch.builder(); + for (InternalRow row : rows) { + BinaryRow binaryRow = rowSerializer.toBinaryRow(row); + batchBuilder.append((short) schemaVersion, binaryRow); + } + DefaultValueRecordBatch batch = batchBuilder.build(); + + byte[] result = new byte[batch.sizeInBytes()]; + batch.getSegment().get(batch.getPosition(), result, 0, result.length); + return result; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProvider.java new file mode 100644 index 0000000000..394a0bf62c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProvider.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SystemTableConstants; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Equal; +import org.apache.fluss.predicate.LeafPredicate; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.types.DataTypes; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Provider for the {@code sys.table_buckets} system view. + * + *

+ * <p>This view exposes per-bucket metadata for all tables in the cluster, including replica
+ * assignments, leader info, and ISR distribution. Data is fetched from ZooKeeper.
+ *
+ * <p>Supports filter pushdown on {@code database_name} and {@code table_name} to narrow the scope
+ * of ZK queries.
+ *
+ * <p>Schema:
+ *
+ * <ul>
+ *   <li>{@code database_name} (STRING) — database name
+ *   <li>{@code table_name} (STRING) — table name
+ *   <li>{@code partition_name} (STRING, nullable) — partition name, NULL for non-partitioned tables
+ *   <li>{@code bucket_id} (INT) — bucket ID
+ *   <li>{@code leader_id} (INT, nullable) — leader tablet server ID
+ *   <li>{@code replicas} (STRING) — comma-separated replica server IDs
+ *   <li>{@code isr} (STRING, nullable) — comma-separated ISR server IDs
+ *   <li>{@code replica_count} (INT) — total number of replicas
+ *   <li>{@code isr_count} (INT, nullable) — number of in-sync replicas
+ * </ul>
+ */ +@Internal +public class TableBucketsViewProvider implements SystemViewProvider { + + static final int MAX_ROWS_LIMIT = 10_000; + + private static final int NUM_COLUMNS = 9; + + private static final TablePath VIEW_PATH = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, SystemTableConstants.TABLE_BUCKETS_VIEW); + + private static final Schema SCHEMA = + Schema.newBuilder() + .column("database_name", DataTypes.STRING()) + .column("table_name", DataTypes.STRING()) + .column("partition_name", DataTypes.STRING()) + .column("bucket_id", DataTypes.INT()) + .column("leader_id", DataTypes.INT()) + .column("replicas", DataTypes.STRING()) + .column("isr", DataTypes.STRING()) + .column("replica_count", DataTypes.INT()) + .column("isr_count", DataTypes.INT()) + .build(); + + private final ZooKeeperClient zkClient; + + public TableBucketsViewProvider(ZooKeeperClient zkClient) { + this.zkClient = zkClient; + } + + @Override + public TablePath viewPath() { + return VIEW_PATH; + } + + @Override + public Schema schema() { + return SCHEMA; + } + + @Override + public int schemaId() { + return 2; + } + + @Override + public byte[] scanRows(@Nullable int[] projectedFields, @Nullable Predicate filterPredicate) + throws Exception { + // Step 0: Extract filter pushdown conditions. + String filterDbName = null; + String filterTableName = null; + + List conjuncts = PredicateBuilder.splitAnd(filterPredicate); + for (Predicate p : conjuncts) { + if (p instanceof LeafPredicate) { + LeafPredicate leaf = (LeafPredicate) p; + if (leaf.function() instanceof Equal) { + switch (leaf.fieldName()) { + case "database_name": + filterDbName = ((BinaryString) leaf.literals().get(0)).toString(); + break; + case "table_name": + filterTableName = ((BinaryString) leaf.literals().get(0)).toString(); + break; + default: + break; + } + } + } + } + + boolean enforceRowLimit = (filterTableName == null); + + // Step 1: List databases and tables (narrowed by filter). 
+ // Collect table metadata: tablePath -> TableRegistration + Map matchedTables = new HashMap<>(); + List databases; + if (filterDbName != null) { + databases = Collections.singletonList(filterDbName); + } else { + databases = zkClient.listDatabases(); + } + + for (String database : databases) { + if (SystemTableConstants.isSystemDatabase(database)) { + continue; + } + List tableNames; + if (filterTableName != null) { + tableNames = Collections.singletonList(filterTableName); + } else { + tableNames = zkClient.listTables(database); + } + + for (String tableName : tableNames) { + TablePath tablePath = TablePath.of(database, tableName); + Optional optReg = zkClient.getTable(tablePath); + if (!optReg.isPresent()) { + continue; + } + TableRegistration reg = optReg.get(); + matchedTables.put(tablePath, reg); + } + } + + if (matchedTables.isEmpty()) { + return SystemViewProvider.serializeRows( + new ArrayList<>(), projectSchema(projectedFields), schemaId()); + } + + // Step 2: Classify tables into non-partitioned and partitioned. + List nonPartitionedPaths = new ArrayList<>(); + List nonPartitionedIds = new ArrayList<>(); + List partitionedPaths = new ArrayList<>(); + + for (Map.Entry entry : matchedTables.entrySet()) { + if (entry.getValue().isPartitioned()) { + partitionedPaths.add(entry.getKey()); + } else { + nonPartitionedPaths.add(entry.getKey()); + nonPartitionedIds.add(entry.getValue().tableId); + } + } + + List rows = new ArrayList<>(); + + // Step 3: Fetch bucket metadata for non-partitioned tables. 
+ if (!nonPartitionedIds.isEmpty()) { + Map tableAssignments = + zkClient.getTablesAssignments(nonPartitionedIds); + + List tableBuckets = new ArrayList<>(); + tableAssignments.forEach( + (tableId, assignment) -> { + for (Integer bucketId : assignment.getBuckets()) { + tableBuckets.add(new TableBucket(tableId, bucketId)); + } + }); + + Map leaderAndIsrs = zkClient.getLeaderAndIsrs(tableBuckets); + + for (TablePath tablePath : nonPartitionedPaths) { + TableRegistration reg = matchedTables.get(tablePath); + TableAssignment assignment = tableAssignments.get(reg.tableId); + if (assignment == null) { + continue; + } + for (Map.Entry bucketEntry : + assignment.getBucketAssignments().entrySet()) { + int bucketId = bucketEntry.getKey(); + List replicas = bucketEntry.getValue().getReplicas(); + TableBucket tb = new TableBucket(reg.tableId, bucketId); + LeaderAndIsr leaderAndIsr = leaderAndIsrs.get(tb); + + GenericRow row = + buildRow( + tablePath.getDatabaseName(), + tablePath.getTableName(), + null, + bucketId, + replicas, + leaderAndIsr); + + if (filterPredicate == null || filterPredicate.test(row)) { + checkRowLimit(enforceRowLimit, rows.size()); + rows.add(projectRow(row, projectedFields)); + } + } + } + } + + // Step 4: Fetch bucket metadata for partitioned tables. + if (!partitionedPaths.isEmpty()) { + // Collect all partition IDs and their names across all partitioned tables. 
+ // partitionId -> (tablePath, partitionName) + Map partitionIdToTablePath = new HashMap<>(); + Map partitionIdToName = new HashMap<>(); + + for (TablePath tablePath : partitionedPaths) { + Map partitions = zkClient.getPartitionIdAndNames(tablePath); + for (Map.Entry entry : partitions.entrySet()) { + partitionIdToTablePath.put(entry.getKey(), tablePath); + partitionIdToName.put(entry.getKey(), entry.getValue()); + } + } + + if (!partitionIdToName.isEmpty()) { + Map partitionAssignments = + zkClient.getPartitionsAssignments(partitionIdToName.keySet()); + + List partitionBuckets = new ArrayList<>(); + partitionAssignments.forEach( + (partitionId, assignment) -> { + for (Integer bucketId : assignment.getBuckets()) { + partitionBuckets.add( + new TableBucket( + assignment.getTableId(), partitionId, bucketId)); + } + }); + + Map leaderAndIsrs = + zkClient.getLeaderAndIsrs(partitionBuckets); + + for (Map.Entry paEntry : + partitionAssignments.entrySet()) { + long partitionId = paEntry.getKey(); + PartitionAssignment assignment = paEntry.getValue(); + TablePath tablePath = partitionIdToTablePath.get(partitionId); + String partitionName = partitionIdToName.get(partitionId); + TableRegistration reg = matchedTables.get(tablePath); + + for (Map.Entry bucketEntry : + assignment.getBucketAssignments().entrySet()) { + int bucketId = bucketEntry.getKey(); + List replicas = bucketEntry.getValue().getReplicas(); + TableBucket tb = new TableBucket(reg.tableId, partitionId, bucketId); + LeaderAndIsr leaderAndIsr = leaderAndIsrs.get(tb); + + GenericRow row = + buildRow( + tablePath.getDatabaseName(), + tablePath.getTableName(), + partitionName, + bucketId, + replicas, + leaderAndIsr); + + if (filterPredicate == null || filterPredicate.test(row)) { + checkRowLimit(enforceRowLimit, rows.size()); + rows.add(projectRow(row, projectedFields)); + } + } + } + } + } + + return SystemViewProvider.serializeRows(rows, projectSchema(projectedFields), schemaId()); + } + + private static 
GenericRow buildRow( + String databaseName, + String tableName, + @Nullable String partitionName, + int bucketId, + List replicas, + @Nullable LeaderAndIsr leaderAndIsr) { + GenericRow row = new GenericRow(NUM_COLUMNS); + row.setField(0, BinaryString.fromString(databaseName)); + row.setField(1, BinaryString.fromString(tableName)); + row.setField(2, partitionName != null ? BinaryString.fromString(partitionName) : null); + row.setField(3, bucketId); + + if (leaderAndIsr != null) { + int leader = leaderAndIsr.leader(); + row.setField(4, leader == LeaderAndIsr.NO_LEADER ? null : leader); + } else { + row.setField(4, null); + } + + row.setField(5, BinaryString.fromString(joinInts(replicas))); + row.setField( + 6, + leaderAndIsr != null + ? BinaryString.fromString(joinInts(leaderAndIsr.isr())) + : null); + row.setField(7, replicas.size()); + row.setField(8, leaderAndIsr != null ? leaderAndIsr.isr().size() : null); + + return row; + } + + private static void checkRowLimit(boolean enforceRowLimit, int currentSize) { + if (enforceRowLimit && currentSize >= MAX_ROWS_LIMIT) { + throw new FlussRuntimeException( + "System view 'sys.table_buckets' result exceeds maximum row limit (" + + MAX_ROWS_LIMIT + + "). 
Please add a filter condition on table_name" + + " to narrow the query scope."); + } + } + + private static String joinInts(List values) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < values.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append(values.get(i)); + } + return sb.toString(); + } + + private Schema projectSchema(@Nullable int[] projectedFields) { + if (projectedFields == null) { + return SCHEMA; + } + Schema.Builder builder = Schema.newBuilder(); + List columns = SCHEMA.getColumns(); + for (int idx : projectedFields) { + Schema.Column col = columns.get(idx); + builder.column(col.getName(), col.getDataType()); + } + return builder.build(); + } + + private static InternalRow projectRow(GenericRow fullRow, @Nullable int[] projectedFields) { + if (projectedFields == null) { + return fullRow; + } + GenericRow projected = new GenericRow(projectedFields.length); + for (int i = 0; i < projectedFields.length; i++) { + projected.setField(i, fullRow.getField(projectedFields[i])); + } + return projected; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TabletServersViewProvider.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TabletServersViewProvider.java new file mode 100644 index 0000000000..2b5f6e91e4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/system/TabletServersViewProvider.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.cluster.Endpoint; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SystemTableConstants; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.TabletServerRegistration; +import org.apache.fluss.types.DataTypes; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Provider for the {@code sys.servers} system view. + * + *

+ * <p>This view exposes information about all registered tablet servers by querying ZooKeeper. The
+ * schema is:
+ *
+ * <ul>
+ *   <li>{@code server_id} (INT) — tablet server id
+ *   <li>{@code endpoints} (STRING) — comma-separated list of endpoints in {@code
+ *       listener_name://host:port} format
+ *   <li>{@code rack} (STRING, nullable) — rack identifier
+ *   <li>{@code register_timestamp} (BIGINT) — when the server registered
+ * </ul>
+ */ +@Internal +public class TabletServersViewProvider implements SystemViewProvider { + + private static final TablePath VIEW_PATH = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, SystemTableConstants.TABLET_SERVERS_VIEW); + + private static final Schema SCHEMA = + Schema.newBuilder() + .column("server_id", DataTypes.INT()) + .column("endpoints", DataTypes.STRING()) + .column("rack", DataTypes.STRING()) + .column("register_timestamp", DataTypes.BIGINT()) + .build(); + + private final ZooKeeperClient zkClient; + + public TabletServersViewProvider(ZooKeeperClient zkClient) { + this.zkClient = zkClient; + } + + @Override + public TablePath viewPath() { + return VIEW_PATH; + } + + @Override + public Schema schema() { + return SCHEMA; + } + + @Override + public int schemaId() { + return 1; + } + + @Override + public byte[] scanRows(@Nullable int[] projectedFields, @Nullable Predicate filterPredicate) + throws Exception { + int[] serverIds = zkClient.getSortedTabletServerList(); + if (serverIds.length == 0) { + return SystemViewProvider.serializeRows( + new ArrayList<>(), projectSchema(projectedFields), schemaId()); + } + Map registrations = zkClient.getTabletServers(serverIds); + + List rows = new ArrayList<>(registrations.size()); + for (Map.Entry entry : registrations.entrySet()) { + int serverId = entry.getKey(); + TabletServerRegistration reg = entry.getValue(); + + // Build comma-separated endpoints string: ://:,... + List endpoints = reg.getEndpoints(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < endpoints.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append(endpoints.get(i).listenerString()); + } + + GenericRow row = new GenericRow(4); + row.setField(0, serverId); + row.setField(1, BinaryString.fromString(sb.toString())); + row.setField(2, reg.getRack() != null ? 
BinaryString.fromString(reg.getRack()) : null); + row.setField(3, reg.getRegisterTimestamp()); + + if (filterPredicate == null || filterPredicate.test(row)) { + rows.add(projectRow(row, projectedFields)); + } + } + return SystemViewProvider.serializeRows(rows, projectSchema(projectedFields), schemaId()); + } + + private Schema projectSchema(@Nullable int[] projectedFields) { + if (projectedFields == null) { + return SCHEMA; + } + Schema.Builder builder = Schema.newBuilder(); + List columns = SCHEMA.getColumns(); + for (int idx : projectedFields) { + Schema.Column col = columns.get(idx); + builder.column(col.getName(), col.getDataType()); + } + return builder.build(); + } + + private static InternalRow projectRow(GenericRow fullRow, @Nullable int[] projectedFields) { + if (projectedFields == null) { + return fullRow; + } + GenericRow projected = new GenericRow(projectedFields.length); + for (int i = 0; i < projectedFields.length; i++) { + projected.setField(i, fullRow.getField(projectedFields[i])); + } + return projected; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index b2abac8307..42f8feae1d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -95,6 +95,7 @@ import static org.apache.fluss.config.ConfigOptions.CURRENT_KV_FORMAT_VERSION; import static org.apache.fluss.config.ConfigOptions.DEFAULT_LISTENER_NAME; +import static org.apache.fluss.metadata.SystemTableConstants.SYSTEM_DATABASE; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest; @@ -223,7 +224,8 
@@ void testDatabaseManagement(boolean isCoordinatorServer) throws Exception { // list database assertThat(gateway.listDatabases(new ListDatabasesRequest()).get().getDatabaseNamesList()) - .containsExactlyInAnyOrderElementsOf(Arrays.asList(db1, db2, "fluss")); + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(db1, db2, "fluss", SYSTEM_DATABASE)); // list the table, should be empty assertThat(gateway.listTables(newListTablesRequest(db1)).get().getTableNamesList()) @@ -239,7 +241,7 @@ void testDatabaseManagement(boolean isCoordinatorServer) throws Exception { adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); assertThat(gateway.listDatabases(new ListDatabasesRequest()).get().getDatabaseNamesList()) - .isEqualTo(Arrays.asList(db2, "fluss")); + .containsExactlyInAnyOrder(db2, "fluss", SYSTEM_DATABASE); // drop a not exist database without ignore if not exists, should throw exception assertThatThrownBy( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index f55846a7ff..bcc278f513 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -110,6 +110,8 @@ import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseResponse; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -460,6 +462,11 @@ public CompletableFuture describeClusterConfigs( throw new UnsupportedOperationException(); } + @Override + 
public CompletableFuture scanSystemView(ScanSystemViewRequest request) { + return null; + } + @Override public CompletableFuture registerProducerOffsets( RegisterProducerOffsetsRequest request) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManagerTest.java index b3d542e0e8..203c57f25d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManagerTest.java @@ -21,9 +21,10 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.metrics.Gauge; import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.metrics.registry.NOPMetricRegistry; import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.metadata.ServerInfo; -import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.zk.ZkEpoch; import org.junit.jupiter.api.Test; @@ -52,6 +53,10 @@ class CoordinatorEventManagerTest { */ @Test void testMetricsUpdatedImmediatelyOnStartup() { + // Create a local metric group to avoid shared static state issues + CoordinatorMetricGroup metricGroup = + new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", "host", "0"); + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); context.addLiveCoordinator("coordinator-0"); context.addLiveTabletServer( @@ -76,8 +81,7 @@ void testMetricsUpdatedImmediatelyOnStartup() { } }; - CoordinatorEventManager manager = - new CoordinatorEventManager(testProcessor, TestingMetricGroups.COORDINATOR_METRICS); + CoordinatorEventManager manager = new CoordinatorEventManager(testProcessor, metricGroup); manager.start(); try { @@ -88,14 +92,11 @@ void 
testMetricsUpdatedImmediatelyOnStartup() { () -> assertThat(metricsUpdateCount.get()).isGreaterThan(0)); Gauge activeTabletServerCount = - (Gauge) - TestingMetricGroups.COORDINATOR_METRICS - .getMetrics() - .get(MetricNames.ACTIVE_TABLET_SERVER_COUNT); + (Gauge) metricGroup.getMetrics().get(MetricNames.ACTIVE_TABLET_SERVER_COUNT); assertThat(activeTabletServerCount.getValue()).isEqualTo(2); } finally { manager.close(); - TestingMetricGroups.COORDINATOR_METRICS.close(); + metricGroup.close(); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemTableResolverTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemTableResolverTest.java new file mode 100644 index 0000000000..2ff6f7c174 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemTableResolverTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SystemTableResolver}. */ +class SystemTableResolverTest { + + private SystemTableResolver resolver; + + @BeforeEach + void setUp() { + resolver = new SystemTableResolver(); + } + + @Test + void testRegisterViewDefinitionOnly() { + TablePath viewPath = TablePath.of("sys", "test_view"); + SystemViewDefinition definition = createTestViewDefinition(viewPath); + + resolver.registerViewDefinition(definition); + + // Definition should be visible + assertThat(resolver.isSystemView(viewPath)).isTrue(); + assertThat(resolver.getViewDefinition(viewPath)).isPresent(); + assertThat(resolver.getViewDefinition(viewPath).get().viewPath()).isEqualTo(viewPath); + + // But no provider should be available (cannot serve data) + assertThat(resolver.getViewProvider(viewPath)).isNotPresent(); + } + + @Test + void testRegisterViewProvider() { + TablePath viewPath = TablePath.of("sys", "test_view"); + SystemViewProvider provider = createTestViewProvider(viewPath); + + resolver.registerViewProvider(provider); + + // Both definition and provider should be available + assertThat(resolver.isSystemView(viewPath)).isTrue(); + assertThat(resolver.getViewDefinition(viewPath)).isPresent(); + assertThat(resolver.getViewProvider(viewPath)).isPresent(); + } + + @Test + void testIsSystemViewNotFound() { + assertThat(resolver.isSystemView(TablePath.of("sys", "unknown"))).isFalse(); + } + + @Test + void testGetViewProviderNotFound() { + assertThat(resolver.getViewProvider(TablePath.of("sys", 
"unknown"))).isNotPresent(); + } + + @Test + void testGetViewDefinitionNotFound() { + assertThat(resolver.getViewDefinition(TablePath.of("sys", "unknown"))).isNotPresent(); + } + + @Test + void testGetAllSystemViewNames() { + TablePath viewPath = TablePath.of("sys", "test_view"); + resolver.registerViewDefinition(createTestViewDefinition(viewPath)); + + // Only view names + List viewNames = resolver.getAllSystemViewNames(); + assertThat(viewNames).containsExactly("test_view"); + } + + @Test + void testGetAllViewDefinitions() { + TablePath viewPath1 = TablePath.of("sys", "view1"); + TablePath viewPath2 = TablePath.of("sys", "view2"); + resolver.registerViewDefinition(createTestViewDefinition(viewPath1)); + resolver.registerViewProvider(createTestViewProvider(viewPath2)); + + List definitions = resolver.getAllViewDefinitions(); + assertThat(definitions).hasSize(2); + } + + @Test + void testGetAllViewProviders() { + TablePath viewPath1 = TablePath.of("sys", "view1"); + TablePath viewPath2 = TablePath.of("sys", "view2"); + // Register one as definition-only, one as full provider + resolver.registerViewDefinition(createTestViewDefinition(viewPath1)); + resolver.registerViewProvider(createTestViewProvider(viewPath2)); + + // Only view2 should have a provider + List providers = resolver.getAllViewProviders(); + assertThat(providers).hasSize(1); + assertThat(providers.get(0).viewPath()).isEqualTo(viewPath2); + } + + private static SystemViewDefinition createTestViewDefinition(TablePath viewPath) { + return new SystemViewDefinition() { + @Override + public TablePath viewPath() { + return viewPath; + } + + @Override + public Schema schema() { + return Schema.newBuilder().column("col1", DataTypes.INT()).build(); + } + + @Override + public TableDescriptor tableDescriptor() { + return null; + } + }; + } + + private static SystemViewProvider createTestViewProvider(TablePath viewPath) { + return new SystemViewProvider() { + @Override + public TablePath viewPath() { + return 
viewPath; + } + + @Override + public Schema schema() { + return Schema.newBuilder().column("col1", DataTypes.INT()).build(); + } + + @Override + public byte[] scanRows( + @Nullable int[] projectedFields, @Nullable Predicate filterPredicate) { + return new byte[0]; + } + }; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemViewITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemViewITCase.java new file mode 100644 index 0000000000..347a1843f0 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/SystemViewITCase.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.SystemTableConstants; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Equal; +import org.apache.fluss.predicate.LeafPredicate; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordBatch; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.decode.RowDecoder; +import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; +import org.apache.fluss.rpc.messages.GetTableInfoResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.rpc.util.PredicateMessageUtils; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.DataField; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newListTablesRequest; +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for the system view scan API. 
*/ +class SystemViewITCase { + + private static final int NUM_TABLET_SERVERS = 3; + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER = + FlussClusterExtension.builder().setNumOfTabletServers(NUM_TABLET_SERVERS).build(); + + private static AdminReadOnlyGateway gateway; + + @BeforeAll + static void setUp() { + gateway = FLUSS_CLUSTER.newCoordinatorClient(); + } + + @Test + void testListTablesIncludesServersView() throws Exception { + List tables = + gateway.listTables(newListTablesRequest(SystemTableConstants.SYSTEM_DATABASE)) + .get() + .getTableNamesList(); + assertThat(tables).contains(SystemTableConstants.TABLET_SERVERS_VIEW); + } + + @Test + void testGetTableInfoForServersView() throws Exception { + TablePath serversPath = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, + SystemTableConstants.TABLET_SERVERS_VIEW); + GetTableInfoResponse response = + gateway.getTableInfo(newGetTableInfoRequest(serversPath)).get(); + assertThat(response.hasTableKind()).isTrue(); + assertThat(response.getTableKind()).isEqualTo(SystemTableConstants.TABLE_KIND_SYSTEM_VIEW); + assertThat(response.getSchemaId()).isEqualTo(1); + } + + @Test + void testScanSystemViewWithFilterPredicate() throws Exception { + // Build a filter: server_id = 0 + List fields = new ArrayList<>(); + fields.add(new DataField("server_id", DataTypes.INT(), 0)); + fields.add(new DataField("endpoints", DataTypes.STRING(), 1)); + fields.add(new DataField("rack", DataTypes.STRING(), 2)); + fields.add(new DataField("register_timestamp", DataTypes.BIGINT(), 3)); + RowType serversRowType = new RowType(fields); + + LeafPredicate filter = + new LeafPredicate( + Equal.INSTANCE, + DataTypes.INT(), + 0, + "server_id", + Collections.singletonList(0)); + + ScanSystemViewRequest request = new ScanSystemViewRequest(); + request.setDatabaseName(SystemTableConstants.SYSTEM_DATABASE); + request.setViewName(SystemTableConstants.TABLET_SERVERS_VIEW); + request.setSchemaId("1"); + 
request.setFilterPredicate(PredicateMessageUtils.toPbPredicate(filter, serversRowType)); + + ScanSystemViewResponse response = gateway.scanSystemView(request).get(); + + assertThat(response.hasErrorCode()).isFalse(); + assertThat(response.hasRecords()).isTrue(); + + byte[] recordBytes = response.getRecords(); + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(recordBytes); + + // Only server_id=0 should be returned + assertThat(batch.getRecordCount()).isEqualTo(1); + + DataType[] fieldTypes = + new DataType[] { + DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BIGINT() + }; + RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, fieldTypes); + ValueRecordBatch.ReadContext readContext = schemaId -> decoder; + for (ValueRecord record : batch.records(readContext)) { + BinaryRow row = record.getRow(); + assertThat(row).isNotNull(); + assertThat(row.getInt(0)).isEqualTo(0); + System.out.printf( + "Filtered row: server_id=%d, endpoints=%s%n", + row.getInt(0), row.getString(1).toString()); + } + } + + @Test + void testListTablesIncludesTableBucketsView() throws Exception { + List tables = + gateway.listTables(newListTablesRequest(SystemTableConstants.SYSTEM_DATABASE)) + .get() + .getTableNamesList(); + assertThat(tables).contains(SystemTableConstants.TABLE_BUCKETS_VIEW); + } + + @Test + void testGetTableInfoForTableBucketsView() throws Exception { + TablePath bucketsPath = + TablePath.of( + SystemTableConstants.SYSTEM_DATABASE, + SystemTableConstants.TABLE_BUCKETS_VIEW); + GetTableInfoResponse response = + gateway.getTableInfo(newGetTableInfoRequest(bucketsPath)).get(); + assertThat(response.hasTableKind()).isTrue(); + assertThat(response.getTableKind()).isEqualTo(SystemTableConstants.TABLE_KIND_SYSTEM_VIEW); + assertThat(response.getSchemaId()).isEqualTo(2); + } + + @Test + void testScanNonExistentViewReturnsError() throws Exception { + ScanSystemViewRequest request = new ScanSystemViewRequest(); + 
request.setDatabaseName(SystemTableConstants.SYSTEM_DATABASE); + request.setViewName("nonexistent_view"); + request.setSchemaId("0"); + + ScanSystemViewResponse response = gateway.scanSystemView(request).get(); + + assertThat(response.hasErrorCode()).isTrue(); + assertThat(response.getErrorCode()).isEqualTo(Errors.TABLE_NOT_EXIST.code()); + assertThat(response.hasErrorMessage()).isTrue(); + assertThat(response.getErrorMessage()).contains("does not exist"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProviderTest.java new file mode 100644 index 0000000000..4e4f9d49dc --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TableBucketsViewProviderTest.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor.TableDistribution; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.And; +import org.apache.fluss.predicate.CompoundPredicate; +import org.apache.fluss.predicate.Equal; +import org.apache.fluss.predicate.LeafPredicate; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordBatch; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.RowDecoder; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.DatabaseRegistration; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; +import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for 
{@link TableBucketsViewProvider}. */ +class TableBucketsViewProviderTest { + + @RegisterExtension + public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private static ZooKeeperClient zookeeperClient; + private static TableBucketsViewProvider provider; + + @BeforeAll + static void beforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + provider = new TableBucketsViewProvider(zookeeperClient); + } + + @AfterEach + void afterEach() { + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); + } + + @AfterAll + static void afterAll() throws Exception { + if (zookeeperClient != null) { + zookeeperClient.close(); + } + } + + @Test + void testViewPath() { + TablePath path = provider.viewPath(); + assertThat(path.getDatabaseName()).isEqualTo("sys"); + assertThat(path.getTableName()).isEqualTo("table_buckets"); + } + + @Test + void testSchemaColumns() { + Schema schema = provider.schema(); + List columns = schema.getColumns(); + assertThat(columns).hasSize(9); + assertThat(columns.get(0).getName()).isEqualTo("database_name"); + assertThat(columns.get(1).getName()).isEqualTo("table_name"); + assertThat(columns.get(2).getName()).isEqualTo("partition_name"); + assertThat(columns.get(3).getName()).isEqualTo("bucket_id"); + assertThat(columns.get(4).getName()).isEqualTo("leader_id"); + assertThat(columns.get(5).getName()).isEqualTo("replicas"); + assertThat(columns.get(6).getName()).isEqualTo("isr"); + assertThat(columns.get(7).getName()).isEqualTo("replica_count"); + assertThat(columns.get(8).getName()).isEqualTo("isr_count"); + } + + @Test + void testSchemaId() { + assertThat(provider.schemaId()).isEqualTo(2); + } + + @Test + void testScanRowsWithNoTables() throws Exception { + byte[] result = provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).isEmpty(); + } + + @Test + 
void testScanRowsNonPartitionedTable() throws Exception { + long tableId = 1L; + String dbName = "testdb"; + String tableName = "orders"; + TablePath tablePath = TablePath.of(dbName, tableName); + + registerDatabase(dbName); + registerTable(tablePath, tableId, Collections.emptyList(), 2); + + // Register assignment with 2 buckets, replicas on servers 0 and 1. + Map assignments = new HashMap<>(); + assignments.put(0, BucketAssignment.of(0, 1)); + assignments.put(1, BucketAssignment.of(1, 0)); + zookeeperClient.registerTableAssignment(tableId, new TableAssignment(assignments)); + + // Register leader/ISR for bucket 0 only. + LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 5, Arrays.asList(0, 1), 1, 10); + zookeeperClient.registerLeaderAndIsr( + new TableBucket(tableId, 0), + leaderAndIsr, + ZkVersion.MATCH_ANY_VERSION.getVersion()); + + byte[] result = provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).hasSize(2); + + // Verify bucket 0 with leader/ISR. + InternalRow row0 = findRowByBucketId(rows, 0); + assertThat(row0.getString(0).toString()).isEqualTo(dbName); + assertThat(row0.getString(1).toString()).isEqualTo(tableName); + assertThat(row0.isNullAt(2)).isTrue(); // partition_name + assertThat(row0.getInt(3)).isEqualTo(0); // bucket_id + assertThat(row0.getInt(4)).isEqualTo(0); // leader_id + assertThat(row0.getString(5).toString()).isEqualTo("0,1"); // replicas + assertThat(row0.getString(6).toString()).isEqualTo("0,1"); // isr + assertThat(row0.getInt(7)).isEqualTo(2); // replica_count + assertThat(row0.getInt(8)).isEqualTo(2); // isr_count + + // Verify bucket 1 without leader/ISR has nulls for leader fields. 
+ InternalRow row1 = findRowByBucketId(rows, 1); + assertThat(row1.isNullAt(4)).isTrue(); // leader_id null + assertThat(row1.isNullAt(6)).isTrue(); // isr null + assertThat(row1.isNullAt(8)).isTrue(); // isr_count null + } + + @Test + void testScanRowsPartitionedTable() throws Exception { + long tableId = 2L; + long partitionId = 100L; + String dbName = "testdb"; + String tableName = "events"; + String partitionName = "2024-01-01"; + TablePath tablePath = TablePath.of(dbName, tableName); + + registerDatabase(dbName); + registerTable(tablePath, tableId, Collections.singletonList("dt"), 1); + + // Register partition assignment. + Map bucketMap = new HashMap<>(); + bucketMap.put(0, BucketAssignment.of(0)); + PartitionAssignment partAssignment = new PartitionAssignment(tableId, bucketMap); + zookeeperClient.registerPartitionAssignmentAndMetadata( + partitionId, partitionName, partAssignment, "/tmp/remote", tablePath, tableId); + + // Register leader/ISR for the partition bucket. + LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 1, Collections.singletonList(0), 0, 3); + zookeeperClient.registerLeaderAndIsr( + new TableBucket(tableId, partitionId, 0), + leaderAndIsr, + ZkVersion.MATCH_ANY_VERSION.getVersion()); + + byte[] result = provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).hasSize(1); + + InternalRow row = rows.get(0); + assertThat(row.getString(0).toString()).isEqualTo(dbName); + assertThat(row.getString(1).toString()).isEqualTo(tableName); + assertThat(row.getString(2).toString()).isEqualTo(partitionName); + assertThat(row.getInt(3)).isEqualTo(0); // bucket_id + } + + @Test + void testFilterPushdownByTableName() throws Exception { + String dbName = "mydb"; + registerDatabase(dbName); + + registerTable(TablePath.of(dbName, "orders"), 10L, Collections.emptyList(), 1); + registerTable(TablePath.of(dbName, "users"), 11L, Collections.emptyList(), 1); + + Map a1 = new HashMap<>(); + a1.put(0, BucketAssignment.of(0)); + 
zookeeperClient.registerTableAssignment(10L, new TableAssignment(a1)); + + Map a2 = new HashMap<>(); + a2.put(0, BucketAssignment.of(0)); + zookeeperClient.registerTableAssignment(11L, new TableAssignment(a2)); + + // Filter: table_name = 'orders' + LeafPredicate predicate = + new LeafPredicate( + Equal.INSTANCE, + DataTypes.STRING(), + 1, + "table_name", + Collections.singletonList(BinaryString.fromString("orders"))); + + byte[] result = provider.scanRows(null, predicate); + List rows = deserializeRows(result); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getString(1).toString()).isEqualTo("orders"); + } + + @Test + void testFilterPushdownByDatabaseAndTableName() throws Exception { + registerDatabase("db1"); + registerDatabase("db2"); + + registerTable(TablePath.of("db1", "t1"), 20L, Collections.emptyList(), 1); + registerTable(TablePath.of("db2", "t1"), 21L, Collections.emptyList(), 1); + + Map a1 = new HashMap<>(); + a1.put(0, BucketAssignment.of(0)); + zookeeperClient.registerTableAssignment(20L, new TableAssignment(a1)); + + Map a2 = new HashMap<>(); + a2.put(0, BucketAssignment.of(0)); + zookeeperClient.registerTableAssignment(21L, new TableAssignment(a2)); + + // Filter: database_name = 'db1' AND table_name = 't1' + LeafPredicate dbPredicate = + new LeafPredicate( + Equal.INSTANCE, + DataTypes.STRING(), + 0, + "database_name", + Collections.singletonList(BinaryString.fromString("db1"))); + LeafPredicate tablePredicate = + new LeafPredicate( + Equal.INSTANCE, + DataTypes.STRING(), + 1, + "table_name", + Collections.singletonList(BinaryString.fromString("t1"))); + CompoundPredicate predicate = + new CompoundPredicate(And.INSTANCE, Arrays.asList(dbPredicate, tablePredicate)); + + byte[] result = provider.scanRows(null, predicate); + List rows = deserializeRows(result); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getString(0).toString()).isEqualTo("db1"); + assertThat(rows.get(0).getString(1).toString()).isEqualTo("t1"); + } + + @Test + 
void testColumnProjection() throws Exception { + String dbName = "projdb"; + registerDatabase(dbName); + registerTable(TablePath.of(dbName, "mytable"), 30L, Collections.emptyList(), 1); + + Map assignments = new HashMap<>(); + assignments.put(0, BucketAssignment.of(0, 1)); + zookeeperClient.registerTableAssignment(30L, new TableAssignment(assignments)); + + // Project columns: table_name (1), bucket_id (3), replica_count (7) + int[] projectedFields = new int[] {1, 3, 7}; + byte[] result = provider.scanRows(projectedFields, null); + + DataType[] projectedTypes = + new DataType[] {DataTypes.STRING(), DataTypes.INT(), DataTypes.INT()}; + RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, projectedTypes); + ValueRecordBatch.ReadContext readContext = schemaId -> decoder; + + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(result); + List rows = new ArrayList<>(); + for (ValueRecord record : batch.records(readContext)) { + InternalRow row = record.getRow(); + if (row != null) { + rows.add(row); + } + } + + assertThat(rows).hasSize(1); + InternalRow row = rows.get(0); + assertThat(row.getString(0).toString()).isEqualTo("mytable"); + assertThat(row.getInt(1)).isEqualTo(0); + assertThat(row.getInt(2)).isEqualTo(2); + } + + @Test + void testSysDatabaseIsExcluded() throws Exception { + // With no user databases registered, only the sys database exists implicitly. + // The provider should exclude it and return empty results. 
+ byte[] result = provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).isEmpty(); + } + + // ---- Helper methods ---- + + private void registerDatabase(String dbName) throws Exception { + zookeeperClient.registerDatabase( + dbName, DatabaseRegistration.of(DatabaseDescriptor.builder().build())); + } + + private void registerTable( + TablePath tablePath, long tableId, List partitionKeys, int bucketCount) + throws Exception { + TableDistribution distribution = + new TableDistribution(bucketCount, Collections.emptyList()); + TableRegistration registration = + new TableRegistration( + tableId, + null, + partitionKeys, + distribution, + Collections.emptyMap(), + Collections.emptyMap(), + "/tmp/remote", + System.currentTimeMillis(), + System.currentTimeMillis()); + zookeeperClient.registerTable(tablePath, registration); + } + + private InternalRow findRowByBucketId(List rows, int bucketId) { + for (InternalRow row : rows) { + if (row.getInt(3) == bucketId) { + return row; + } + } + throw new AssertionError("No row found with bucket_id=" + bucketId); + } + + /** Deserializes a byte array produced by {@link SystemViewProvider#scanRows} into rows. 
*/ + private List deserializeRows(byte[] bytes) { + DataType[] fieldTypes = + provider.schema().getRowType().getFieldTypes().toArray(new DataType[0]); + RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, fieldTypes); + ValueRecordBatch.ReadContext readContext = schemaId -> decoder; + + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(bytes); + List rows = new ArrayList<>(); + for (ValueRecord record : batch.records(readContext)) { + InternalRow row = record.getRow(); + if (row != null) { + rows.add(row); + } + } + return rows; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TabletServersViewProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TabletServersViewProviderTest.java new file mode 100644 index 0000000000..92d8e2c980 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/system/TabletServersViewProviderTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.system; + +import org.apache.fluss.cluster.Endpoint; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordBatch; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.RowDecoder; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.TabletServerRegistration; +import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.types.DataType; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TabletServersViewProvider}. 
*/ +class TabletServersViewProviderTest { + + @RegisterExtension + public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private static ZooKeeperClient zookeeperClient; + private static TabletServersViewProvider provider; + + @BeforeAll + static void beforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + provider = new TabletServersViewProvider(zookeeperClient); + } + + @AfterEach + void afterEach() { + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); + } + + @AfterAll + static void afterAll() throws Exception { + if (zookeeperClient != null) { + zookeeperClient.close(); + } + } + + @Test + void testViewPath() { + TablePath path = provider.viewPath(); + assertThat(path.getDatabaseName()).isEqualTo("sys"); + assertThat(path.getTableName()).isEqualTo("tablet_servers"); + } + + @Test + void testSchemaColumns() { + Schema schema = provider.schema(); + List columns = schema.getColumns(); + assertThat(columns).hasSize(4); + assertThat(columns.get(0).getName()).isEqualTo("server_id"); + assertThat(columns.get(1).getName()).isEqualTo("endpoints"); + assertThat(columns.get(2).getName()).isEqualTo("rack"); + assertThat(columns.get(3).getName()).isEqualTo("register_timestamp"); + } + + @Test + void testSchemaId() { + assertThat(provider.schemaId()).isEqualTo(1); + } + + @Test + void testScanRowsWithNoServers() throws Exception { + byte[] result = provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).isEmpty(); + } + + @Test + void testScanRowsWithRegisteredServer() throws Exception { + long timestamp = System.currentTimeMillis(); + TabletServerRegistration registration = + new TabletServerRegistration( + "rack-a", Endpoint.fromListenersString("CLIENT://myhost:9123"), timestamp); + zookeeperClient.registerTabletServer(42, registration); + + byte[] result = 
provider.scanRows(null, null); + List rows = deserializeRows(result); + assertThat(rows).hasSize(1); + + InternalRow row = rows.get(0); + assertThat(row.getInt(0)).isEqualTo(42); + assertThat(row.getString(1).toString()).isEqualTo("CLIENT://myhost:9123"); + assertThat(row.getString(2).toString()).isEqualTo("rack-a"); + assertThat(row.getLong(3)).isEqualTo(timestamp); + } + + /** Deserializes a byte array produced by {@link SystemViewProvider#scanRows} into rows. */ + private List deserializeRows(byte[] bytes) { + DataType[] fieldTypes = + provider.schema().getRowType().getFieldTypes().toArray(new DataType[0]); + RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, fieldTypes); + ValueRecordBatch.ReadContext readContext = schemaId -> decoder; + + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(bytes); + List rows = new ArrayList<>(); + for (ValueRecord record : batch.records(readContext)) { + InternalRow row = record.getRow(); + if (row != null) { + rows.add(row); + } + } + return rows; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index e54987f134..df0ec5a30e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -85,6 +85,8 @@ import org.apache.fluss.rpc.messages.PutKvResponse; import org.apache.fluss.rpc.messages.ScanKvRequest; import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScanSystemViewRequest; +import org.apache.fluss.rpc.messages.ScanSystemViewResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -366,6 +368,11 @@ public CompletableFuture describeClusterConfigs( throw new 
UnsupportedOperationException(); } + @Override + public CompletableFuture scanSystemView(ScanSystemViewRequest request) { + throw new UnsupportedOperationException(); + } + public int pendingRequestSize() { return requests.size(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 74a0415963..1cce254fb1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -25,6 +25,7 @@ import org.apache.fluss.config.MemorySize; import org.apache.fluss.fs.local.LocalFileSystem; import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.SystemTableConstants; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.MetricRegistry; @@ -185,14 +186,16 @@ public void afterEach(ExtensionContext extensionContext) throws Exception { CoordinatorGateway coordinatorGateway = newCoordinatorClient(); List> dropFutures = new ArrayList<>(); for (String db : dbs) { + if (SystemTableConstants.isSystemDatabase(db)) { + continue; + } if (BUILTIN_DATABASE.equals(db)) { - // if it's built-in database, we just drop all tables in it but not drop the - // database itself. - List tables = metadataManager.listTables(BUILTIN_DATABASE); + // if it's the built-in database, we just drop all tables in it but not drop + // the database itself (system databases were already skipped above). 
+ List tables = metadataManager.listTables(db); for (String table : tables) { dropFutures.add( - coordinatorGateway.dropTable( - newDropTableRequest(BUILTIN_DATABASE, table, true))); + coordinatorGateway.dropTable(newDropTableRequest(db, table, true))); } } else { dropFutures.add( diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala index 11287fb7d4..3e6a51cb02 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -38,6 +38,7 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession { protected val DEFAULT_CATALOG = "fluss_catalog" protected val DEFAULT_DATABASE = "fluss" + protected val SYSTEM_DATABASE = "sys"; protected var conn: Connection = _ protected var admin: Admin = _ diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala index e30b03baac..0ffddfbf1b 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala @@ -37,11 +37,13 @@ class SparkCatalogTest extends FlussSparkTestBase { protected def lakeFormat: Option[DataLakeFormat] = None test("Catalog: namespaces") { - // Always a default database 'fluss'. - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + // Always a default database 'fluss' and system database 'sys'. 
+ checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(SYSTEM_DATABASE) :: Nil) sql("CREATE DATABASE testdb COMMENT 'created by spark'") - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("testdb") :: Nil) + checkAnswer( + sql("SHOW DATABASES"), + Row(DEFAULT_DATABASE) :: Row(SYSTEM_DATABASE) :: Row("testdb") :: Nil) checkAnswer( sql("DESC DATABASE testdb").filter("info_name != 'Owner'"), @@ -51,7 +53,7 @@ class SparkCatalogTest extends FlussSparkTestBase { ) sql("DROP DATABASE testdb") - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(SYSTEM_DATABASE) :: Nil) } test("Catalog: basic table") { @@ -161,7 +163,9 @@ class SparkCatalogTest extends FlussSparkTestBase { val dbDesc = DatabaseDescriptor.builder().comment("created by admin").build() admin.createDatabase(dbName, dbDesc, true).get() assert(catalog.namespaceExists(Array(dbName))) - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(dbName) :: Nil) + checkAnswer( + sql("SHOW DATABASES"), + Row(DEFAULT_DATABASE) :: Row(dbName) :: Row(SYSTEM_DATABASE) :: Nil) // check table val tablePath = TablePath.of(dbName, tblName) @@ -196,7 +200,7 @@ class SparkCatalogTest extends FlussSparkTestBase { checkAnswer(sql(s"SHOW TABLES IN $dbName"), Nil) admin.dropDatabase(dbName, true, true).get() - checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row(SYSTEM_DATABASE) :: Nil) } protected def modifyTablePropertiesWithCheck(): Unit = {