[spark] lake table predicate pushdown #3238
fresh-borzoni wants to merge 4 commits into apache:main
Conversation
@Yohahaha @YannByron PTAL 🙏
Yohahaha left a comment
LGTM! Left some comments, thank you!
Please update the PR title/description to note that partition filter pushdown was not implemented.
Pull request overview
Adds Spark batch predicate pushdown support for lake-enabled log/kv tables by converting Spark WHERE predicates into Fluss predicates and forwarding them to LakeSource.withFilters(...), aligning Spark behavior more closely with Flink’s lake filter pushdown.
Changes:
- Introduce lake-specific SupportsPushDownV2Filters handling that probes LakeSource.withFilters(...) and reports/propagates the accepted predicate subset.
- Apply pushed Fluss predicates in lake batch planning and reader factories (lake splits), and wire predicate state through lake scans/batches.
- Add UT coverage for lake append + lake upsert filter pushdown scenarios (lake-only, union with tail, fallback, non-pushable expressions).
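The accept/remainder split described above can be sketched as follows. This is a minimal self-contained sketch, not the PR's actual code: `FlussPredicate`, `FilterPushDownResult`, and the `LakeSource` trait here are simplified stand-ins for the real Fluss/Spark interfaces, and the equality-only example source is hypothetical.

```scala
// Sketch: probe a lake source with converted predicates and split them into
// the accepted (pushed-down) subset and the remainder Spark must still apply.
// All types are simplified stand-ins for the real Fluss/Spark APIs.

// Stand-in for a Spark predicate already converted to a Fluss predicate.
case class FlussPredicate(column: String, op: String, value: Any)

// Stand-in for the result of LakeSource.withFilters(...): the source reports
// which predicates it can evaluate and which must stay in Spark.
case class FilterPushDownResult(acceptedPredicates: Seq[FlussPredicate],
                                remainingPredicates: Seq[FlussPredicate])

trait LakeSource {
  def withFilters(predicates: Seq[FlussPredicate]): FilterPushDownResult
}

// Hypothetical example source that only accepts equality predicates.
object EqualityOnlyLakeSource extends LakeSource {
  def withFilters(predicates: Seq[FlussPredicate]): FilterPushDownResult = {
    val (accepted, remaining) = predicates.partition(_.op == "=")
    FilterPushDownResult(accepted, remaining)
  }
}

object PushdownSketch {
  // Probe the source with every converted predicate. Spark keeps evaluating
  // the remaining predicates itself, so correctness never depends on how
  // many the source accepts -- pushdown is purely an optimization.
  def push(source: LakeSource, predicates: Seq[FlussPredicate])
      : (Seq[FlussPredicate], Seq[FlussPredicate]) = {
    val result = source.withFilters(predicates)
    val accepted = result.acceptedPredicates.size
    val remaining = result.remainingPredicates.size
    println(s"Lake source accepted $accepted of ${accepted + remaining} pushed predicates")
    (result.acceptedPredicates, result.remainingPredicates)
  }

  def main(args: Array[String]): Unit = {
    val preds = Seq(
      FlussPredicate("id", "=", 1),
      FlussPredicate("ts", ">", 100L))
    val (accepted, kept) = push(EqualityOnlyLakeSource, preds)
    println(s"pushed: $accepted, kept in Spark: $kept")
  }
}
```

The key design point mirrored here is that the scan builder treats `withFilters` as a negotiation: whatever the lake source declines is simply reported back to Spark as a post-scan filter, which is why the fallback and non-pushable test scenarios remain correct.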
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Summary per file:
| File | Description |
|---|---|
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala | Adds reusable helpers to extract pushed predicates from Spark plans. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala | Adds primary-key table lake-read filter pushdown tests. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala | Adds append/log lake-read filter pushdown tests (lake-only/union/fallback/non-pushable). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala | Refactors predicate conversion to support per-predicate conversion and AND-combination helper. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala | Threads pushed predicate into lake upsert batch planning and lake source filter application. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala | Applies lake filters to lake readers and supports passing a separate predicate to log-tail readers. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala | Adds shared applyLakeFilters helper (with debug logging) for LakeSource.withFilters. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala | Threads pushed predicate into lake append planning; optionally forwards to log-tail server-side filter for ARROW. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala | Adds lake-specific V2 filter pushdown trait and wires lake scan builders to it. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala | Propagates pushed Spark predicates into scan descriptions and adds lake scan predicate plumbing. |
luoyuxia left a comment
@fresh-borzoni Thanks for the PR. LGTM! One minor comment. Feel free to merge it.
```scala
val remaining = result.remainingPredicates().size()
s"Lake source accepted $accepted of ${accepted + remaining} pushed predicates"
}
logDebug(
```
Summary
closes #3239
Push Spark WHERE predicates to LakeSource.withFilters() for batch reads of lake-enabled log and kv tables.
Note: partition filter pushdown is not implemented here; I will address it in a follow-up.