Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
Expand Down Expand Up @@ -100,10 +101,14 @@ public Admin getAdmin() {

@Override
public Table getTable(TablePath tablePath) {
    // Always fetch the table info from the server so we never hand out a stale
    // schema/descriptor from the client-side cache.
    Admin admin = getOrCreateAdmin();
    TableInfo tableInfo = admin.getTableInfo(tablePath).join();
    if (!tableInfo.isSystemView()) {
        // System views have no buckets/partitions, skip TabletServer metadata update.
        // Regular tables need fresh bucket leader info.
        metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
    }
    return new FlussTable(this, tablePath, tableInfo);
}

public MetadataUpdater getMetadataUpdater() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.SystemTableConstants;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
Expand Down Expand Up @@ -315,7 +316,10 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
// clusters do not include the remote data dir
r.hasRemoteDataDir() ? r.getRemoteDataDir() : null,
r.getCreatedTime(),
r.getModifiedTime()));
r.getModifiedTime(),
r.hasTableKind()
? r.getTableKind()
: SystemTableConstants.TABLE_KIND_TABLE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.client.lookup.TableLookup;
import org.apache.fluss.client.metadata.ClientSchemaGetter;
import org.apache.fluss.client.table.scanner.Scan;
import org.apache.fluss.client.table.scanner.SystemViewScan;
import org.apache.fluss.client.table.scanner.TableScan;
import org.apache.fluss.client.table.writer.Append;
import org.apache.fluss.client.table.writer.TableAppend;
Expand Down Expand Up @@ -64,17 +65,22 @@ public TableInfo getTableInfo() {

@Override
public Scan newScan() {
    // System views are served by a dedicated one-shot coordinator scan; regular
    // tables go through the bucket-aware scan path.
    return tableInfo.isSystemView()
            ? new SystemViewScan(tableInfo, conn)
            : new TableScan(conn, tableInfo, schemaGetter);
}

@Override
public Lookup newLookup() {
    // Point lookups need bucket/KV state, which a system view does not have.
    checkState(!tableInfo.isSystemView(), "System view %s doesn't support lookups.", tablePath);
    return new TableLookup(
            tableInfo,
            schemaGetter,
            conn.getMetadataUpdater(),
            conn.getOrCreateLookupClient());
}

@Override
public Append newAppend() {
checkState(!tableInfo.isSystemView(), "System view %s doesn't support writes.", tablePath);
checkState(
!hasPrimaryKey,
"Table %s is not a Log Table and doesn't support AppendWriter.",
Expand All @@ -84,6 +90,7 @@ public Append newAppend() {

@Override
public Upsert newUpsert() {
checkState(!tableInfo.isSystemView(), "System view %s doesn't support writes.", tablePath);
checkState(
hasPrimaryKey,
"Table %s is not a Primary Key Table and doesn't support UpsertWriter.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.table.scanner;

import org.apache.fluss.client.FlussConnection;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.scanner.batch.SystemViewBatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;

import java.util.List;

/**
 * {@link Scan} implementation for system views.
 *
 * <p>A system view is a virtual table: it has no storage of its own, no buckets and no
 * partitions. Consequently only bounded batch scans via {@link #createBatchScanner()} are
 * supported; streaming scans and bucket-addressed batch scans throw
 * {@link UnsupportedOperationException}.
 */
public class SystemViewScan implements Scan {

    private final TableInfo tableInfo;
    private final FlussConnection conn;
    // Indexes of the columns to read, or null to read every column.
    @Nullable private final int[] projection;
    // Filter pushed to the coordinator, or null for no filtering.
    @Nullable private final Predicate filter;

    public SystemViewScan(TableInfo tableInfo, FlussConnection conn) {
        this(tableInfo, conn, null, null);
    }

    private SystemViewScan(
            TableInfo tableInfo,
            FlussConnection conn,
            @Nullable int[] projection,
            @Nullable Predicate filter) {
        this.tableInfo = tableInfo;
        this.conn = conn;
        this.projection = projection;
        this.filter = filter;
    }

    @Override
    public Scan project(@Nullable int[] projectedColumns) {
        return new SystemViewScan(tableInfo, conn, projectedColumns, filter);
    }

    @Override
    public Scan project(List<String> projectedColumnNames) {
        RowType rowType = tableInfo.getRowType();
        int[] indexes = new int[projectedColumnNames.size()];
        int pos = 0;
        for (String columnName : projectedColumnNames) {
            int fieldIndex = rowType.getFieldIndex(columnName);
            if (fieldIndex < 0) {
                throw new IllegalArgumentException(
                        String.format(
                                "Field '%s' not found in system view schema. "
                                        + "Available fields: %s, View: %s",
                                columnName, rowType.getFieldNames(), tableInfo.getTablePath()));
            }
            indexes[pos++] = fieldIndex;
        }
        return new SystemViewScan(tableInfo, conn, indexes, filter);
    }

    @Override
    public Scan limit(int rowNumber) {
        throw new UnsupportedOperationException(
                "System views do not support limit pushdown. View: " + tableInfo.getTablePath());
    }

    @Override
    public Scan filter(@Nullable Predicate predicate) {
        return new SystemViewScan(tableInfo, conn, projection, predicate);
    }

    @Override
    public LogScanner createLogScanner() {
        throw new UnsupportedOperationException(
                "System views do not support streaming log scan. "
                        + "Use createBatchScanner() instead. View: "
                        + tableInfo.getTablePath());
    }

    @Override
    public <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass) {
        throw new UnsupportedOperationException(
                "System views do not support streaming log scan. "
                        + "Use createBatchScanner() instead. View: "
                        + tableInfo.getTablePath());
    }

    @Override
    public BatchScanner createBatchScanner(TableBucket tableBucket) {
        throw new UnsupportedOperationException(
                "System views do not support bucket-based batch scan. "
                        + "Use createBatchScanner() (no args) instead. View: "
                        + tableInfo.getTablePath());
    }

    @Override
    public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) {
        throw new UnsupportedOperationException(
                "System views do not support snapshot batch scan. "
                        + "Use createBatchScanner() (no args) instead. View: "
                        + tableInfo.getTablePath());
    }

    /** Issues a single bounded scan of the whole view against the coordinator. */
    @Override
    public BatchScanner createBatchScanner() {
        return new SystemViewBatchScanner(
                tableInfo, conn.getMetadataUpdater(), projection, filter);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.table.scanner.batch;

import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.record.DefaultValueRecordBatch;
import org.apache.fluss.record.ValueRecord;
import org.apache.fluss.record.ValueRecordBatch;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.decode.RowDecoder;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.ScanSystemViewRequest;
import org.apache.fluss.rpc.messages.ScanSystemViewResponse;
import org.apache.fluss.rpc.util.PredicateMessageUtils;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.CloseableIterator;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link BatchScanner} that reads data from a system view by sending a {@code
 * ScanSystemViewRequest} to the coordinator server.
 *
 * <p>System views have no buckets or partitions — the coordinator returns all matching rows in a
 * single response. This scanner is always bounded (one-shot request/response).
 */
public class SystemViewBatchScanner implements BatchScanner {

    // Pending (or completed) coordinator response; the request is sent from the constructor.
    private final CompletableFuture<ScanSystemViewResponse> scanFuture;
    // Row type of the rows in the response (already projected when a projection was pushed down).
    private final RowType responseRowType;
    // Set once the single response has been consumed; later polls return null (end of input).
    private boolean endOfInput;

    public SystemViewBatchScanner(
            TableInfo tableInfo,
            MetadataUpdater metadataUpdater,
            @Nullable int[] projectedFields,
            @Nullable Predicate filterPredicate) {
        this.endOfInput = false;

        TablePath tablePath = tableInfo.getTablePath();
        RowType rowType = tableInfo.getRowType();

        // Build RowType with sequential field IDs so the predicate can be serialized against
        // stable field indexes.
        List<DataField> fieldsWithIds = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            DataField f = rowType.getFields().get(i);
            fieldsWithIds.add(new DataField(f.getName(), f.getType(), i));
        }
        RowType fullRowType = new RowType(rowType.isNullable(), fieldsWithIds);

        // Determine the row type of the response (projected if applicable).
        this.responseRowType =
                projectedFields != null ? fullRowType.project(projectedFields) : fullRowType;

        ScanSystemViewRequest request = new ScanSystemViewRequest();
        request.setDatabaseName(tablePath.getDatabaseName());
        request.setViewName(tablePath.getTableName());
        request.setSchemaId(String.valueOf(tableInfo.getSchemaId()));

        if (projectedFields != null) {
            request.setProjectedFields(projectedFields);
        }

        if (filterPredicate != null) {
            request.setFilterPredicate(
                    PredicateMessageUtils.toPbPredicate(filterPredicate, fullRowType));
        }

        CoordinatorGateway coordinatorGateway = metadataUpdater.newCoordinatorServerClient();
        this.scanFuture = coordinatorGateway.scanSystemView(request);
    }

    /**
     * Returns all rows of the view on the first successful call, then {@code null} (end of
     * input). Returns an empty iterator when the response is not ready within {@code timeout}.
     *
     * @throws IOException if the scan fails, the response carries an error, or the calling
     *     thread is interrupted while waiting
     */
    @Nullable
    @Override
    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
        if (endOfInput) {
            return null;
        }
        try {
            ScanSystemViewResponse response =
                    scanFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (response.hasErrorCode()) {
                throw new FlussRuntimeException(
                        "Failed to scan system view: " + response.getErrorMessage());
            }
            List<InternalRow> rows = decodeResponse(response);
            endOfInput = true;
            return CloseableIterator.wrap(rows.iterator());
        } catch (TimeoutException e) {
            // Not ready yet: report an empty batch; the caller is expected to poll again.
            return CloseableIterator.emptyIterator();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            // Surface the real RPC failure instead of the ExecutionException wrapper.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new IOException(cause);
        } catch (Exception e) {
            // Covers FlussRuntimeException (error response) and decode failures.
            throw new IOException(e);
        }
    }

    /** Decodes the INDEXED-encoded value records of the response into rows. */
    private List<InternalRow> decodeResponse(ScanSystemViewResponse response) {
        if (!response.hasRecords()) {
            return new ArrayList<>();
        }

        byte[] recordBytes = response.getRecords();
        DataType[] fieldTypes = responseRowType.getFieldTypes().toArray(new DataType[0]);
        RowDecoder decoder = RowDecoder.create(KvFormat.INDEXED, fieldTypes);
        // All records share one schema, so the same decoder serves every schema id.
        ValueRecordBatch.ReadContext readContext = schemaId -> decoder;

        DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(recordBytes);

        List<InternalRow> rows = new ArrayList<>();
        for (ValueRecord record : batch.records(readContext)) {
            InternalRow row = record.getRow();
            if (row != null) {
                rows.add(row);
            }
        }
        return rows;
    }

    /** Cancels the outstanding coordinator request, if any. */
    @Override
    public void close() throws IOException {
        scanFuture.cancel(true);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ void testListDatabasesAndTables() throws Exception {
admin.createDatabase("db2", DatabaseDescriptor.EMPTY, true).get();
admin.createDatabase("db3", DatabaseDescriptor.EMPTY, true).get();
assertThat(admin.listDatabases().get())
.containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss");
.containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss", "sys");
Map<String, Integer> databaseSummaries =
admin.listDatabaseSummaries().get().stream()
.collect(
Expand Down
Loading