[client] Fix tiering hang on first_row merge engine empty batches#3242
[client] Fix tiering hang on first_row merge engine empty batches#3242Kaixuan-Duan wants to merge 3 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses a tiering hang for tables using the FIRST_ROW merge engine where the log offset can advance via “empty” WAL batches (recordCount=0) even when no ScanRecord is materialized, causing the tiering layer to repeatedly poll the same range (Issue #2371).
Changes:
- Extend
fluss-clientScanRecordsto carry per-bucketnextLogOffsets, and exposepolledBuckets()+nextLogOffset(bucket)so callers can observe offset progress even when no records were produced. - Update
LogFetchCollectorto always recordnextFetchOffsetfor each polled bucket and constructScanRecordswith these offsets. - Update Flink
TieringSplitReader#forLogRecordsto iteratepolledBuckets()and determine end-of-range usingnextLogOffset(with fallback to last-record checks), plus add a regression test reproducing Issue #2371.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java | Adds regression test to ensure tiering completes under FIRST_ROW with duplicate keys/empty batches. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java | Uses polledBuckets() and nextLogOffset to finish splits even when only empty batches occur. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java | Adds unit tests for legacy constructor behavior and new polledBuckets()/nextLogOffset semantics. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java | Verifies empty/filtered responses still expose offset advancement via polledBuckets()/nextLogOffset. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java | Introduces nextLogOffsets, new constructor, polledBuckets(), and nextLogOffset(bucket). |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java | Always records advanced nextFetchOffset per polled bucket and returns it in ScanRecords. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public Set<TableBucket> polledBuckets() { | ||
| if (nextLogOffsets.isEmpty()) { | ||
| return buckets(); | ||
| } | ||
| Set<TableBucket> all = new HashSet<>(records.keySet()); | ||
| all.addAll(nextLogOffsets.keySet()); | ||
| return Collections.unmodifiableSet(all); | ||
| } |
| // Tracks the next fetch offset for every bucket polled in this round, even when the | ||
| // returned record list is empty (e.g. empty WAL batches produced by the FIRST_ROW | ||
| // merge engine, see issue #2371). This lets callers (such as the tiering service) | ||
| // detect that the log offset has advanced past empty batches. | ||
| Map<TableBucket, Long> nextOffsets = new HashMap<>(); |
|
@luoyuxia Hi, I have addressed the feedback. PTAL |
|
@Kaixuan-Duan Thanks for the pr. Will take a look when i got some time |
luoyuxia
left a comment
There was a problem hiding this comment.
@Kaixuan-Duan Thanks for the pr. Left minor comments. PTAL
| * Return the fetched log records, empty the record buffer and update the consumed position. | ||
| * | ||
| * <p>NOTE: returning empty records guarantees the consumed position are NOT updated. | ||
| * <p>The returned {@link ScanRecords#records(TableBucket)} may be empty for a bucket even when |
There was a problem hiding this comment.
The comments looks too long..Too long comments may break attention. I think we can revert the changes. It may not important to add these comments.
| * empty WAL batches generated by the FIRST_ROW merge engine when the upserted key already | ||
| * exists). See <a href="https://github.com/apache/fluss/issues/2371">FLUSS-2371</a>. | ||
| */ | ||
| private final Map<TableBucket, Long> nextLogOffsets; |
There was a problem hiding this comment.
nextLogOffsets -> lastConsumedOffsets?
Also comment that it's a exclusive offset..
| * @return the union of buckets exposed via {@link #buckets()} and buckets that only have an | ||
| * advanced {@code nextLogOffset}. | ||
| */ | ||
| public Set<TableBucket> polledBuckets() { |
There was a problem hiding this comment.
I'm wondering whether can we just remove this method?
We can put empty records(no actaull record, but with offset advance) into records?
| */ | ||
| @Nullable | ||
| public Long nextLogOffset(TableBucket bucket) { | ||
| return nextLogOffsets.get(bucket); |
There was a problem hiding this comment.
when bucket was not polled in this round?
If not polled in this round, will it be put into nextLogOffsets?
| } | ||
|
|
||
| /** | ||
| * Regression test for <a href="https://github.com/apache/fluss/issues/2371">Issue #2371</a>: |
There was a problem hiding this comment.
comments is too long to me.
| continue; | ||
| } | ||
| LOG.info("tiering table bucket is not empty {}.", bucket); | ||
| // Iterate polledBuckets() (instead of buckets()) so that buckets which only produced |
There was a problem hiding this comment.
I'm wondering if we can change it like following, which I think may be more clear:
for (TableBucket bucket : scanRecords.polledBuckets()) {
LOG.info("tiering table bucket {}.", bucket);
List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
// no any stopping offset, just skip handle the records for the bucket
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
if (stoppingOffset == null) {
continue;
}
LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket);
ScanRecord lastRecord = null;
if (!bucketScanRecords.isEmpty()) {
LOG.info("tiering table bucket is not empty {}.", bucket);
LakeWriter<WriteResult> lakeWriter =
getOrCreateLakeWriter(
bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
for (ScanRecord record : bucketScanRecords) {
// Only tier records that are within the split range [start, stoppingOffset).
if (record.logOffset() < stoppingOffset) {
lakeWriter.write(record);
if (record.getSizeInBytes() > 0) {
tieringMetrics.recordBytesRead(record.getSizeInBytes());
}
}
}
lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
}
Long nextFetchOffset = scanRecords.nextLogOffset(bucket);
// Prefer the scanner-reported next fetch offset because it advances even when this poll
// round only observes empty WAL batches. Fall back to the last materialized record
// offset
// for callers that do not provide nextFetchOffset.
boolean reachedEnd =
nextFetchOffset != null
? nextFetchOffset >= stoppingOffset
: lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;
if (!reachedEnd && lastRecord == null) {
continue;
}
// Track the latest tiered offset/timestamp for this bucket.
// Once the split reaches the end, the correct tiered offset is stoppingOffset - 1,
// because stoppingOffset is exclusive and records at/after it are not part of this
// split.
long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
long tieredTimestamp;
if (lastRecord != null) {
tieredTimestamp = lastRecord.timestamp();
} else {
LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
}
currentTableTieredOffsetAndTimestamp.put(
bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));
if (!reachedEnd) {
continue;
}
currentTableStoppingOffsets.remove(bucket);
if (bucket.getPartitionId() != null) {
currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
} else {
// todo: should unsubscribe the log split if unsubscribe bucket for
// un-partitioned table is supported
}
TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
String currentSplitId = currentTieringSplit.splitId();
writeResults.put(
bucket,
completeLakeWriter(
bucket,
currentTieringSplit.getPartitionName(),
stoppingOffset,
tieredTimestamp));
finishedSplitIds.put(bucket, currentSplitId);
LOG.info(
"Finish tier bucket {} for table {}, split: {}.",
bucket,
currentTablePath,
currentSplitId);
}
Purpose
Linked issue: close #2371
Brief change log
fluss-clientScanRecords: add aMap<TableBucket, Long> nextLogOffsetsfield with a new two-arg constructor (legacy single-arg constructor preserved for backwards compatibility); expose two new accessors:polledBuckets()— union of buckets that produced records and buckets that only advanced their next fetch offset.nextLogOffset(bucket)— exclusive upper bound of consumed offsets in this poll round, ornullif the bucket was not polled.LogFetchCollector#collectFetch: always record the advancednextFetchOffsetper polled bucket, even when the materialized record list is empty, and pack it into the newScanRecordsconstructor.fluss-flink-commonTieringSplitReader#forLogRecords: iteratescanRecords.polledBuckets()instead ofscanRecords.buckets(). Determine end-of-range by comparing the scanner-reportednextLogOffsetagainst the bucket'sstoppingOffset(with the legacylastRecord.logOffset() >= stoppingOffset - 1check kept as a fallback for callers that don't supplynextLogOffset). Tolerate splits that finish with no real record observed by falling back toUNKNOWN_BUCKET_TIMESTAMPwhen computing the finish timestamp.Tests
./mvnw -pl fluss-client,fluss-flink/fluss-flink-common \
-Dtest='ScanRecordsTest,LogFetchCollectorTest,TieringSplitReaderTest#testTieringFirstRowMergeEngineFinishes' \
-DfailIfNoTests=false test
API and Format
Documentation