@@ -59,6 +59,11 @@ static int select(String partitionName, int bucket, int numChannels) {
return (startChannel + bucket) % numChannels;
}

static int select(Long partitionId, int bucket, int numChannels) {
int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels;
return (startChannel + bucket) % numChannels;
}

static int select(int bucket, int numChannels) {
return bucket % numChannels;
}
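For intuition, here is a minimal standalone sketch (not part of the PR; the demo class name and the sample numbers are made up) that replays the new partition-aware overload and prints how buckets of two partitions start at different channels:

public class SelectDemo {
    // Same formula as the select(Long, int, int) overload added above.
    static int select(Long partitionId, int bucket, int numChannels) {
        int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % numChannels;
        return (startChannel + bucket) % numChannels;
    }

    public static void main(String[] args) {
        int numChannels = 3;
        for (long partitionId = 1; partitionId <= 2; partitionId++) {
            for (int bucket = 0; bucket < 3; bucket++) {
                System.out.printf("partition=%d bucket=%d -> channel=%d%n",
                        partitionId, bucket, select(partitionId, bucket, numChannels));
            }
        }
    }
}

With three channels, partition 1 starts at channel 1 and partition 2 at channel 2, so bucket 0 of the two partitions no longer collides on the same channel.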
@@ -83,6 +83,8 @@
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.apache.fluss.flink.sink.ChannelComputer.select;
import static org.apache.fluss.flink.sink.ChannelComputer.shouldCombinePartitionInSharding;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
import static org.apache.fluss.utils.Preconditions.checkState;

@@ -924,39 +926,57 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
/**
* Returns the index of the target subtask that a specific split should be assigned to.
*
* <p>The resulting distribution of splits of a single table has the following contract:
* <p>Routing rules (in order):
*
* <ul>
* <li>1. Splits in same bucket are assigned to same subtask
* <li>2. Uniformly distributed across subtasks
* <li>3. For partitioned table, the buckets in same partition are round-robin distributed
* (strictly clockwise w.r.t. ascending subtask indices) by using the partition id as the
* offset from a starting index. The starting index is the index of the subtask which
* bucket 0 of the partition will be assigned to, determined using the partition id to
* make sure the partitions' buckets of a table are distributed uniformly
* <li>Lake splits with bucket {@code -1}: the subtask is {@code (splitId().hashCode() &
* 0x7FFFFFFF) % parallelism} (bucket-unaware lake paths).
* <li>Otherwise, let {@code P} be parallelism and {@code B} be the table bucket count from
* {@link org.apache.fluss.metadata.TableInfo#getNumBuckets()}:
* <ul>
* <li>If the split has no partition id (non-partitioned table): {@code bucket % P}.
* <li>If the split has a partition id and {@code B % P == 0}: {@code bucket % P} only;
* partition id is not used (same channel formula as without partitioning).
* <li>If the split has a partition id and {@code B % P != 0}: the partition id participates
* in the channel via {@link org.apache.fluss.flink.sink.ChannelComputer} (the hash of the
* partition id picks a starting channel, and the buckets of that partition round-robin
* from there), so buckets from different partitions spread more evenly when {@code B} is
* not divisible by {@code P}.
* </ul>
* </ul>
*
* <p>All splits for the same physical bucket still map to the same subtask.
*
* @param split the split to assign.
* @return the id of the subtask that owns the split.
* @throws IllegalStateException if table metadata is not loaded yet (see {@link #start()});
* bucket-based routing requires {@code tableInfo}. Lake splits with bucket {@code -1} do
* not consult {@code tableInfo} and may be resolved before {@link #start()}.
*/
@VisibleForTesting
protected int getSplitOwner(SourceSplitBase split) {
TableBucket tableBucket = split.getTableBucket();
int startIndex =
tableBucket.getPartitionId() == null
? 0
: ((tableBucket.getPartitionId().hashCode() * 31) & 0x7FFFFFFF)
% context.currentParallelism();
int numChannels = context.currentParallelism();

// Hack: if the bucket is -1, the split is bucket-unaware (e.g. a Paimon
// unaware-bucket log table), so we use the hash of the split id to pick the owner.
// todo: refactor the split assignment logic
if (split.isLakeSplit() && tableBucket.getBucket() == -1) {
return (split.splitId().hashCode() & 0x7FFFFFFF) % context.currentParallelism();
return (split.splitId().hashCode() & 0x7FFFFFFF) % numChannels;
}

return (startIndex + tableBucket.getBucket()) % context.currentParallelism();
checkState(
tableInfo != null,
"Table metadata is not loaded yet; call start() on the enumerator before "
+ "getSplitOwner() for this split (bucket-based assignment needs numBuckets).");

int bucketId = tableBucket.getBucket();
if (shouldCombinePartitionInSharding(
tableBucket.getPartitionId() != null, tableInfo.getNumBuckets(), numChannels)) {
return select(tableBucket.getPartitionId(), bucketId, numChannels);
}
return select(bucketId, numChannels);
}

private void checkReaderRegistered(int readerId) {
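For reference, a standalone sketch of how the routing rules documented above compose. This is not the enumerator's code: the lake-split test is reduced to a plain bucket check, and the shouldCombinePartitionInSharding condition is inferred from the javadoc (the partition id only participates when the bucket count is not divisible by the parallelism):

final class SplitOwnerSketch {

    static int owner(
            String splitId, Long partitionId, int bucket, int numBuckets, int parallelism) {
        // Rule 1: bucket-unaware (lake) splits are routed by the split id hash.
        if (bucket == -1) {
            return (splitId.hashCode() & 0x7FFFFFFF) % parallelism;
        }
        // Rules 2 and 3: fold the partition id in only when the buckets do not divide
        // evenly across the subtasks; otherwise plain bucket % parallelism already
        // keeps every partition's buckets uniformly spread.
        if (partitionId != null && numBuckets % parallelism != 0) {
            int startChannel = ((partitionId.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
            return (startChannel + bucket) % parallelism;
        }
        return bucket % parallelism;
    }

    public static void main(String[] args) {
        // Assuming 3 buckets: with parallelism 2 the partition id shifts each partition's
        // starting subtask; with parallelism 3 routing degenerates to bucket % 3.
        for (int parallelism : new int[] {2, 3}) {
            for (long partitionId = 1; partitionId <= 2; partitionId++) {
                System.out.printf("parallelism=%d partition=%d bucket=0 -> subtask=%d%n",
                        parallelism, partitionId, owner("split", partitionId, 0, 3, parallelism));
            }
        }
    }
}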
@@ -543,12 +543,12 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable {
}
}

@Test
void testGetSplitOwner() throws Exception {
int numSubtasks = 3;
@ParameterizedTest
@ValueSource(ints = {2, 3})
void testGetSplitOwner(int parallelism) throws Exception {
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
try (MockSplitEnumeratorContext<SourceSplitBase> context =
new MockSplitEnumeratorContext<>(numSubtasks);
new MockSplitEnumeratorContext<>(parallelism);
FlinkSourceEnumerator enumerator =
new FlinkSourceEnumerator(
DEFAULT_TABLE_PATH,
@@ -564,6 +564,8 @@ void testGetSplitOwner() throws Exception {
LeaseContext.DEFAULT,
false)) {

enumerator.start();

// test splits for same non-partitioned bucket, should assign to same task
TableBucket t1 = new TableBucket(tableId, 0);
SourceSplitBase s1 = new LogSplit(t1, null, 1);
@@ -585,13 +587,19 @@
assertThat(enumerator.getSplitOwner(s1)).isEqualTo(0);
assertThat(enumerator.getSplitOwner(s2)).isEqualTo(1);

// splits are with different partitions
t1 = new TableBucket(tableId, 1L, 0);
t2 = new TableBucket(tableId, 2L, 0);
s1 = new LogSplit(t1, "p1", 0);
s2 = new LogSplit(t2, "p2", 0);
assertThat(enumerator.getSplitOwner(s1)).isEqualTo(1);
assertThat(enumerator.getSplitOwner(s2)).isEqualTo(2);

if (DEFAULT_BUCKET_NUM % parallelism == 0) {
// splits with different partitions but the same bucket: when the bucket count is
// divisible by the parallelism, the enumerator uses bucket-only routing (same as
// without a partition id), so both splits share the same owner.
assertThat(enumerator.getSplitOwner(s1)).isEqualTo(enumerator.getSplitOwner(s2));
} else {
assertThat(enumerator.getSplitOwner(s1)).isNotEqualTo(enumerator.getSplitOwner(s2));
}
}
}

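As a quick sanity check of the not-equal branch above (this scratch loop is not part of the test; it assumes the partition-aware formula from ChannelComputer and parallelism 2):

// Partition ids 1 and 2 (the ones used in the test), bucket 0, parallelism 2:
// start(1) = (1 * 31) % 2 = 1 and start(2) = (2 * 31) % 2 = 0, so the owners differ.
for (long partitionId : new long[] {1L, 2L}) {
    int startChannel = ((Long.hashCode(partitionId) * 31) & 0x7FFFFFFF) % 2;
    System.out.println("partition " + partitionId + " bucket 0 -> subtask " + ((startChannel + 0) % 2));
}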