From e84d57f7ae5724b9ec929c137a8f2b3438a89ba8 Mon Sep 17 00:00:00 2001 From: Han You Date: Mon, 20 Apr 2026 13:16:36 -0500 Subject: [PATCH 1/3] Allow setting slot sharing group for fine-grained resource management Currently all operators created by the dynamic sink are part of the default slot sharing group, and thus getting an equal share of the resources on taskmanagers. However, it is usually the case that the sink and the generator operators are far more resource-heavy than the rest of the operators, making the default resource allocation inefficient. Flink already supports fine-grained resource management mechanism to support use cases exactly like this. This change adds support to wire the dynamic sink into that system, by allowing the users to set slot sharing groups for 1. the shuffle writer 2. the generator+the forward writer -- they need to share the same slot sharing group to enable operator chaining. --- docs/docs/flink-writes.md | 2 + .../sink/dynamic/DynamicIcebergSink.java | 26 +++++++- .../sink/dynamic/TestDynamicIcebergSink.java | 62 +++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 09fa22b640c7..cba272e9f463 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -546,6 +546,8 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `setAll(Map properties)` | Set multiple properties at once | | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. | | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | +| `shuffeSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. | +| `generatorAndForwardSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator and forward sink. | ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 7b0de6fbe9e3..9e4ce2df0cba 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -21,8 +21,10 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import javax.annotation.Nullable; import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; @@ -241,6 +243,8 @@ public static class Builder { private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; private boolean caseSensitive = true; + @Nullable private SlotSharingGroup generatorAndForwardSinkSlotSharingGroup; + @Nullable private SlotSharingGroup shuffeSinkSlotSharingGroup; Builder() {} @@ -316,6 +320,16 @@ public Builder writeParallelism(int newWriteParallelism) { return this; } + public Builder generatorAndForwardSinkSlotSharingGroup(SlotSharingGroup ssg) { + generatorAndForwardSinkSlotSharingGroup = ssg; + return this; + } + + public Builder shuffleSinkSlotSharingGroup(SlotSharingGroup ssg) { + shuffeSinkSlotSharingGroup = ssg; + return this; + } + /** * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of * multiple operators (like writer, committer, aggregator) Actual operator uid will be appended @@ -435,7 +449,7 @@ private DynamicIcebergSink build( TypeInformation> writeResultTypeInfo = CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); - DataStream> forwardWriteResults = + SingleOutputStreamOperator> forwardWriteResults = converted .getSideOutput( new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType)) @@ -446,6 +460,10 @@ private DynamicIcebergSink build( .setParallelism(converted.getParallelism()) .uid(prefixIfNotNull(uidPrefix, "-forward-writer")); + if (generatorAndForwardSinkSlotSharingGroup != null) { + forwardWriteResults.slotSharingGroup(generatorAndForwardSinkSlotSharingGroup); + } + // Inject forward write results into sink — they'll be unioned in addPreCommitTopology return instantiateSink(writeOptions, flinkConfig, forwardWriteResults); } @@ -507,6 +525,9 @@ public DataStreamSink append() { .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); + if (generatorAndForwardSinkSlotSharingGroup != null) { + converted.slotSharingGroup(generatorAndForwardSinkSlotSharingGroup); + } DynamicIcebergSink sink = build(converted, sideOutputType); @@ -535,6 +556,9 @@ public DataStreamSink append() { shuffleInput .sinkTo(sink) // Forward write results are implicitly injected here .uid(prefixIfNotNull(uidPrefix, "-sink")); + if (shuffeSinkSlotSharingGroup != null) { + result.slotSharingGroup(shuffeSinkSlotSharingGroup); + } FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); if (flinkWriteConf.writeParallelism() != null) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 4e7511501014..4af9201521a1 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -35,12 +35,14 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.OperatorIDPair; @@ -330,6 +332,66 @@ void testNoShuffleTopology() throws Exception { assertThat(generatorAndSinkChained).isTrue(); } + @Test + void testSlotSharingGroup() { + DataStream dataStream = + env.fromData(Collections.emptyList(), TypeInformation.of(new TypeHint<>() {})); + + // This test is a little awkward because slot sharing group is translated to internal + // representation after the job graph is built, losing object reference equality. Therefore, we + // can only test the effect of applying an SSG: in other word, we test that the resource + // requirements are applied to the job graph. + MemorySize shuffleSinkMemorySize = new MemorySize(123); + SlotSharingGroup shuffleSinkSSG = + SlotSharingGroup.newBuilder("shuffle-sink-ssg") + .setCpuCores(123) + .setTaskHeapMemory(shuffleSinkMemorySize) + .build(); + + MemorySize generatorAndForwardSinkMemorySize = new MemorySize(456); + SlotSharingGroup generatorAndForwardSinkSSG = + SlotSharingGroup.newBuilder("generator-and-forward-ssg") + .setCpuCores(456) + .setTaskHeapMemory(generatorAndForwardSinkMemorySize) + .build(); + + DynamicIcebergSink.forInput(dataStream) + .generator(new ForwardGenerator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .immediateTableUpdate(false) + .shuffleSinkSlotSharingGroup(shuffleSinkSSG) + .generatorAndForwardSinkSlotSharingGroup(generatorAndForwardSinkSSG) + .append(); + + List vertices = + StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(), false) + .toList(); + + boolean shufflingWriterSSGApplied = + vertices.stream() + .filter(vertex -> vertex.getName() != null && vertex.getName().contains("Sink: Writer")) + .anyMatch( + vertex -> + vertex + .getSlotSharingGroup() + .getResourceProfile() + .getTaskHeapMemory() + .equals(shuffleSinkMemorySize)); + boolean generatorAndForwardWriterSSGApplied = + vertices.stream() + .filter(vertex -> vertex.getName() != null && vertex.getName().contains("generator")) + .anyMatch( + vertex -> + vertex + .getSlotSharingGroup() + .getResourceProfile() + .getTaskHeapMemory() + .equals(generatorAndForwardSinkMemorySize)); + + assertThat(shufflingWriterSSGApplied).isTrue(); + assertThat(generatorAndForwardWriterSSGApplied).isTrue(); + } + @Test void testForwardWrite() throws Exception { runForwardWriteTest(new ForwardGenerator()); From 47b4c15b0e0eee165d01a7e80ca6458bf351ba47 Mon Sep 17 00:00:00 2001 From: Han You Date: Tue, 21 Apr 2026 10:19:51 -0500 Subject: [PATCH 2/3] PR comments --- docs/docs/flink-writes.md | 2 +- .../flink/sink/dynamic/DynamicIcebergSink.java | 14 +++++++------- .../sink/dynamic/TestDynamicIcebergSink.java | 16 ++++++++-------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index cba272e9f463..9cc4552afa02 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -547,7 +547,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. | | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | | `shuffeSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. | -| `generatorAndForwardSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator and forward sink. | +| `generatorSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and forward sink chained to it). | ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 9e4ce2df0cba..27676ea0e8ae 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -243,7 +243,7 @@ public static class Builder { private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; private boolean caseSensitive = true; - @Nullable private SlotSharingGroup generatorAndForwardSinkSlotSharingGroup; + @Nullable private SlotSharingGroup generatorSlotSharingGroup; @Nullable private SlotSharingGroup shuffeSinkSlotSharingGroup; Builder() {} @@ -320,8 +320,8 @@ public Builder writeParallelism(int newWriteParallelism) { return this; } - public Builder generatorAndForwardSinkSlotSharingGroup(SlotSharingGroup ssg) { - generatorAndForwardSinkSlotSharingGroup = ssg; + public Builder generatorSlotSharingGroup(SlotSharingGroup ssg) { + generatorSlotSharingGroup = ssg; return this; } @@ -460,8 +460,8 @@ private DynamicIcebergSink build( .setParallelism(converted.getParallelism()) .uid(prefixIfNotNull(uidPrefix, "-forward-writer")); - if (generatorAndForwardSinkSlotSharingGroup != null) { - forwardWriteResults.slotSharingGroup(generatorAndForwardSinkSlotSharingGroup); + if (generatorSlotSharingGroup != null) { + forwardWriteResults.slotSharingGroup(generatorSlotSharingGroup); } // Inject forward write results into sink — they'll be unioned in addPreCommitTopology @@ -525,8 +525,8 @@ public DataStreamSink append() { .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); - if (generatorAndForwardSinkSlotSharingGroup != null) { - converted.slotSharingGroup(generatorAndForwardSinkSlotSharingGroup); + if (generatorSlotSharingGroup != null) { + converted.slotSharingGroup(generatorSlotSharingGroup); } DynamicIcebergSink sink = build(converted, sideOutputType); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 4af9201521a1..7dfec8a6194a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -348,11 +348,11 @@ void testSlotSharingGroup() { .setTaskHeapMemory(shuffleSinkMemorySize) .build(); - MemorySize generatorAndForwardSinkMemorySize = new MemorySize(456); - SlotSharingGroup generatorAndForwardSinkSSG = - SlotSharingGroup.newBuilder("generator-and-forward-ssg") + MemorySize generatorMemorySize = new MemorySize(456); + SlotSharingGroup generatorSSG = + SlotSharingGroup.newBuilder("generator-ssg") .setCpuCores(456) - .setTaskHeapMemory(generatorAndForwardSinkMemorySize) + .setTaskHeapMemory(generatorMemorySize) .build(); DynamicIcebergSink.forInput(dataStream) @@ -360,7 +360,7 @@ void testSlotSharingGroup() { .catalogLoader(CATALOG_EXTENSION.catalogLoader()) .immediateTableUpdate(false) .shuffleSinkSlotSharingGroup(shuffleSinkSSG) - .generatorAndForwardSinkSlotSharingGroup(generatorAndForwardSinkSSG) + .generatorSlotSharingGroup(generatorSSG) .append(); List vertices = @@ -377,7 +377,7 @@ void testSlotSharingGroup() { .getResourceProfile() .getTaskHeapMemory() .equals(shuffleSinkMemorySize)); - boolean generatorAndForwardWriterSSGApplied = + boolean generatorSSGApplied = vertices.stream() .filter(vertex -> vertex.getName() != null && vertex.getName().contains("generator")) .anyMatch( @@ -386,10 +386,10 @@ void testSlotSharingGroup() { .getSlotSharingGroup() .getResourceProfile() .getTaskHeapMemory() - .equals(generatorAndForwardSinkMemorySize)); + .equals(generatorMemorySize)); assertThat(shufflingWriterSSGApplied).isTrue(); - assertThat(generatorAndForwardWriterSSGApplied).isTrue(); + assertThat(generatorSSGApplied).isTrue(); } @Test From c74bf00e86939e3fb4a262308e4d0551d8a31c70 Mon Sep 17 00:00:00 2001 From: Han You Date: Thu, 23 Apr 2026 13:24:42 -0500 Subject: [PATCH 3/3] PR comment: add disableSlotSharing --- docs/docs/flink-writes.md | 1 + .../sink/dynamic/DynamicIcebergSink.java | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 9cc4552afa02..2253e04402fa 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -548,6 +548,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | | `shuffeSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. | | `generatorSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and forward sink chained to it). | +| `disableSlotSharing(boolean disabled)` | Put generator and sink into each own's unique slot sharing group. This explicitly prevent them from slot sharing with other tasks. | ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 27676ea0e8ae..e84869e8df93 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -245,6 +245,7 @@ public static class Builder { private boolean caseSensitive = true; @Nullable private SlotSharingGroup generatorSlotSharingGroup; @Nullable private SlotSharingGroup shuffeSinkSlotSharingGroup; + private boolean disableSlotSharing = false; Builder() {} @@ -330,6 +331,16 @@ public Builder shuffleSinkSlotSharingGroup(SlotSharingGroup ssg) { return this; } + /** + * Put generator and sink into each own's unique slot sharing group. + * + * @return {@link Builder} to connect the iceberg table. + */ + public Builder disableSlotSharing(boolean newDisableSlotSharing) { + disableSlotSharing = newDisableSlotSharing; + return this; + } + /** * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of * multiple operators (like writer, committer, aggregator) Actual operator uid will be appended @@ -438,6 +449,17 @@ private DynamicIcebergSink build( generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); + if (disableSlotSharing) { + if (generatorSlotSharingGroup == null) { + generatorSlotSharingGroup = + SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-generator")).build(); + } + if (shuffeSinkSlotSharingGroup == null) { + shuffeSinkSlotSharingGroup = + SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-sink")).build(); + } + } + Configuration flinkConfig = readableConfig instanceof Configuration ? (Configuration) readableConfig