Flink: Close catalog in dynamic sink operators to prevent resource leaks #16044

Open
donPain wants to merge 5 commits into apache:main from donPain:main

donPain commented Apr 19, 2026

Operators in the Flink dynamic sink pipeline (DynamicTableUpdateOperator, DynamicRecordProcessor, DynamicWriteResultAggregator, TableSerializerCache) load a catalog via CatalogLoader but never close it. When the catalog is a RESTCatalog backed by S3FileIO, this leaks HTTP clients, S3 clients, and other resources held by the catalog's CloseableGroup.

Fix each operator to close its catalog on shutdown if it implements Closeable. For TableSerializerCache, cache the catalog instance to avoid creating a new (leaked) catalog on every serializer update.

Add tests for operator lifecycle with both Closeable and non-Closeable catalogs, and an integration test with RESTCatalog + S3FileIO.
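
A minimal sketch of the pattern described above, not the code in this PR: load the catalog once, reuse it, and close it on shutdown only if it implements Closeable. `Catalog`, `TrackingCatalog`, and `CachedCatalogHolder` are hypothetical stand-ins so the snippet compiles without Iceberg or Flink on the classpath, and `Supplier<Catalog>` stands in for `CatalogLoader`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.iceberg.catalog.Catalog.
interface Catalog {}

// Hypothetical catalog that owns resources, standing in for something like RESTCatalog.
class TrackingCatalog implements Catalog, Closeable {
  boolean closed = false;

  @Override
  public void close() {
    closed = true; // a real catalog would release HTTP clients, S3 clients, etc.
  }
}

// Hypothetical holder: loads the catalog lazily, caches it, and closes it on shutdown.
class CachedCatalogHolder implements AutoCloseable {
  private final Supplier<Catalog> loader; // assumption: stands in for CatalogLoader
  private Catalog catalog;
  int loadCount = 0;

  CachedCatalogHolder(Supplier<Catalog> loader) {
    this.loader = loader;
  }

  Catalog catalog() {
    // Load at most once so repeated calls do not create (and leak) new catalogs.
    if (catalog == null) {
      catalog = loader.get();
      loadCount++;
    }
    return catalog;
  }

  @Override
  public void close() {
    // Only catalogs that implement Closeable hold resources that can be released here.
    if (catalog instanceof Closeable) {
      try {
        ((Closeable) catalog).close();
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close catalog", e);
      }
    }
    catalog = null;
  }
}
```

In this sketch, an operator would call `catalog()` wherever it previously loaded a fresh catalog and call `close()` from its own shutdown hook.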

Warning exception trace: (image)

donPain force-pushed the main branch 3 times, most recently from 2f9bee7 to e7dedc6 (April 20, 2026 18:42)
Author

donPain commented Apr 20, 2026

Hey @nastra could you take a look at this PR when you have a moment?

Contributor

pvary commented Apr 21, 2026

CC: @mxm, @Guosmilesmile

Contributor

mxm left a comment

Thanks @donPain!

Comment thread flink/v2.1/build.gradle
Comment on lines +77 to +85
testImplementation project(':iceberg-aws')
testImplementation(platform(libs.awssdk.bom))
testImplementation "software.amazon.awssdk:s3"
testImplementation "software.amazon.awssdk:auth"
testImplementation "software.amazon.awssdk:kms"
testImplementation "software.amazon.awssdk:sts"
testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.minio
Contributor

Do we really need all these dependencies to verify the fix?

Author

In my production scenario I was using S3 Tables, so I needed to simulate at least one object store with a "rest" catalog.

Comment on lines +125 to +127
if (catalog == null) {
  catalog = catalogLoader.loadCatalog();
}
Contributor

Where are we closing this catalog?

Comment on lines +142 to +169
@Test
void testCloseOperatorWithNonCloseableCatalog() throws Exception {
  DynamicTableUpdateOperator operator =
      new DynamicTableUpdateOperator(
          CATALOG_EXTENSION.catalogLoader(),
          10,
          1000,
          10,
          TableCreator.DEFAULT,
          CASE_SENSITIVE,
          PRESERVE_COLUMNS);
  operator.open(null);

  DynamicRecordInternal input =
      new DynamicRecordInternal(
          TABLE,
          "branch",
          SCHEMA1,
          GenericRowData.of(1),
          PartitionSpec.unpartitioned(),
          42,
          false,
          Collections.emptySet());
  operator.map(input);

  // HadoopCatalog is not Closeable, so close should complete without error
  operator.close();
}
Contributor

Not sure we need this test. There won't be a way to call close on a catalog which doesn't implement Closeable.

Author

Yes, I added that to validate things locally and forgot to remove it. Good point.

Comment on lines +227 to +229
if (catalog instanceof Closeable) {
  ((Closeable) catalog).close();
}
Contributor

The same.

Comment on lines +202 to +206
try {
  ((Closeable) catalog).close();
} catch (Exception e) {
  throw new RuntimeException(e);
}
Contributor

Can we standardize on a single way of writing this? I found different styles in 1.20 vs 2.1:

if (catalog instanceof Closeable rs) 
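
One way the standardized form could look, as a sketch only: pattern matching for `instanceof` (Java 16+) binds the cast in the condition, so the explicit `(Closeable)` cast goes away. The helper name `CatalogCloseSupport` and the choice of `UncheckedIOException` are assumptions for illustration, not something this thread settled on.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper so every operator closes its catalog the same way.
final class CatalogCloseSupport {
  private CatalogCloseSupport() {}

  // Closes the given object if it implements Closeable; returns whether close() was called.
  static boolean closeIfCloseable(Object catalog) {
    if (catalog instanceof Closeable closeable) {
      try {
        closeable.close();
      } catch (IOException e) {
        // Assumption: wrap in UncheckedIOException instead of a bare RuntimeException.
        throw new UncheckedIOException("Failed to close catalog", e);
      }
      return true;
    }
    // Null and non-Closeable catalogs have nothing to release here.
    return false;
  }
}
```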

Author

Yes, of course. What do you think about changing the RuntimeException to a more specific exception type?

Comment on lines +110 to +112
if (catalog instanceof Closeable) {
  ((Closeable) catalog).close();
}
Contributor

The same

donPain requested a review from Guosmilesmile April 22, 2026 21:30