From 7339c52db4017e3c2e5e05f5ee493a244d5a289b Mon Sep 17 00:00:00 2001 From: matrix Date: Thu, 30 Apr 2026 20:38:11 +0800 Subject: [PATCH 1/2] [flink] Fix batch query on empty datalake-enabled table to fall back to fluss splits --- .../enumerator/FlinkSourceEnumerator.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 3ad234aa01..f24f93f5ee 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -361,9 +361,22 @@ private void startInBatchMode() { context.callAsync( () -> { List splits = generateHybridLakeFlussSplits(); + // No lake snapshot exists, fall back to Fluss-only splits if (splits == null) { - throw new UnsupportedOperationException( - "Currently, Batch mode can only be supported if one lake snapshot exists for the table."); + if (isPartitioned) { + Set partitionInfos = listPartitions(); + Collection partitions = + partitionInfos.stream() + .map( + p -> + new Partition( + p.getPartitionId(), + p.getPartitionName())) + .collect(Collectors.toList()); + splits = this.initPartitionedSplits(partitions); + } else { + splits = this.initNonPartitionedSplits(); + } } return splits; }, From dd1b7b88ae0547dbe48cdb1ebba44f10eb4dd624 Mon Sep 17 00:00:00 2001 From: matrix Date: Fri, 1 May 2026 01:38:44 +0800 Subject: [PATCH 2/2] [flink] Add info log when falling back to Fluss-only splits --- .../fluss/flink/source/enumerator/FlinkSourceEnumerator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index f24f93f5ee..1a6521f57e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -363,6 +363,10 @@ private void startInBatchMode() { List splits = generateHybridLakeFlussSplits(); // No lake snapshot exists, fall back to Fluss-only splits if (splits == null) { + LOG.info( + "No lake snapshot found for table {}," + + " falling back to Fluss-only splits.", + tablePath); if (isPartitioned) { Set partitionInfos = listPartitions(); Collection partitions =