diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..d3f19d1a9676 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 07dfad2780f7..d79f43486c14 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; @@ -47,6 +48,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction serializers; @@ -120,9 +134,14 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); + } } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index ec610a3357ba..62b70af7c039 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,13 +34,18 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; -public class TestTableSerializerCache { +class TestTableSerializerCache { + + @TempDir private Path tempDir; @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,36 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..d3f19d1a9676 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java index 84d0ed9be5d0..d138fb22ce5f 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.Map; import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; @@ -28,6 +31,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -106,6 +110,16 @@ int maximumSize() { return maximumSize; } + private static void closeCatalog(Catalog catalog) { + if (catalog instanceof Closeable closeableCatalog) { + try { + closeableCatalog.close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close catalog", e); + } + } + } + private class SerializerInfo { private final String tableName; private final Map serializers; @@ -120,9 +134,14 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); + } } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1cf2c8bae001..62b70af7c039 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,14 +34,19 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; class TestTableSerializerCache { + @TempDir private Path tempDir; + @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,36 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } } diff --git a/flink/v2.1/build.gradle b/flink/v2.1/build.gradle index a08cb1d5ebdd..3f9e13bb52f7 100644 --- a/flink/v2.1/build.gradle +++ b/flink/v2.1/build.gradle @@ -74,6 +74,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { testImplementation(libs.flink21.test.utilsjunit) { exclude group: 'junit' } + testImplementation project(':iceberg-aws') + testImplementation(platform(libs.awssdk.bom)) + testImplementation "software.amazon.awssdk:s3" + testImplementation "software.amazon.awssdk:kms" + testImplementation "software.amazon.awssdk:sts" + testImplementation libs.testcontainers.junit.jupiter + testImplementation libs.testcontainers.minio testImplementation(libs.flink21.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index da509451fee7..d3f19d1a9676 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -135,8 +135,8 @@ public Table loadTable() { @Override public void close() throws IOException { - if (catalog instanceof Closeable) { - ((Closeable) catalog).close(); + if (catalog instanceof Closeable closeableCatalog) { + closeableCatalog.close(); } catalog = null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index fc6892b2cd9e..34b4b38604df 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink.dynamic; +import java.io.Closeable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.OpenContext; @@ -49,6 +50,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction serializers; @@ -120,9 +134,14 @@ private class SerializerInfo { } private void update() { - Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); - schemas = table.schemas(); - specs = table.specs(); + Catalog loadedCatalog = catalogLoader.loadCatalog(); + try { + Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } finally { + closeCatalog(loadedCatalog); + } } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 1c8e6df8591d..efecefffb42d 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -22,16 +22,21 @@ import static org.apache.iceberg.flink.TestFixtures.TABLE; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; import java.util.Collections; import org.apache.flink.table.data.GenericRowData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,6 +48,8 @@ class TestDynamicTableUpdateOperator { private static final boolean DROP_COLUMNS = true; private static final boolean PRESERVE_COLUMNS = false; + @TempDir private Path tempDir; + @RegisterExtension private static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension(DATABASE, TABLE); @@ -132,6 +139,56 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } + @Test + void testCloseOperatorWithCloseableCatalog() throws Exception { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of(DATABASE)); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + closeableCatalogLoader, + 10, + 1000, + 10, + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + DATABASE + "." + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + operator.map(input); + + // InMemoryCatalog implements Closeable, so close should invoke catalog.close() + operator.close(); + + // After close, the catalog's internal state should be cleared + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testCaseInSensitivity(boolean caseSensitive) throws Exception { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java new file mode 100644 index 000000000000..407045742a49 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperatorWithRESTCatalog.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.util.Collections; +import java.util.Map; +import org.apache.flink.table.data.GenericRowData; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTCatalogAdapter; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MinIOContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; + +@Testcontainers +class TestDynamicTableUpdateOperatorWithRESTCatalog { + + private static final String BUCKET = "iceberg-test-bucket"; + private static final String MINIO_IMAGE = "minio/minio:RELEASE.2023-09-04T19-57-37Z"; + + @Container + private static final MinIOContainer MINIO = + new MinIOContainer(MINIO_IMAGE).withUserName("testuser").withPassword("testpassword"); + + private static final Schema SCHEMA = + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + private DynamicTableUpdateOperator operator; + private RESTCatalog restCatalog; + private S3Client s3Client; + + @BeforeEach + void before() throws Exception { + s3Client = + S3Client.builder() + .endpointOverride(URI.create(MINIO.getS3URL())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(MINIO.getUserName(), MINIO.getPassword()))) + .region(Region.US_EAST_1) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build(); + + s3Client.createBucket(b -> b.bucket(BUCKET)); + + Map s3Props = Maps.newHashMap(); + s3Props.put("s3.endpoint", MINIO.getS3URL()); + s3Props.put("s3.access-key-id", MINIO.getUserName()); + s3Props.put("s3.secret-access-key", MINIO.getPassword()); + s3Props.put("s3.path-style-access", "true"); + s3Props.put("client.region", "us-east-1"); + + InMemoryCatalog backendCatalog = new InMemoryCatalog(); + Map backendProps = Maps.newHashMap(s3Props); + backendProps.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + BUCKET + "/warehouse"); + backendProps.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName()); + backendCatalog.initialize("backend", backendProps); + backendCatalog.createNamespace(Namespace.of("default")); + + RESTCatalogAdapter adapter = new RESTCatalogAdapter(backendCatalog); + restCatalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), properties -> adapter); + restCatalog.initialize("test-rest", Maps.newHashMap(backendProps)); + + CatalogLoader restCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return restCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + operator = + new DynamicTableUpdateOperator( + restCatalogLoader, 10, 1000, 10, TableCreator.DEFAULT, true, false); + } + + @AfterEach + void after() throws Exception { + if (operator != null) { + // operator.close() closes the catalog (restCatalog) via Closeable check + operator.close(); + } else if (restCatalog != null) { + restCatalog.close(); + } + + if (s3Client != null) { + s3Client.close(); + } + } + + @Test + void testOperatorWithS3FileIO() throws Exception { + operator.open(null); + + DynamicRecordInternal input = + new DynamicRecordInternal( + "default.test_table", + "main", + SCHEMA, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + + DynamicRecordInternal output = operator.map(input); + assertThat(output).isEqualTo(input); + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java index 1cf2c8bae001..62b70af7c039 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -25,6 +25,8 @@ import static org.apache.iceberg.types.Types.StringType; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Collections; import java.util.function.Supplier; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; @@ -32,14 +34,19 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.inmemory.InMemoryCatalog; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; class TestTableSerializerCache { + @TempDir private Path tempDir; + @RegisterExtension static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); @@ -121,4 +128,36 @@ void testCacheSize() { cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); assertThat(cache.maximumSize()).isEqualTo(1000); } + + @Test + void testSchemaLookupClosesCloseableCatalog() { + InMemoryCatalog closeableCatalog = new InMemoryCatalog(); + closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString())); + closeableCatalog.createNamespace(Namespace.of("db")); + Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1); + + CatalogLoader closeableCatalogLoader = + new CatalogLoader() { + @Override + public Catalog loadCatalog() { + return closeableCatalog; + } + + @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"}) + @Override + public CatalogLoader clone() { + return this; + } + }; + + TableSerializerCache tableSerializerCache = + new TableSerializerCache(closeableCatalogLoader, 10); + + Tuple3 serializerWithSchemaAndSpec = + tableSerializerCache.serializerWithSchemaAndSpec( + "db.table", table.schema().schemaId(), table.spec().specId()); + + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(closeableCatalog.listNamespaces()).isEmpty(); + } }