Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public Table loadTable() {

@Override
public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
if (catalog instanceof Closeable closeableCatalog) {
closeableCatalog.close();
}

catalog = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Closeable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
Expand Down Expand Up @@ -47,6 +48,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final TableCreator tableCreator;
private final boolean caseSensitive;

private transient Catalog catalog;
private transient TableMetadataCache tableCache;
private transient HashKeyGenerator hashKeyGenerator;
private transient TableUpdater updater;
Expand Down Expand Up @@ -78,7 +80,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
Catalog catalog = catalogLoader.loadCatalog();
this.catalog = catalogLoader.loadCatalog();
this.tableCache =
new TableMetadataCache(
catalog,
Expand Down Expand Up @@ -195,5 +197,13 @@ public void close() {
} catch (Exception e) {
throw new RuntimeException(e);
}

if (catalog instanceof Closeable closeableCatalog) {
try {
closeableCatalog.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
Comment on lines +202 to +206
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we standardize on a single way of writing this? I found a different style in 1.20 vs. 2.1:

if (catalog instanceof Closeable rs) 

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, of course. What do you think about changing the runtime exception to a more specific exception?

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Closeable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
Expand Down Expand Up @@ -45,6 +46,7 @@ class DynamicTableUpdateOperator
private final TableCreator tableCreator;
private final boolean caseSensitive;

private transient Catalog catalog;
private transient TableUpdater updater;

DynamicTableUpdateOperator(
Expand All @@ -67,7 +69,7 @@ class DynamicTableUpdateOperator
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
Catalog catalog = catalogLoader.loadCatalog();
this.catalog = catalogLoader.loadCatalog();
this.updater =
new TableUpdater(
new TableMetadataCache(
Expand Down Expand Up @@ -101,4 +103,12 @@ public DynamicRecordInternal map(DynamicRecordInternal data) throws Exception {

return data;
}

// Flink lifecycle hook: releases resources held by this operator on shutdown.
// Closes the catalog loaded in open() when it implements Closeable, so
// catalogs backed by external connections are not leaked.
// Note: `null instanceof Closeable` is false, so this is safe even when
// open() was never called and `catalog` is still null.
@Override
public void close() throws Exception {
super.close();
if (catalog instanceof Closeable closeableCatalog) {
closeableCatalog.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -219,4 +220,12 @@ private PartitionSpec spec(String tableName, int specId) {
Table table = catalog.loadTable(TableIdentifier.parse(tableName));
return table.specs().get(specId);
}

// Releases resources on shutdown: delegates to the superclass first, then
// closes the catalog if (and only if) it implements Closeable.
// `null instanceof Closeable` is false, so a never-initialized catalog is a
// safe no-op here.
@Override
public void close() throws Exception {
super.close();
if (catalog instanceof Closeable closeableCatalog) {
closeableCatalog.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
Expand All @@ -28,6 +31,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
Expand Down Expand Up @@ -106,6 +110,16 @@ int maximumSize() {
return maximumSize;
}

/**
 * Closes the given catalog when it implements {@link Closeable}; otherwise this
 * is a no-op (including for a null argument, since instanceof rejects null).
 *
 * <p>An {@link IOException} raised by the close is rethrown as
 * {@link UncheckedIOException} so this helper can be used from contexts that
 * cannot declare checked exceptions.
 */
private static void closeCatalog(Catalog catalog) {
  if (!(catalog instanceof Closeable)) {
    return;
  }

  try {
    ((Closeable) catalog).close();
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to close catalog", e);
  }
}

private class SerializerInfo {
private final String tableName;
private final Map<Schema, RowDataSerializer> serializers;
Expand All @@ -120,9 +134,14 @@ private class SerializerInfo {
}

// Refreshes the cached schema and partition-spec maps for this table.
// A catalog is loaded per invocation and released in the finally block via
// closeCatalog, so a Closeable catalog is closed even when loadTable throws.
private void update() {
Catalog loadedCatalog = catalogLoader.loadCatalog();
try {
Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName));
// Snapshot all schemas/specs known to the table, keyed by id.
schemas = table.schemas();
specs = table.specs();
} finally {
closeCatalog(loadedCatalog);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@
import static org.apache.iceberg.types.Types.StringType;
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.util.Collections;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class TestTableSerializerCache {
class TestTableSerializerCache {

@TempDir private Path tempDir;

@RegisterExtension
static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table");
Expand Down Expand Up @@ -121,4 +128,36 @@ void testCacheSize() {
cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000);
assertThat(cache.maximumSize()).isEqualTo(1000);
}

// Verifies that a serializer lookup closes the Closeable catalog it loads.
// Uses an InMemoryCatalog (which implements Closeable) behind a loader that
// always hands back the same instance.
@Test
void testSchemaLookupClosesCloseableCatalog() {
InMemoryCatalog closeableCatalog = new InMemoryCatalog();
closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString()));
closeableCatalog.createNamespace(Namespace.of("db"));
Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1);

// Loader that returns the shared catalog instance so the test can observe
// its state after the cache is done with it.
CatalogLoader closeableCatalogLoader =
new CatalogLoader() {
@Override
public Catalog loadCatalog() {
return closeableCatalog;
}

@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
@Override
public CatalogLoader clone() {
return this;
}
};

TableSerializerCache tableSerializerCache =
new TableSerializerCache(closeableCatalogLoader, 10);

Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec =
tableSerializerCache.serializerWithSchemaAndSpec(
"db.table", table.schema().schemaId(), table.spec().specId());

assertThat(serializerWithSchemaAndSpec).isNotNull();
// Presumably InMemoryCatalog.close() wipes its namespaces, so an empty
// namespace list implies close() was invoked — TODO confirm against
// InMemoryCatalog's implementation.
assertThat(closeableCatalog.listNamespaces()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public Table loadTable() {

@Override
public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
if (catalog instanceof Closeable closeableCatalog) {
closeableCatalog.close();
}

catalog = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
Expand All @@ -28,6 +31,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
Expand Down Expand Up @@ -106,6 +110,16 @@ int maximumSize() {
return maximumSize;
}

// Closes the given catalog when it implements Closeable; no-op otherwise
// (including for null, since instanceof rejects null). An IOException is
// wrapped in UncheckedIOException so this helper can be called from contexts
// that cannot declare checked exceptions.
private static void closeCatalog(Catalog catalog) {
if (catalog instanceof Closeable closeableCatalog) {
try {
closeableCatalog.close();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close catalog", e);
}
}
}

private class SerializerInfo {
private final String tableName;
private final Map<Schema, RowDataSerializer> serializers;
Expand All @@ -120,9 +134,14 @@ private class SerializerInfo {
}

// Refreshes the cached schema and partition-spec maps for this table.
// The catalog is loaded per call and released in the finally block via
// closeCatalog, so a Closeable catalog is closed even if loadTable throws.
private void update() {
Catalog loadedCatalog = catalogLoader.loadCatalog();
try {
Table table = loadedCatalog.loadTable(TableIdentifier.parse(tableName));
// Snapshot all schemas/specs known to the table, keyed by id.
schemas = table.schemas();
specs = table.specs();
} finally {
closeCatalog(loadedCatalog);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@
import static org.apache.iceberg.types.Types.StringType;
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Path;
import java.util.Collections;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class TestTableSerializerCache {

@TempDir private Path tempDir;

@RegisterExtension
static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table");

Expand Down Expand Up @@ -121,4 +128,36 @@ void testCacheSize() {
cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000);
assertThat(cache.maximumSize()).isEqualTo(1000);
}

// Verifies that a serializer lookup closes the Closeable catalog it loads.
// Uses an InMemoryCatalog (which implements Closeable) behind a loader that
// always hands back the same instance.
@Test
void testSchemaLookupClosesCloseableCatalog() {
InMemoryCatalog closeableCatalog = new InMemoryCatalog();
closeableCatalog.initialize("test", Collections.singletonMap("warehouse", tempDir.toString()));
closeableCatalog.createNamespace(Namespace.of("db"));
Table table = closeableCatalog.createTable(TableIdentifier.of("db", "table"), schema1);

// Loader that returns the shared catalog instance so the test can observe
// its state after the cache is done with it.
CatalogLoader closeableCatalogLoader =
new CatalogLoader() {
@Override
public Catalog loadCatalog() {
return closeableCatalog;
}

@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
@Override
public CatalogLoader clone() {
return this;
}
};

TableSerializerCache tableSerializerCache =
new TableSerializerCache(closeableCatalogLoader, 10);

Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec =
tableSerializerCache.serializerWithSchemaAndSpec(
"db.table", table.schema().schemaId(), table.spec().specId());

assertThat(serializerWithSchemaAndSpec).isNotNull();
// Presumably InMemoryCatalog.close() wipes its namespaces, so an empty
// namespace list implies close() was invoked — TODO confirm against
// InMemoryCatalog's implementation.
assertThat(closeableCatalog.listNamespaces()).isEmpty();
}
}
7 changes: 7 additions & 0 deletions flink/v2.1/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation(libs.flink21.test.utilsjunit) {
exclude group: 'junit'
}
testImplementation project(':iceberg-aws')
testImplementation(platform(libs.awssdk.bom))
testImplementation "software.amazon.awssdk:s3"
testImplementation "software.amazon.awssdk:kms"
testImplementation "software.amazon.awssdk:sts"
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.minio
testImplementation(libs.flink21.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public Table loadTable() {

@Override
public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
if (catalog instanceof Closeable closeableCatalog) {
closeableCatalog.close();
}

catalog = null;
Expand Down
Loading
Loading