Skip to content

[flink] add split assignment mechanism w/o partitionId#3272

Draft
zuston wants to merge 1 commit intoapache:mainfrom
zuston:split-assign
Draft

[flink] add split assignment mechanism w/o partitionId#3272
zuston wants to merge 1 commit intoapache:mainfrom
zuston:split-assign

Conversation

@zuston
Copy link
Copy Markdown
Member

@zuston zuston commented May 8, 2026

Purpose

Linked issue: close #3269

The motivation of this PR is to align with the sink channel selector. And another point is to make it possible that all bucket data could be routed into the same subtasks if having the partitionIds when numBucket % concurrency = 0 , that could emlinate the shuffle to improve the performance to reduce backfill time for large-scale data.

Brief change log

Tests

API and Format

Documentation

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adjusts Flink source split-to-subtask assignment to better align with the sink channel selection logic, aiming to co-locate buckets on the same subtasks (notably when numBuckets % parallelism == 0) to reduce shuffle and improve large-scale backfill performance (issue #3269).

Changes:

  • Update FlinkSourceEnumerator#getSplitOwner to use ChannelComputer.shouldCombinePartitionInSharding(...) and route either by bucket-only or by (partition,bucket).
  • Add ChannelComputer.select(Long partitionId, int bucket, int numChannels) to support partition-id-based channel selection.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File Description
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java Switch split ownership logic to reuse ChannelComputer sharding decisions and enable bucket-only routing when appropriate.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/ChannelComputer.java Add a partition-id-based select(...) overload for channel selection.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 943 to 945
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
Comment on lines +956 to +962
Long partitionId = tableBucket.getPartitionId();
int bucketId = tableBucket.getBucket();
if (ChannelComputer.shouldCombinePartitionInSharding(
partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
return ChannelComputer.select(partitionId, bucketId, numChannels);
}
return ChannelComputer.select(bucketId, numChannels);
Comment on lines +958 to +960
if (ChannelComputer.shouldCombinePartitionInSharding(
partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
return ChannelComputer.select(partitionId, bucketId, numChannels);
import org.apache.fluss.flink.lake.LakeSplitGenerator;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.sink.ChannelComputer;
Comment on lines +62 to +65
static int select(Long partitionId, int bucket, int numChannels) {
int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels;
return (startChannel + bucket) % numChannels;
}
Comment on lines +956 to +962
Long partitionId = tableBucket.getPartitionId();
int bucketId = tableBucket.getBucket();
if (ChannelComputer.shouldCombinePartitionInSharding(
partitionId != null, tableInfo.getNumBuckets(), numChannels)) {
return ChannelComputer.select(partitionId, bucketId, numChannels);
}
return ChannelComputer.select(bucketId, numChannels);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Optional channel selection for source split in FlinkSourceEnumerator

2 participants