From 341bca71ce044eb1995c2bd663f3ad73920c79f8 Mon Sep 17 00:00:00 2001 From: donPain Date: Mon, 20 Apr 2026 15:43:00 -0300 Subject: [PATCH 1/5] Flink: Close catalog in dynamic sink operators to prevent resource leaks Operators in the Flink dynamic sink pipeline (DynamicTableUpdateOperator, DynamicRecordProcessor, DynamicWriteResultAggregator, TableSerializerCache) load a catalog via CatalogLoader but never close it. When the catalog is a RESTCatalog backed by S3FileIO, this leaks HTTP clients, S3 clients, and other resources held by the catalog's CloseableGroup. Fix each operator to close its catalog on shutdown if it implements Closeable. For TableSerializerCache, cache the catalog instance to avoid creating a new (leaked) catalog on every serializer update. Add tests for operator lifecycle with both Closeable and non-Closeable catalogs, and an integration test with RESTCatalog + S3FileIO. --- .../sink/dynamic/DynamicRecordProcessor.java | 12 +- .../dynamic/DynamicTableUpdateOperator.java | 12 +- .../dynamic/DynamicWriteResultAggregator.java | 9 ++ .../sink/dynamic/TableSerializerCache.java | 8 +- flink/v2.1/build.gradle | 9 ++ .../sink/dynamic/DynamicRecordProcessor.java | 12 +- .../dynamic/DynamicTableUpdateOperator.java | 12 +- .../dynamic/DynamicWriteResultAggregator.java | 9 ++ .../sink/dynamic/TableSerializerCache.java | 8 +- .../TestDynamicTableUpdateOperator.java | 86 ++++++++++ ...micTableUpdateOperatorWithRESTCatalog.java | 153 ++++++++++++++++++ 11 files changed, 324 insertions(+), 6 deletions(-) create mode 100644 flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 07dfad2780f7..6ddc63318714 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; @@ -47,6 +48,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction serializers; + private transient Catalog catalog; TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) { this.catalogLoader = catalogLoader; @@ -120,7 +122,11 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); + if (catalog == null) { + catalog = catalogLoader.loadCatalog(); + } + + Table table = catalog.loadTable(TableIdentifier.parse(tableName)); schemas = table.schemas(); specs = table.specs(); } diff --git a/flink/v2.1/build.gradle b/flink/v2.1/build.gradle index a08cb1d5ebdd..76d5e193af1f 100644 --- a/flink/v2.1/build.gradle +++ b/flink/v2.1/build.gradle @@ -74,6 +74,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { testImplementation(libs.flink21.test.utilsjunit) { exclude group: 'junit' } + testImplementation project(':iceberg-aws') + testImplementation(platform(libs.awssdk.bom)) + testImplementation "software.amazon.awssdk:s3" + testImplementation "software.amazon.awssdk:auth" + testImplementation "software.amazon.awssdk:kms" + testImplementation "software.amazon.awssdk:sts" + testImplementation libs.testcontainers + testImplementation libs.testcontainers.junit.jupiter + testImplementation libs.testcontainers.minio testImplementation(libs.flink21.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index fc6892b2cd9e..d2f6a9315087 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; @@ -49,6 +50,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction serializers; + private transient Catalog catalog; TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) { this.catalogLoader = catalogLoader; @@ -120,7 +122,11 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); + if (catalog == null) { + catalog = catalogLoader.loadCatalog(); + } + + Table table = catalog.loadTable(TableIdentifier.parse(tableName)); schemas = table.schemas(); specs = table.specs(); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 1c8e6df8591d..c6d5a0fb5bad 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -22,16 +22,21 @@ import static org.apache.iceberg.flink.TestFixtures.TABLE; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; import java.util.Collections; import org.apache.flink.table.data.GenericRowData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,6 +48,8 @@ class TestDynamicTableUpdateOperator { private static final boolean DROP_COLUMNS = true; private static final boolean PRESERVE_COLUMNS = false; + @TempDir private Path tempDir; + @RegisterExtension private static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension(DATABASE, TABLE); @@ -132,6 +139,85 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } + @Test + void testCloseOperatorWithNonCloseableCatalog() throws Exception { + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + 10, + 1000, + 10, + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + operator.map(input); + + // HadoopCatalog is not Closeable, so close should complete without error + operator.close(); + } + + @Test + void testCloseOperatorWithCloseableCatalog() throws Exception { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of(DATABASE)); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + closeableCatalogLoader, + 10, + 1000, + 10, + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + DATABASE + "." + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + operator.map(input); + + // InMemoryCatalog implements Closeable, so close should invoke catalog.close() + operator.close(); + + // After close, the catalog's internal state should be cleared + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testCaseInSensitivity(boolean caseSensitive) throws Exception { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java new file mode 100644 index 000000000000..407045742a49 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.table.data.GenericRowData; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTCatalogAdapter; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MinIOContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; + +@Testcontainers +class TestDynamicTableUpdateOperatorWithRESTCatalog { + + private static final String BUCKET = "iceberg-test-bucket"; + private static final String MINIO_IMAGE = "minio/minio:RELEASE.2023-09-04T19-57-37Z"; + + @Container + private static final MinIOContainer MINIO = + new MinIOContainer(MINIO_IMAGE).withUserName("testuser").withPassword("testpassword"); + + private static final Schema SCHEMA = + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + private DynamicTableUpdateOperator operator; + private RESTCatalog restCatalog; + private S3Client s3Client; + + @BeforeEach + void before() throws Exception { + s3Client = + S3Client.builder() + .endpointOverride(URI.create(MINIO.getS3URL())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(MINIO.getUserName(), MINIO.getPassword()))) + .region(Region.US_EAST_1) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build(); + + s3Client.createBucket(b -> b.bucket(BUCKET)); + + Map s3Props = Maps.newHashMap(); + s3Props.put("s3.endpoint", MINIO.getS3URL()); + s3Props.put("s3.access-key-id", MINIO.getUserName()); + s3Props.put("s3.secret-access-key", MINIO.getPassword()); + s3Props.put("s3.path-style-access", "true"); + s3Props.put("client.region", "us-east-1"); + + InMemoryCatalog backendCatalog = new InMemoryCatalog(); + Map backendProps = Maps.newHashMap(s3Props); + backendProps.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + BUCKET + "/warehouse"); + backendProps.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName()); + backendCatalog.initialize("backend", backendProps); + backendCatalog.createNamespace(Namespace.of("default")); + + RESTCatalogAdapter adapter = new RESTCatalogAdapter(backendCatalog); + restCatalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), properties -> adapter); + restCatalog.initialize("test-rest", Maps.newHashMap(backendProps)); + + CatalogLoader restCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return restCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + operator = + new DynamicTableUpdateOperator( + restCatalogLoader, 10, 1000, 10, TableCreator.DEFAULT, true, false); + } + + @AfterEach + void after() throws Exception { + if (operator != null) { + // operator.close() closes the catalog (restCatalog) via Closeable check + operator.close(); + } else if (restCatalog != null) { + restCatalog.close(); + } + + if (s3Client != null) { + s3Client.close(); + } + } + + @Test + void testOperatorWithS3FileIO() throws Exception { + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + "default.test_table", + "main", + SCHEMA, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + + DynamicRecordInternal output = operator.map(input); + assertThat(output).isEqualTo(input); + } +} From d1812876f8f6bc4af5cf00e774a03208f888b136 Mon Sep 17 00:00:00 2001 From: donPain Date: Tue, 21 Apr 2026 18:38:22 -0300 Subject: [PATCH 2/5] fix: enable auto-cast for instance of, remove unnecessary dependencies, and add missing close catalog. --- .../org/apache/iceberg/flink/TableLoader.java | 4 +- .../sink/dynamic/DynamicRecordProcessor.java | 4 +- .../dynamic/DynamicTableUpdateOperator.java | 4 +- .../dynamic/DynamicWriteResultAggregator.java | 4 +- .../sink/dynamic/TableSerializerCache.java | 27 +++++++++---- .../dynamic/TestTableSerializerCache.java | 40 ++++++++++++++++++- .../org/apache/iceberg/flink/TableLoader.java | 4 +- .../sink/dynamic/TableSerializerCache.java | 25 ++++++++++-- .../dynamic/TestTableSerializerCache.java | 38 ++++++++++++++++++ flink/v2.1/build.gradle | 2 - .../org/apache/iceberg/flink/TableLoader.java | 4 +- .../sink/dynamic/TableSerializerCache.java | 27 +++++++++---- .../TestDynamicTableUpdateOperator.java | 29 -------------- .../dynamic/TestTableSerializerCache.java | 39 ++++++++++++++++++ 14 files changed, 190 insertions(+), 61 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..769bf3da1179 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable rs) { + rs.close(); } catalog = null; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 6ddc63318714..cfe41c8b5482 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -198,9 +198,9 @@ public void close() { throw new RuntimeException(e); } - if (catalog instanceof Closeable) { + if (catalog instanceof Closeable rs) { try { - ((Closeable) catalog).close(); + rs.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index fb70dff743d2..5941e297bc9a 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -107,8 +107,8 @@ public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable rs) { + rs.close(); } } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index d954c03348b2..b65cbe447049 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -224,8 +224,8 @@ private PartitionSpec spec(String tableName, int specId) { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable rs) { + rs.close(); } } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java index b9549858aaae..d138fb22ce5f 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Map; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; @@ -52,7 +55,6 @@ class TableSerializerCache implements Serializable { private final CatalogLoader catalogLoader; private final int maximumSize; private transient Map serializers; - private transient Catalog catalog; TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) { this.catalogLoader = catalogLoader; @@ -108,6 +110,16 @@ int maximumSize() { return maximumSize; } + private static void closeCatalog(Catalog catalog) { + if (catalog instanceof Closeable closeableCatalog) { + try { + closeableCatalog.close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close catalog", e); + } + } + } + private class SerializerInfo { private final String tableName; private final Map serializers; @@ -122,13 +134,14 @@ private class SerializerInfo { } private void update() { - if (catalog == null) { - catalog = catalogLoader.loadCatalog(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); } - - Table table = catalog.loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index ec610a3357ba..1492d0ae1ae6 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,13 +34,18 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; -public class TestTableSerializerCache { +class TestTableSerializerCache { + + @TempDir private Path tempDir; @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,35 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..769bf3da1179 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable rs) { + rs.close(); } catalog = null; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java index 84d0ed9be5d0..d138fb22ce5f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Map; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; @@ -28,6 +31,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -106,6 +110,16 @@ int maximumSize() { return maximumSize; } + private static void closeCatalog(Catalog catalog) { + if (catalog instanceof Closeable closeableCatalog) { + try { + closeableCatalog.close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close catalog", e); + } + } + } + private class SerializerInfo { private final String tableName; private final Map serializers; @@ -120,9 +134,14 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); + } } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1cf2c8bae001..1492d0ae1ae6 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,14 +34,19 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; class TestTableSerializerCache { + @TempDir private Path tempDir; + @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,35 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } } diff --git a/flink/v2.1/build.gradle b/flink/v2.1/build.gradle index 76d5e193af1f..3f9e13bb52f7 100644 --- a/flink/v2.1/build.gradle +++ b/flink/v2.1/build.gradle @@ -77,10 +77,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { testImplementation project(':iceberg-aws') testImplementation(platform(libs.awssdk.bom)) testImplementation "software.amazon.awssdk:s3" - testImplementation "software.amazon.awssdk:auth" testImplementation "software.amazon.awssdk:kms" testImplementation "software.amazon.awssdk:sts" - testImplementation libs.testcontainers testImplementation libs.testcontainers.junit.jupiter testImplementation libs.testcontainers.minio testImplementation(libs.flink21.test.utils) { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..769bf3da1179 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable rs) { + rs.close(); } catalog = null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java index b9549858aaae..d138fb22ce5f 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Map; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; @@ -52,7 +55,6 @@ class TableSerializerCache implements Serializable { private final CatalogLoader catalogLoader; private final int maximumSize; private transient Map serializers; - private transient Catalog catalog; TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) { this.catalogLoader = catalogLoader; @@ -108,6 +110,16 @@ int maximumSize() { return maximumSize; } + private static void closeCatalog(Catalog catalog) { + if (catalog instanceof Closeable closeableCatalog) { + try { + closeableCatalog.close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close catalog", e); + } + } + } + private class SerializerInfo { private final String tableName; private final Map serializers; @@ -122,13 +134,14 @@ private class SerializerInfo { } private void update() { - if (catalog == null) { - catalog = catalogLoader.loadCatalog(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); } - - Table table = catalog.loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index c6d5a0fb5bad..efecefffb42d 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -139,35 +139,6 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } - @Test - void testCloseOperatorWithNonCloseableCatalog() throws Exception { - DynamicTableUpdateOperator operator = - new DynamicTableUpdateOperator( - CATALOG_EXTENSION.catalogLoader(), - 10, - 1000, - 10, - TableCreator.DEFAULT, - CASE_SENSITIVE, - PRESERVE_COLUMNS); - operator.open(null); - - DynamicRecordInternal input = - new DynamicRecordInternal( - TABLE, - "branch", - SCHEMA1, - GenericRowData.of(1), - PartitionSpec.unpartitioned(), - 42, - false, - Collections.emptySet()); - operator.map(input); - - // HadoopCatalog is not Closeable, so close should complete without error - operator.close(); - } - @Test void testCloseOperatorWithCloseableCatalog() throws Exception { InMemoryCatalog closeableCatalog = new InMemoryCatalog(); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1cf2c8bae001..62b70af7c039 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,14 +34,19 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; class TestTableSerializerCache { + @TempDir private Path tempDir; + @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,36 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } } From cd4e4a611ca33e2d6cc063a6d855d6b0b5a81177 Mon Sep 17 00:00:00 2001 From: donPain Date: Tue, 21 Apr 2026 18:58:52 -0300 Subject: [PATCH 3/5] fix: run spotlessApply --- .../iceberg/flink/sink/dynamic/TestTableSerializerCache.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1492d0ae1ae6..62b70af7c039 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -150,7 +150,8 @@ public CatalogLoader clone() { } }; - TableSerializerCache tableSerializerCache = new TableSerializerCache(closeableCatalogLoader, 10); + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); Tuple3 serializerWithSchemaAndSpec = tableSerializerCache.serializerWithSchemaAndSpec( From f030d6bc3ad34162864dde9500f51a4dd71f0227 Mon Sep 17 00:00:00 2001 From: donPain Date: Tue, 21 Apr 2026 20:19:56 -0300 Subject: [PATCH 4/5] fix: run spotlessApply --- .../iceberg/flink/sink/dynamic/TestTableSerializerCache.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1492d0ae1ae6..62b70af7c039 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -150,7 +150,8 @@ public CatalogLoader clone() { } }; - TableSerializerCache tableSerializerCache = new TableSerializerCache(closeableCatalogLoader, 10); + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); Tuple3 serializerWithSchemaAndSpec = tableSerializerCache.serializerWithSchemaAndSpec( From e2c4b719e5ffb313e117fa76f0dc95343d38f974 Mon Sep 17 00:00:00 2001 From: donPain Date: Wed, 22 Apr 2026 00:01:39 -0300 Subject: [PATCH 5/5] Rename closeable --- .../src/main/java/org/apache/iceberg/flink/TableLoader.java | 4 ++-- .../iceberg/flink/sink/dynamic/DynamicRecordProcessor.java | 4 ++-- .../flink/sink/dynamic/DynamicTableUpdateOperator.java | 4 ++-- .../flink/sink/dynamic/DynamicWriteResultAggregator.java | 4 ++-- .../src/main/java/org/apache/iceberg/flink/TableLoader.java | 4 ++-- .../src/main/java/org/apache/iceberg/flink/TableLoader.java | 4 ++-- .../iceberg/flink/sink/dynamic/DynamicRecordProcessor.java | 4 ++-- .../flink/sink/dynamic/DynamicTableUpdateOperator.java | 4 ++-- .../flink/sink/dynamic/DynamicWriteResultAggregator.java | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index 769bf3da1179..d3f19d1a9676 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index cfe41c8b5482..d79f43486c14 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -198,9 +198,9 @@ public void close() { throw new RuntimeException(e); } - if (catalog instanceof Closeable rs) { + if (catalog instanceof Closeable closeableCatalog) { try { - rs.close(); + closeableCatalog.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 5941e297bc9a..dc4d08691e7c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -107,8 +107,8 @@ public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index b65cbe447049..a67da1e6eac6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -224,8 +224,8 @@ private PartitionSpec spec(String tableName, int specId) { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index 769bf3da1179..d3f19d1a9676 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index 769bf3da1179..d3f19d1a9676 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index d2f6a9315087..34b4b38604df 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -218,9 +218,9 @@ public void close() { throw new RuntimeException(e); } - if (catalog instanceof Closeable rs) { + if (catalog instanceof Closeable closeableCatalog) { try { - rs.close(); + closeableCatalog.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 5941e297bc9a..dc4d08691e7c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -107,8 +107,8 @@ public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java index 632b437d485a..07ddbf7119ac 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java @@ -247,8 +247,8 @@ private PartitionSpec spec(String tableName, int specId) { @Override public void close() throws Exception { super.close(); - if (catalog instanceof Closeable rs) { - rs.close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } } }