From bf31d9ac8369cdb884024ff61856e9bc23050923 Mon Sep 17 00:00:00 2001 From: rmoff Date: Wed, 22 Apr 2026 15:45:40 +0100 Subject: [PATCH] Flink: Throw UnsupportedOperationException for materialized tables Co-Authored-By: Claude Opus 4.6 --- .../apache/iceberg/flink/FlinkCatalog.java | 6 +++++ .../iceberg/flink/TestFlinkCatalogTable.java | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4bb235b811d0..be5e2de06079 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; @@ -426,6 +427,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig + "create table without 'connector'='iceberg' related properties in an iceberg table."); } + if (table instanceof CatalogMaterializedTable) { + throw new UnsupportedOperationException( + "Materialized tables are not supported by Iceberg's Flink catalog."); + } + Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved"); createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index f7848a5d22ef..6d20f8a33d51 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -33,8 +33,10 @@ import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.iceberg.BaseTable; @@ -706,6 +708,31 @@ private void validateTableFiles(Table tbl, DataFile... expectedFiles) { assertThat(actualFilePaths).as("Files should match").isEqualTo(expectedFilePaths); } + @TestTemplate + public void testCreateMaterializedTableIsUnsupported() { + CatalogMaterializedTable materializedTable = + CatalogMaterializedTable.newBuilder() + .schema( + org.apache.flink.table.api.Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .build()) + .definitionQuery("SELECT id FROM tl") + .freshness(IntervalFreshness.ofMinute("5")) + .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) + .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build(); + + assertThatThrownBy( + () -> + getTableEnv() + .getCatalog(catalogName) + .get() + .createTable(new ObjectPath(DATABASE, "mt_table"), materializedTable, false)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Materialized tables are not supported by Iceberg's Flink catalog."); + } + private Table table(String name) { return validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, name)); }