Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/docs/flink-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
| `setAll(Map<String, String> properties)` | Set multiple properties at once |
| `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. |
| `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). |
| `shuffleSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. |
| `generatorSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and forward sink chained to it). |
| `disableSlotSharing(boolean disabled)` | Put the generator and the sink each into their own unique slot sharing group. This explicitly prevents them from sharing slots with other tasks. |

### Distribution Modes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
Expand Down Expand Up @@ -241,6 +243,9 @@ public static class Builder<T> {
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;
private boolean caseSensitive = true;
@Nullable private SlotSharingGroup generatorSlotSharingGroup;
@Nullable private SlotSharingGroup shuffeSinkSlotSharingGroup;
private boolean disableSlotSharing = false;

Builder() {}

Expand Down Expand Up @@ -316,6 +321,26 @@ public Builder<T> writeParallelism(int newWriteParallelism) {
return this;
}

/**
 * Sets the {@link SlotSharingGroup} for the generator operator (and the forward writer that is
 * chained to it).
 *
 * @param ssg the slot sharing group to apply
 * @return this builder for chaining
 */
public Builder<T> generatorSlotSharingGroup(SlotSharingGroup ssg) {
  generatorSlotSharingGroup = ssg;
  return this;
}

/**
 * Sets the {@link SlotSharingGroup} for the shuffle sink.
 *
 * @param ssg the slot sharing group to apply
 * @return this builder for chaining
 */
public Builder<T> shuffleSinkSlotSharingGroup(SlotSharingGroup ssg) {
  // NOTE(review): the backing field name is misspelled ("shuffe"); consider renaming it
  // (and its other usages) to shuffleSinkSlotSharingGroup in a follow-up.
  shuffeSinkSlotSharingGroup = ssg;
  return this;
}

/**
 * Puts the generator and the sink each into their own unique slot sharing group, which
 * explicitly prevents them from sharing slots with other tasks. When enabled, a default group
 * (derived from the uid prefix) is created at build time for any group that was not set
 * explicitly via {@code generatorSlotSharingGroup} or {@code shuffleSinkSlotSharingGroup}.
 *
 * @param newDisableSlotSharing whether to disable slot sharing for generator and sink
 * @return this builder for chaining
 */
public Builder<T> disableSlotSharing(boolean newDisableSlotSharing) {
  disableSlotSharing = newDisableSlotSharing;
  return this;
}

/**
* Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of
* multiple operators (like writer, committer, aggregator) Actual operator uid will be appended
Expand Down Expand Up @@ -424,6 +449,17 @@ private DynamicIcebergSink build(
generator != null, "Please use withGenerator() to convert the input DataStream.");
Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null");

if (disableSlotSharing) {
if (generatorSlotSharingGroup == null) {
generatorSlotSharingGroup =
SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-generator")).build();
}
if (shuffeSinkSlotSharingGroup == null) {
shuffeSinkSlotSharingGroup =
SlotSharingGroup.newBuilder(prefixIfNotNull(uidPrefix, "-sink")).build();
}
}

Configuration flinkConfig =
readableConfig instanceof Configuration
? (Configuration) readableConfig
Expand All @@ -435,7 +471,7 @@ private DynamicIcebergSink build(
TypeInformation<CommittableMessage<DynamicWriteResult>> writeResultTypeInfo =
CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);

DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
SingleOutputStreamOperator<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
converted
.getSideOutput(
new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
Expand All @@ -446,6 +482,10 @@ private DynamicIcebergSink build(
.setParallelism(converted.getParallelism())
.uid(prefixIfNotNull(uidPrefix, "-forward-writer"));

if (generatorSlotSharingGroup != null) {
forwardWriteResults.slotSharingGroup(generatorSlotSharingGroup);
}

// Inject forward write results into sink — they'll be unioned in addPreCommitTopology
return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
}
Expand Down Expand Up @@ -507,6 +547,9 @@ public DataStreamSink<DynamicRecordInternal> append() {
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
if (generatorSlotSharingGroup != null) {
converted.slotSharingGroup(generatorSlotSharingGroup);
}

DynamicIcebergSink sink = build(converted, sideOutputType);

Expand Down Expand Up @@ -535,6 +578,9 @@ public DataStreamSink<DynamicRecordInternal> append() {
shuffleInput
.sinkTo(sink) // Forward write results are implicitly injected here
.uid(prefixIfNotNull(uidPrefix, "-sink"));
if (shuffeSinkSlotSharingGroup != null) {
result.slotSharingGroup(shuffeSinkSlotSharingGroup);
}

FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
if (flinkWriteConf.writeParallelism() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.OperatorIDPair;
Expand Down Expand Up @@ -330,6 +332,66 @@ void testNoShuffleTopology() throws Exception {
assertThat(generatorAndSinkChained).isTrue();
}

@Test
void testSlotSharingGroup() {
  DataStream<DynamicIcebergDataImpl> input =
      env.fromData(Collections.emptyList(), TypeInformation.of(new TypeHint<>() {}));

  // Slot sharing groups lose object identity once the job graph is built, so we cannot compare
  // SSG references directly. Instead we give each group distinctive resource requirements and
  // verify that those requirements end up on the corresponding job vertices.
  MemorySize sinkHeap = new MemorySize(123);
  SlotSharingGroup sinkGroup =
      SlotSharingGroup.newBuilder("shuffle-sink-ssg")
          .setCpuCores(123)
          .setTaskHeapMemory(sinkHeap)
          .build();

  MemorySize generatorHeap = new MemorySize(456);
  SlotSharingGroup generatorGroup =
      SlotSharingGroup.newBuilder("generator-ssg")
          .setCpuCores(456)
          .setTaskHeapMemory(generatorHeap)
          .build();

  DynamicIcebergSink.forInput(input)
      .generator(new ForwardGenerator())
      .catalogLoader(CATALOG_EXTENSION.catalogLoader())
      .immediateTableUpdate(false)
      .shuffleSinkSlotSharingGroup(sinkGroup)
      .generatorSlotSharingGroup(generatorGroup)
      .append();

  List<JobVertex> vertices =
      StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(), false)
          .toList();

  assertThat(anyVertexHasTaskHeap(vertices, "Sink: Writer", sinkHeap)).isTrue();
  assertThat(anyVertexHasTaskHeap(vertices, "generator", generatorHeap)).isTrue();
}

/**
 * Returns true when any job vertex whose name contains {@code nameFragment} belongs to a slot
 * sharing group whose resource profile carries exactly {@code expectedHeap} of task heap.
 */
private static boolean anyVertexHasTaskHeap(
    List<JobVertex> vertices, String nameFragment, MemorySize expectedHeap) {
  return vertices.stream()
      .filter(vertex -> vertex.getName() != null && vertex.getName().contains(nameFragment))
      .anyMatch(
          vertex ->
              vertex
                  .getSlotSharingGroup()
                  .getResourceProfile()
                  .getTaskHeapMemory()
                  .equals(expectedHeap));
}

@Test
void testForwardWrite() throws Exception {
runForwardWriteTest(new ForwardGenerator());
Expand Down
Loading