
[spark] lake table predicate pushdown#3238

Open
fresh-borzoni wants to merge 4 commits into apache:main from fresh-borzoni:feat/spark-lake-filter-pushdown

Conversation

@fresh-borzoni
Member

@fresh-borzoni fresh-borzoni commented May 2, 2026

Summary

closes #3239

Push Spark WHERE predicates to LakeSource.withFilters() for batch reads of lake-enabled log and kv tables.

Note: partition filter pushdown is not implemented here; I will address it in a follow-up.
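The flow described above can be sketched with hypothetical stand-ins for the real Fluss/Spark types (the actual `LakeSource.withFilters()` signature may differ): the scan builder offers the converted predicates, and the lake source reports which subset it accepted and which Spark must still re-evaluate after the scan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real Fluss types; names and shapes are illustrative only.
public class LakeFilterPushdownSketch {

    // Minimal predicate model: field OP value.
    record Predicate(String field, String op, Object value) {}

    // What a withFilters-style call returns: the subset the lake source will
    // evaluate itself, and the remainder Spark must re-apply post-scan.
    record PushdownResult(List<Predicate> accepted, List<Predicate> remaining) {}

    // Toy lake source that only accepts equality predicates.
    static PushdownResult withFilters(List<Predicate> predicates) {
        List<Predicate> accepted = new ArrayList<>();
        List<Predicate> remaining = new ArrayList<>();
        for (Predicate p : predicates) {
            if ("=".equals(p.op())) {
                accepted.add(p);
            } else {
                remaining.add(p);
            }
        }
        return new PushdownResult(accepted, remaining);
    }

    public static void main(String[] args) {
        PushdownResult result = withFilters(List.of(
                new Predicate("id", "=", 1),
                new Predicate("ts", ">", 100)));
        System.out.println(result.accepted().size() + " accepted, "
                + result.remaining().size() + " remaining");
    }
}
```

The key design point is that pushdown is best-effort: whatever the lake source declines stays in Spark's post-scan filter, so results are correct either way.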

@fresh-borzoni fresh-borzoni force-pushed the feat/spark-lake-filter-pushdown branch from 7284ad5 to 15c85e3 Compare May 2, 2026 03:32
@fresh-borzoni fresh-borzoni force-pushed the feat/spark-lake-filter-pushdown branch from 15c85e3 to a3deed5 Compare May 2, 2026 09:34
@fresh-borzoni
Member Author

@Yohahaha @YannByron PTAL 🙏

Contributor

@Yohahaha Yohahaha left a comment


LGTM! left some comments, thank you!

please update PR title/description to note that partition filter was not implemented.

@fresh-borzoni fresh-borzoni force-pushed the feat/spark-lake-filter-pushdown branch from 4f736f4 to 34af92c Compare May 7, 2026 13:10
@fresh-borzoni
Member Author

@Yohahaha Thanks for the review, addressed.

@luoyuxia Can you take a quick look, pls?

Contributor

Copilot AI left a comment


Pull request overview

Adds Spark batch predicate pushdown support for lake-enabled log/kv tables by converting Spark WHERE predicates into Fluss predicates and forwarding them to LakeSource.withFilters(...), aligning Spark behavior more closely with Flink’s lake filter pushdown.

Changes:

  • Introduce lake-specific SupportsPushDownV2Filters handling that probes LakeSource.withFilters(...) and reports/propagates the accepted predicate subset.
  • Apply pushed Fluss predicates in lake batch planning and reader factories (lake splits) and wire predicate state through lake scans/batches.
  • Add UT coverage for lake append + lake upsert filter pushdown scenarios (lake-only, union with tail, fallback, non-pushable expressions).

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala Adds reusable helpers to extract pushed predicates from Spark plans.
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala Adds primary-key table lake-read filter pushdown tests.
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala Adds append/log lake-read filter pushdown tests (lake-only/union/fallback/non-pushable).
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala Refactors predicate conversion to support per-predicate conversion and AND-combination helper.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala Threads pushed predicate into lake upsert batch planning and lake source filter application.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala Applies lake filters to lake readers and supports passing a separate predicate to log-tail readers.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala Adds shared applyLakeFilters helper (with debug logging) for LakeSource.withFilters.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala Threads pushed predicate into lake append planning; optionally forwards to log-tail server-side filter for ARROW.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala Adds lake-specific V2 filter pushdown trait and wires lake scan builders to it.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala Propagates pushed Spark predicates into scan descriptions and adds lake scan predicate plumbing.
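The `SparkPredicateConverter` refactor mentioned above (per-predicate conversion plus an AND-combination helper) can be sketched as follows. This is an illustrative sketch, not the real converter API: predicates are modeled as plain strings, and the "not pushable" check is a made-up placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative sketch of per-predicate conversion plus an AND-combination helper;
// the real SparkPredicateConverter works on Spark/Fluss predicate objects, not strings.
public class PredicateConverterSketch {

    // Convert one Spark-side predicate; Optional.empty() means "not pushable".
    static Optional<String> convertOne(String sparkPredicate) {
        // Placeholder rule: assume anything containing a UDF call cannot be
        // translated to a Fluss predicate.
        return sparkPredicate.contains("udf(") ? Optional.empty() : Optional.of(sparkPredicate);
    }

    // Convert each predicate individually, AND-combine the successes, and
    // collect the rejected ones so Spark keeps evaluating them post-scan.
    static Optional<String> convertAndCombine(List<String> predicates, List<String> rejected) {
        List<String> converted = new ArrayList<>();
        for (String p : predicates) {
            convertOne(p).ifPresentOrElse(converted::add, () -> rejected.add(p));
        }
        return converted.isEmpty()
                ? Optional.empty()
                : Optional.of("(" + String.join(" AND ", converted) + ")");
    }
}
```

Converting predicate-by-predicate (rather than all-or-nothing) lets a single non-pushable expression fall back to Spark without discarding the pushable remainder.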


Contributor

@luoyuxia luoyuxia left a comment


@fresh-borzoni Thanks for the pr. LGTM! One minor comment. Feel free to merge it.

val remaining = result.remainingPredicates().size()
s"Lake source accepted $accepted of ${accepted + remaining} pushed predicates"
}
logDebug(
Contributor


nit: it can be info level
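The suggested change, sketched with a stand-in logger (the real code uses Spark's logging trait): move the pushdown summary from debug to info level, keeping the message shape from the snippet quoted above.

```java
import java.util.function.Consumer;

// Minimal sketch of the review suggestion: emit the pushdown summary at info
// level rather than debug. The message text mirrors the quoted snippet.
public class PushdownLoggingSketch {

    static String summarize(int accepted, int remaining) {
        int total = accepted + remaining;
        return "Lake source accepted " + accepted + " of " + total + " pushed predicates";
    }

    // logInfo is a stand-in for the framework's info-level logger.
    static void logPushdownStats(int accepted, int remaining, Consumer<String> logInfo) {
        logInfo.accept(summarize(accepted, remaining));
    }
}
```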



Development

Successfully merging this pull request may close these issues.

[spark] lake table predicate pushdown

4 participants