Flink: Close catalog in dynamic sink operators to prevent resource leaks#16044
donPain wants to merge 5 commits into apache:main
Conversation
2f9bee7 to e7dedc6
Operators in the Flink dynamic sink pipeline (DynamicTableUpdateOperator, DynamicRecordProcessor, DynamicWriteResultAggregator, TableSerializerCache) load a catalog via CatalogLoader but never close it. When the catalog is a RESTCatalog backed by S3FileIO, this leaks HTTP clients, S3 clients, and other resources held by the catalog's CloseableGroup.

Fix each operator to close its catalog on shutdown if it implements Closeable. For TableSerializerCache, cache the catalog instance to avoid creating a new (leaked) catalog on every serializer update.

Add tests for operator lifecycle with both Closeable and non-Closeable catalogs, and an integration test with RESTCatalog + S3FileIO.
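The leak described above can be sketched minimally. All types below (ResourceGroup, RestLikeCatalog, openClient) are illustrative stand-ins for the real Iceberg classes, not the actual API: a catalog registers its clients in a CloseableGroup-like container, and those clients are only released when the catalog itself is closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hedged sketch of the leak: an operator that never closes its catalog
// keeps the catalog's registered clients open for the life of the process.
public class CatalogLeakSketch {
  // Minimal stand-in for a CloseableGroup: closes children in LIFO order.
  static class ResourceGroup implements Closeable {
    private final Deque<Closeable> children = new ArrayDeque<>();

    void addCloseable(Closeable c) {
      children.push(c);
    }

    @Override
    public void close() throws IOException {
      while (!children.isEmpty()) {
        children.pop().close();
      }
    }
  }

  static int openClients = 0;

  // Stand-in for opening an HTTP or S3 client.
  static Closeable openClient() {
    openClients++;
    return () -> openClients--;
  }

  // Stand-in for a REST-style catalog that owns external resources.
  static class RestLikeCatalog implements Closeable {
    private final ResourceGroup resources = new ResourceGroup();

    RestLikeCatalog() {
      resources.addCloseable(openClient()); // e.g. HTTP client
      resources.addCloseable(openClient()); // e.g. S3 client
    }

    @Override
    public void close() throws IOException {
      resources.close();
    }
  }

  public static void main(String[] args) throws IOException {
    RestLikeCatalog catalog = new RestLikeCatalog();
    int heldWithoutClose = openClients; // 2 clients held open until close()
    // The fix: operators close the catalog on shutdown.
    catalog.close();
    System.out.println("before=" + heldWithoutClose + " after=" + openClients);
  }
}
```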
Hey @nastra, could you take a look at this PR when you have a moment?

CC: @mxm, @Guosmilesmile
```groovy
testImplementation project(':iceberg-aws')
testImplementation(platform(libs.awssdk.bom))
testImplementation "software.amazon.awssdk:s3"
testImplementation "software.amazon.awssdk:auth"
testImplementation "software.amazon.awssdk:kms"
testImplementation "software.amazon.awssdk:sts"
testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.minio
```
Do we really need all these dependencies to verify the fix?
In my production scenario I was using S3Tables, so I needed to simulate at least one object store with a REST catalog.
```java
if (catalog == null) {
  catalog = catalogLoader.loadCatalog();
}
```
Where are we closing this catalog?
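One shape the answer could take is sketched below: cache the lazily loaded catalog so repeated lookups reuse a single instance, and release it when the owning component is closed. CatalogLoader, Catalog, and SerializerCacheSketch here are hypothetical stand-ins, not the real Iceberg/Flink types.

```java
import java.io.Closeable;
import java.io.IOException;

// Hedged sketch: lazy-load-and-cache paired with close-on-shutdown.
public class SerializerCacheSketch implements Closeable {
  interface Catalog {}

  interface CatalogLoader {
    Catalog loadCatalog();
  }

  // A catalog that owns external resources and counts how often it is closed.
  static class CloseableCatalog implements Catalog, Closeable {
    int closeCount = 0;

    @Override
    public void close() {
      closeCount++;
    }
  }

  private final CatalogLoader catalogLoader;
  private Catalog catalog; // cached so each lookup does not leak a new catalog

  SerializerCacheSketch(CatalogLoader loader) {
    this.catalogLoader = loader;
  }

  Catalog catalog() {
    if (catalog == null) {
      catalog = catalogLoader.loadCatalog();
    }
    return catalog;
  }

  @Override
  public void close() throws IOException {
    // Release the cached catalog exactly once, only if it is Closeable.
    if (catalog instanceof Closeable) {
      ((Closeable) catalog).close();
    }
  }

  public static void main(String[] args) throws IOException {
    CloseableCatalog backing = new CloseableCatalog();
    SerializerCacheSketch cache = new SerializerCacheSketch(() -> backing);
    boolean cached = cache.catalog() == cache.catalog(); // same instance reused
    cache.close();
    System.out.println("cached=" + cached + " closeCount=" + backing.closeCount);
  }
}
```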
```java
@Test
void testCloseOperatorWithNonCloseableCatalog() throws Exception {
  DynamicTableUpdateOperator operator =
      new DynamicTableUpdateOperator(
          CATALOG_EXTENSION.catalogLoader(),
          10,
          1000,
          10,
          TableCreator.DEFAULT,
          CASE_SENSITIVE,
          PRESERVE_COLUMNS);
  operator.open(null);

  DynamicRecordInternal input =
      new DynamicRecordInternal(
          TABLE,
          "branch",
          SCHEMA1,
          GenericRowData.of(1),
          PartitionSpec.unpartitioned(),
          42,
          false,
          Collections.emptySet());
  operator.map(input);

  // HadoopCatalog is not Closeable, so close should complete without error
  operator.close();
}
```
Not sure we need this test. There won't be a way to call close on a catalog which doesn't implement Closeable.
Yes, I added that while validating tests locally and forgot to remove it; good point.
```java
if (catalog instanceof Closeable) {
  ((Closeable) catalog).close();
}
```

```java
try {
  ((Closeable) catalog).close();
} catch (Exception e) {
  throw new RuntimeException(e);
}
```
Can we standardize on a single way of writing this? I found different styles in the 1.20 and 2.1 modules, e.g.:

```java
if (catalog instanceof Closeable rs)
```
Yes, of course. What do you think about changing the RuntimeException to a more specific exception?
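One way the two styles under discussion could converge, sketched here as an assumption rather than what the PR ends up with: pattern-matching instanceof (available if the build targets Java 16+) plus UncheckedIOException instead of a bare RuntimeException. The helper name closeIfCloseable is hypothetical.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hedged sketch of a single standardized close style.
public class CloseStyleSketch {
  static void closeIfCloseable(Object catalog) {
    // Java 16+ pattern-matching instanceof binds the cast in one step.
    if (catalog instanceof Closeable closeable) {
      try {
        closeable.close();
      } catch (IOException e) {
        // More specific than RuntimeException: preserves the I/O nature.
        throw new UncheckedIOException("Failed to close catalog", e);
      }
    }
  }

  public static void main(String[] args) {
    boolean[] closed = {false};
    Closeable c = () -> closed[0] = true;
    closeIfCloseable(c);
    closeIfCloseable("not closeable"); // no-op for non-Closeable values
    System.out.println("closed=" + closed[0]);
  }
}
```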
```java
if (catalog instanceof Closeable) {
  ((Closeable) catalog).close();
}
```
…s, and add missing close catalog.
Warning exception trace:
