diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 09fa22b640c7..2253e04402fa 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -546,6 +546,9 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `setAll(Map properties)` | Set multiple properties at once | | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. | | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | +| `shuffleSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. | +| `generatorSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and the forward sink chained to it). | +| `disableSlotSharing(boolean disabled)` | Put the generator and the sink each into its own unique slot sharing group. This explicitly prevents them from sharing slots with other tasks. 
| ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 7b0de6fbe9e3..e84869e8df93 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -21,8 +21,10 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import javax.annotation.Nullable; import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; @@ -241,6 +243,9 @@ public static class Builder { private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; private boolean caseSensitive = true; + @Nullable private SlotSharingGroup generatorSlotSharingGroup; + @Nullable private SlotSharingGroup shuffeSinkSlotSharingGroup; + private boolean disableSlotSharing = false; Builder() {} @@ -316,6 +321,26 @@ public Builder writeParallelism(int newWriteParallelism) { return this; } + public Builder generatorSlotSharingGroup(SlotSharingGroup ssg) { + generatorSlotSharingGroup = ssg; + return this; + } + + public Builder shuffleSinkSlotSharingGroup(SlotSharingGroup ssg) { + shuffeSinkSlotSharingGroup = ssg; + return this; + } + + /** + * Put the generator and the sink each into its own unique slot sharing group, explicitly + * preventing them from sharing slots with other tasks. + * + * @return {@link Builder} to connect the iceberg table. + */ + public Builder disableSlotSharing(boolean newDisableSlotSharing) { + disableSlotSharing = newDisableSlotSharing; + return this; + } + /** * Set the uid prefix for IcebergSink operators. 
Note that IcebergSink internally consists of * multiple operators (like writer, committer, aggregator) Actual operator uid will be appended @@ -424,6 +449,17 @@ private DynamicIcebergSink build( generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); + if (disableSlotSharing) { + if (generatorSlotSharingGroup == null) { + generatorSlotSharingGroup = + SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-generator")).build(); + } + if (shuffeSinkSlotSharingGroup == null) { + shuffeSinkSlotSharingGroup = + SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-sink")).build(); + } + } + Configuration flinkConfig = readableConfig instanceof Configuration ? (Configuration) readableConfig @@ -435,7 +471,7 @@ private DynamicIcebergSink build( TypeInformation> writeResultTypeInfo = CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); - DataStream> forwardWriteResults = + SingleOutputStreamOperator> forwardWriteResults = converted .getSideOutput( new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType)) @@ -446,6 +482,10 @@ private DynamicIcebergSink build( .setParallelism(converted.getParallelism()) .uid(prefixIfNotNull(uidPrefix, "-forward-writer")); + if (generatorSlotSharingGroup != null) { + forwardWriteResults.slotSharingGroup(generatorSlotSharingGroup); + } + // Inject forward write results into sink — they'll be unioned in addPreCommitTopology return instantiateSink(writeOptions, flinkConfig, forwardWriteResults); } @@ -507,6 +547,9 @@ public DataStreamSink append() { .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); + if (generatorSlotSharingGroup != null) { + converted.slotSharingGroup(generatorSlotSharingGroup); + } DynamicIcebergSink sink = build(converted, sideOutputType); @@ -535,6 +578,9 @@ public DataStreamSink append() { shuffleInput .sinkTo(sink) // Forward write 
results are implicitly injected here .uid(prefixIfNotNull(uidPrefix, "-sink")); + if (shuffeSinkSlotSharingGroup != null) { + result.slotSharingGroup(shuffeSinkSlotSharingGroup); + } FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); if (flinkWriteConf.writeParallelism() != null) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 4e7511501014..7dfec8a6194a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -35,12 +35,14 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.OperatorIDPair; @@ -330,6 +332,66 @@ void testNoShuffleTopology() throws Exception { assertThat(generatorAndSinkChained).isTrue(); } + @Test + void testSlotSharingGroup() { + DataStream dataStream = + env.fromData(Collections.emptyList(), TypeInformation.of(new TypeHint<>() {})); + + // This test is a little awkward because slot sharing group is translated to internal + // representation after the job graph is built, losing object reference equality. 
Therefore, we + // can only test the effect of applying an SSG: in other word, we test that the resource + // requirements are applied to the job graph. + MemorySize shuffleSinkMemorySize = new MemorySize(123); + SlotSharingGroup shuffleSinkSSG = + SlotSharingGroup.newBuilder("shuffle-sink-ssg") + .setCpuCores(123) + .setTaskHeapMemory(shuffleSinkMemorySize) + .build(); + + MemorySize generatorMemorySize = new MemorySize(456); + SlotSharingGroup generatorSSG = + SlotSharingGroup.newBuilder("generator-ssg") + .setCpuCores(456) + .setTaskHeapMemory(generatorMemorySize) + .build(); + + DynamicIcebergSink.forInput(dataStream) + .generator(new ForwardGenerator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .immediateTableUpdate(false) + .shuffleSinkSlotSharingGroup(shuffleSinkSSG) + .generatorSlotSharingGroup(generatorSSG) + .append(); + + List vertices = + StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(), false) + .toList(); + + boolean shufflingWriterSSGApplied = + vertices.stream() + .filter(vertex -> vertex.getName() != null && vertex.getName().contains("Sink: Writer")) + .anyMatch( + vertex -> + vertex + .getSlotSharingGroup() + .getResourceProfile() + .getTaskHeapMemory() + .equals(shuffleSinkMemorySize)); + boolean generatorSSGApplied = + vertices.stream() + .filter(vertex -> vertex.getName() != null && vertex.getName().contains("generator")) + .anyMatch( + vertex -> + vertex + .getSlotSharingGroup() + .getResourceProfile() + .getTaskHeapMemory() + .equals(generatorMemorySize)); + + assertThat(shufflingWriterSSGApplied).isTrue(); + assertThat(generatorSSGApplied).isTrue(); + } + @Test void testForwardWrite() throws Exception { runForwardWriteTest(new ForwardGenerator());