Merged
@@ -361,9 +361,26 @@ private void startInBatchMode() {
         context.callAsync(
                 () -> {
                     List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
+                    // No lake snapshot exists, fall back to Fluss-only splits
                     if (splits == null) {
Member @fresh-borzoni commented on Apr 30, 2026:

nit: maybe it's a good idea to add some info log for easier tracing; at least stream mode has it when it goes through the lake path. Something like:

LOG.info("No lake snapshot found for table {}, falling back to Fluss-only splits.", tablePath);

Contributor Author replied:

Good suggestion! Added the info log for tracing. Thanks @fresh-borzoni!

-                        throw new UnsupportedOperationException(
-                                "Currently, Batch mode can only be supported if one lake snapshot exists for the table.");
+                        LOG.info(
+                                "No lake snapshot found for table {},"
+                                        + " falling back to Fluss-only splits.",
+                                tablePath);
+                        if (isPartitioned) {
+                            Set<PartitionInfo> partitionInfos = listPartitions();
+                            Collection<Partition> partitions =
+                                    partitionInfos.stream()
+                                            .map(
+                                                    p ->
+                                                            new Partition(
+                                                                    p.getPartitionId(),
+                                                                    p.getPartitionName()))
+                                            .collect(Collectors.toList());
+                            splits = this.initPartitionedSplits(partitions);
Contributor @luoyuxia commented:

The issue here is that for primary-key tables, this fallback may generate both kv snapshot splits and log splits. At the moment, the Flink connector does not yet support merging kv snapshot splits with log splits.
In comparison, the Spark connector only generates log splits in this case, with the lake splits left empty, so it can continue to reuse the existing path that already supports merging lake splits with log splits.

@matrixsparse Would you be open to submitting a follow-up PR to address this? If that is not convenient at the moment, it may be safer to revert this change for now and reopen the issue so we can revisit it with a complete fix later.
cc @fresh-borzoni
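The contrast @luoyuxia draws between the two fallback strategies can be sketched in isolation. The following self-contained Java sketch uses simplified stand-in types (`Split`, `SplitKind`, and both fallback methods are illustrative, not actual Fluss classes) to show why the Flink-side fallback is problematic for primary-key tables, while the Spark-style fallback sidesteps the unsupported merge:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; the real Fluss split
// classes carry far more state (table path, bucket metadata, offsets).
public class FallbackSketch {
    enum SplitKind { KV_SNAPSHOT, LOG }

    record Split(SplitKind kind, int bucket) {}

    // Fallback as merged in this PR: a primary-key table yields both a
    // kv snapshot split and a log split per bucket, which the Flink
    // connector cannot merge yet.
    static List<Split> flinkFallback(boolean isPrimaryKeyTable, int buckets) {
        List<Split> splits = new ArrayList<>();
        for (int b = 0; b < buckets; b++) {
            if (isPrimaryKeyTable) {
                splits.add(new Split(SplitKind.KV_SNAPSHOT, b));
            }
            splits.add(new Split(SplitKind.LOG, b));
        }
        return splits;
    }

    // Spark-style fallback: log splits only, lake splits left empty, so
    // the existing lake/log merge path keeps working unchanged.
    static List<Split> sparkStyleFallback(int buckets) {
        List<Split> splits = new ArrayList<>();
        for (int b = 0; b < buckets; b++) {
            splits.add(new Split(SplitKind.LOG, b));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Primary-key table, 2 buckets: 4 splits (kv + log per bucket).
        System.out.println(flinkFallback(true, 2).size());
        // Spark-style, 2 buckets: 2 splits (log only).
        System.out.println(sparkStyleFallback(2).size());
    }
}
```

The follow-up PR discussed below would move the Flink connector toward the second shape for primary-key tables until kv-snapshot/log merging is supported.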

Contributor Author replied:

Understood. I will submit a follow-up PR to fix this limitation in the Flink connector. Thanks for pointing this out @luoyuxia.

Member replied:

@luoyuxia Good point, thank you, I missed it.

+                        } else {
+                            splits = this.initNonPartitionedSplits();
+                        }
                     }
                     return splits;
                 },