[flink] add split assignment mechanism w/o partitionId#3272
Draft
zuston wants to merge 1 commit intoapache:mainfrom
Draft
[flink] add split assignment mechanism w/o partitionId#3272zuston wants to merge 1 commit intoapache:mainfrom
zuston wants to merge 1 commit intoapache:mainfrom
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adjusts Flink source split-to-subtask assignment to better align with the sink channel selection logic, aiming to co-locate buckets on the same subtasks (notably when numBuckets % parallelism == 0) to reduce shuffle and improve large-scale backfill performance (issue #3269).
Changes:
- Update
FlinkSourceEnumerator#getSplitOwnerto useChannelComputer.shouldCombinePartitionInSharding(...)and route either by bucket-only or by (partition,bucket). - Add
ChannelComputer.select(Long partitionId, int bucket, int numChannels)to support partition-id-based channel selection.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java | Switch split ownership logic to reuse ChannelComputer sharding decisions and enable bucket-only routing when appropriate. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/ChannelComputer.java | Add a partition-id-based select(...) overload for channel selection. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
943
to
945
| @VisibleForTesting | ||
| protected int getSplitOwner(SourceSplitBase split) { | ||
| TableBucket tableBucket = split.getTableBucket(); |
Comment on lines
+956
to
+962
| Long partitionId = tableBucket.getPartitionId(); | ||
| int bucketId = tableBucket.getBucket(); | ||
| if (ChannelComputer.shouldCombinePartitionInSharding( | ||
| partitionId != null, tableInfo.getNumBuckets(), numChannels)) { | ||
| return ChannelComputer.select(partitionId, bucketId, numChannels); | ||
| } | ||
| return ChannelComputer.select(bucketId, numChannels); |
Comment on lines
+958
to
+960
| if (ChannelComputer.shouldCombinePartitionInSharding( | ||
| partitionId != null, tableInfo.getNumBuckets(), numChannels)) { | ||
| return ChannelComputer.select(partitionId, bucketId, numChannels); |
| import org.apache.fluss.flink.lake.LakeSplitGenerator; | ||
| import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; | ||
| import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; | ||
| import org.apache.fluss.flink.sink.ChannelComputer; |
Comment on lines
+62
to
+65
| static int select(Long partitionId, int bucket, int numChannels) { | ||
| int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels; | ||
| return (startChannel + bucket) % numChannels; | ||
| } |
Comment on lines
+956
to
+962
| Long partitionId = tableBucket.getPartitionId(); | ||
| int bucketId = tableBucket.getBucket(); | ||
| if (ChannelComputer.shouldCombinePartitionInSharding( | ||
| partitionId != null, tableInfo.getNumBuckets(), numChannels)) { | ||
| return ChannelComputer.select(partitionId, bucketId, numChannels); | ||
| } | ||
| return ChannelComputer.select(bucketId, numChannels); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Linked issue: close #3269
The motivation of this PR is to align with the sink channel selector. And another point is to make it possible that all bucket data could be routed into the same subtasks if having the partitionIds when
numBucket % concurrency = 0, that could emlinate the shuffle to improve the performance to reduce backfill time for large-scale data.Brief change log
Tests
API and Format
Documentation