[flink] Fix batch query on empty datalake-enabled table to return 0 rows instead of failing #3208
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -361,9 +361,26 @@ private void startInBatchMode() { | |
| context.callAsync( | ||
| () -> { | ||
| List<SourceSplitBase> splits = generateHybridLakeFlussSplits(); | ||
| + // No lake snapshot exists, fall back to Fluss-only splits | ||
| if (splits == null) { | ||
| - throw new UnsupportedOperationException( | ||
| - "Currently, Batch mode can only be supported if one lake snapshot exists for the table."); | ||
| + LOG.info( | ||
| + "No lake snapshot found for table {}," | ||
| + + " falling back to Fluss-only splits.", | ||
| + tablePath); | ||
| + if (isPartitioned) { | ||
| + Set<PartitionInfo> partitionInfos = listPartitions(); | ||
| + Collection<Partition> partitions = | ||
| + partitionInfos.stream() | ||
| + .map( | ||
| + p -> | ||
| + new Partition( | ||
| + p.getPartitionId(), | ||
| + p.getPartitionName())) | ||
| + .collect(Collectors.toList()); | ||
| + splits = this.initPartitionedSplits(partitions); | ||
Contributor
The issue here is that for primary-key tables, this fallback may generate both kv snapshot splits and log splits. At the moment, the Flink connector does not seem to support merging kv snapshot splits with log splits. @matrixsparse Would you be open to submitting a follow-up PR to address this? If that is not convenient right now, it may be safer to revert this change and reopen the issue so we can revisit it with a complete fix later.
Contributor
Author
Understood. I will submit a follow-up PR to fix this limitation in the Flink connector. Thanks for pointing this out, @luoyuxia.
Member
@luoyuxia Good point, thank you. I missed it.
| + } else { | ||
| + splits = this.initNonPartitionedSplits(); | ||
| + } | ||
| } | ||
| return splits; | ||
| }, | ||
nit: it may be worth adding an info log here for easier tracing; stream mode already has one when it goes through the lake path:
fluss/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
Line 315 in 53bdba8
smth like:
Good suggestion! Added the info log for tracing. Thanks @fresh-borzoni!
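For readers skimming the thread, the fallback in the diff and the reviewer's concern about primary-key tables can be sketched in isolation. This is only an illustrative model, not the Fluss connector code: `BatchSplitFallback`, `SplitKind`, `planBatchSplits`, and `mixesSnapshotAndLog` are hypothetical names, and the two suppliers stand in for `generateHybridLakeFlussSplits()` and the Fluss-only split initialization.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the enumerator's batch split planning; not the Fluss API. */
final class BatchSplitFallback {

    /** Hypothetical split kinds, loosely named after the kinds mentioned in the review. */
    enum SplitKind { LAKE, KV_SNAPSHOT, LOG }

    /**
     * Returns the hybrid lake + Fluss splits when the hybrid planner yields a
     * non-null result, otherwise the Fluss-only splits.
     */
    static List<SplitKind> planBatchSplits(
            Supplier<List<SplitKind>> hybridPlanner,
            Supplier<List<SplitKind>> flussOnlyPlanner) {
        List<SplitKind> splits = hybridPlanner.get();
        if (splits == null) {
            // null models "no lake snapshot exists"; before the change this case threw.
            splits = flussOnlyPlanner.get();
        }
        return splits;
    }

    /** True when a plan contains both kv snapshot splits and log splits. */
    static boolean mixesSnapshotAndLog(List<SplitKind> splits) {
        return splits.contains(SplitKind.KV_SNAPSHOT) && splits.contains(SplitKind.LOG);
    }

    public static void main(String[] args) {
        // Assumption: for an empty table the Fluss-only planner is modelled as returning no splits.
        List<SplitKind> empty = planBatchSplits(() -> null, () -> Collections.emptyList());
        System.out.println("splits for empty table: " + empty.size());

        // Assumption: a primary-key table is modelled as yielding both split kinds on fallback.
        List<SplitKind> pk =
                planBatchSplits(() -> null, () -> List.of(SplitKind.KV_SNAPSHOT, SplitKind.LOG));
        System.out.println("mixes snapshot and log: " + mixesSnapshotAndLog(pk));
    }
}
```

A check like `mixesSnapshotAndLog` is one conceivable way to detect the unsupported combination before handing splits to readers; whether the follow-up PR takes that route is not stated in this thread.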