diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala index d979a5fd88..916a125b78 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala @@ -38,14 +38,20 @@ trait FlussScan extends Scan { def requiredSchema: Option[StructType] + /** Spark predicates that the scan reports as pushed down (used in [[description]]). */ + def pushedSparkPredicates: Seq[Predicate] = Seq.empty + protected def scanType: String override def readSchema(): StructType = { requiredSchema.getOrElse(SparkConversions.toSparkDataType(tableInfo.getRowType)) } - override def description(): String = - s"FlussScan: [$tablePath], Type: [$scanType]" + override def description(): String = { + val base = s"FlussScan: [$tablePath], Type: [$scanType]" + if (pushedSparkPredicates.isEmpty) base + else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]" + } override def supportedCustomMetrics(): Array[CustomMetric] = Array(FlussNumRowsReadMetric()) @@ -57,7 +63,7 @@ case class FlussAppendScan( tableInfo: TableInfo, requiredSchema: Option[StructType], pushedPredicate: Option[FlussPredicate], - pushedSparkPredicates: Seq[Predicate], + override val pushedSparkPredicates: Seq[Predicate], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -77,12 +83,6 @@ case class FlussAppendScan( flussConfig, checkpointLocation) } - - override def description(): String = { - val base = super.description() - if (pushedSparkPredicates.isEmpty) base - else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]" - } } /** Fluss Lake Append Scan. 
*/ @@ -90,6 +90,8 @@ case class FlussLakeAppendScan( tablePath: TablePath, tableInfo: TableInfo, requiredSchema: Option[StructType], + pushedPredicate: Option[FlussPredicate], + override val pushedSparkPredicates: Seq[Predicate], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -97,7 +99,13 @@ case class FlussLakeAppendScan( override protected val scanType: String = "LakeAppend" override def toBatch: Batch = { - new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig) + new FlussLakeAppendBatch( + tablePath, + tableInfo, + readSchema, + pushedPredicate, + options, + flussConfig) } override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { @@ -142,6 +150,8 @@ case class FlussLakeUpsertScan( tablePath: TablePath, tableInfo: TableInfo, requiredSchema: Option[StructType], + pushedPredicate: Option[FlussPredicate], + override val pushedSparkPredicates: Seq[Predicate], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussScan { @@ -149,7 +159,13 @@ case class FlussLakeUpsertScan( override protected val scanType: String = "LakeUpsert" override def toBatch: Batch = { - new FlussLakeUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig) + new FlussLakeUpsertBatch( + tablePath, + tableInfo, + readSchema, + pushedPredicate, + options, + flussConfig) } override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index ef08d9de8b..c74f43b9c5 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -20,6 +20,7 @@ package org.apache.fluss.spark.read import org.apache.fluss.config.{Configuration => FlussConfiguration} import org.apache.fluss.metadata.{LogFormat, TableInfo, TablePath} import org.apache.fluss.predicate.{Predicate => FlussPredicate} +import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils} import org.apache.fluss.spark.utils.SparkPredicateConverter import org.apache.spark.sql.connector.expressions.filter.Predicate @@ -27,6 +28,10 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownR import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import java.util.{Collections, IdentityHashMap, Set => JSet} + +import scala.collection.JavaConverters._ + /** An interface that extends from Spark [[ScanBuilder]]. */ trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns { @@ -45,21 +50,54 @@ trait FlussSupportsPushDownV2Filters extends FlussScanBuilder with SupportsPushD protected var pushedPredicate: Option[FlussPredicate] = None protected var acceptedPredicates: Array[Predicate] = Array.empty[Predicate] + protected def convertAndStorePredicates(predicates: Array[Predicate]): Unit = { + val (predicate, accepted) = + SparkPredicateConverter.convertPredicates(tableInfo.getRowType, predicates.toSeq) + pushedPredicate = predicate + acceptedPredicates = accepted.toArray + } + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { // Server-side batch filter only supports ARROW; other log formats reject it. 
if (tableInfo.getTableConfig.getLogFormat == LogFormat.ARROW) { - val (predicate, accepted) = - SparkPredicateConverter.convertPredicates(tableInfo.getRowType, predicates.toSeq) - pushedPredicate = predicate - acceptedPredicates = accepted.toArray + convertAndStorePredicates(predicates) } - // Server-side filter is batch-level only; Spark must re-apply for row-exact results. predicates } override def pushedPredicates(): Array[Predicate] = acceptedPredicates } +/** + * Lake reads push to the lake source regardless of log format. Each convertible predicate is + * offered to the lake source individually; only the lake-accepted subset is reported back to Spark + * and combined into the predicate handed to the scan. + */ +trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters { + + def tablePath: TablePath + + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { + val pairs = + SparkPredicateConverter.convertPerPredicate(tableInfo.getRowType, predicates.toSeq) + val (acceptedSpark, acceptedFluss) = if (pairs.isEmpty) { + (Seq.empty[Predicate], Seq.empty[FlussPredicate]) + } else { + val lakeSource = + FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + val result = FlussLakeBatch.applyLakeFilters(lakeSource, pairs.map(_._2).asJava) + // Identity-match: lake sources are expected to return the same instances they received. + val acceptedSet: JSet[FlussPredicate] = + Collections.newSetFromMap(new IdentityHashMap()) + acceptedSet.addAll(result.acceptedPredicates()) + pairs.collect { case (sp, fp) if acceptedSet.contains(fp) => (sp, fp) }.unzip + } + pushedPredicate = SparkPredicateConverter.combineAnd(acceptedFluss) + acceptedPredicates = acceptedSpark.toArray + predicates + } +} + /** Fluss Append Scan Builder. */ class FlussAppendScanBuilder( tablePath: TablePath, @@ -82,14 +120,21 @@ class FlussAppendScanBuilder( /** Fluss Lake Append Scan Builder. */ class FlussLakeAppendScanBuilder( - tablePath: TablePath, - tableInfo: TableInfo, + val tablePath: TablePath, + val tableInfo: TableInfo, options: CaseInsensitiveStringMap, flussConfig: FlussConfiguration) - extends FlussScanBuilder { + extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { - FlussLakeAppendScan(tablePath, tableInfo, requiredSchema, options, flussConfig) + FlussLakeAppendScan( + tablePath, + tableInfo, + requiredSchema, + pushedPredicate, + acceptedPredicates.toSeq, + options, + flussConfig) } } @@ -108,13 +153,20 @@ class FlussUpsertScanBuilder( /** Fluss Lake Upsert Scan Builder for lake-enabled primary key tables. 
*/ class FlussLakeUpsertScanBuilder( - tablePath: TablePath, - tableInfo: TableInfo, + val tablePath: TablePath, + val tableInfo: TableInfo, options: CaseInsensitiveStringMap, flussConfig: FlussConfiguration) - extends FlussScanBuilder { + extends FlussLakeSupportsPushDownV2Filters { override def build(): Scan = { - FlussLakeUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig) + FlussLakeUpsertScan( + tablePath, + tableInfo, + requiredSchema, + pushedPredicate, + acceptedPredicates.toSeq, + options, + flussConfig) } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala index a8c2ec195f..d451039299 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -22,7 +22,8 @@ import org.apache.fluss.client.table.scanner.log.LogScanner import org.apache.fluss.config.Configuration import org.apache.fluss.exception.LakeTableSnapshotNotExistException import org.apache.fluss.lake.source.{LakeSource, LakeSplit} -import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.metadata.{LogFormat, ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.predicate.{Predicate => FlussPredicate} import org.apache.fluss.spark.read._ import org.apache.fluss.utils.ExceptionUtils @@ -38,6 +39,7 @@ class FlussLakeAppendBatch( tablePath: TablePath, tableInfo: TableInfo, readSchema: StructType, + pushedPredicate: Option[FlussPredicate], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { @@ -45,14 +47,25 @@ class FlussLakeAppendBatch( // Required by FlussLakeBatch but unused — lake snapshot determines start offsets. override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() + // Server-side log filter requires ARROW format. 
+ private val logTailPredicate: Option[FlussPredicate] = + if (tableInfo.getTableConfig.getLogFormat == LogFormat.ARROW) pushedPredicate else None + override def createReaderFactory(): PartitionReaderFactory = { if (isFallback) { - new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig) + new FlussAppendPartitionReaderFactory( + tablePath, + projection, + logTailPredicate, + options, + flussConfig) } else { new FlussLakePartitionReaderFactory( tableInfo.getProperties.toMap, tablePath, projection, + pushedPredicate, + logTailPredicate, flussConfig) } } @@ -75,6 +88,7 @@ class FlussLakeAppendBatch( val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) + pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) val lakeSplits = lakeSource .createPlanner(new LakeSource.PlannerContext { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala index c07ac0b8bc..d61aea9d1c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala @@ -19,13 +19,18 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} import org.apache.fluss.config.Configuration +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.{TableInfo, TablePath} +import org.apache.fluss.predicate.{Predicate => FlussPredicate} import org.apache.fluss.spark.read._ +import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import java.util.Collections + import scala.collection.JavaConverters._ abstract class FlussLakeBatch( @@ -62,3 +67,21 @@ abstract class FlussLakeBatch( .toMap } } + +object FlussLakeBatch extends Logging { + + def applyLakeFilters( + lakeSource: LakeSource[LakeSplit], + predicates: java.util.List[FlussPredicate]): LakeSource.FilterPushDownResult = { + val result = lakeSource.withFilters(predicates) + logInfo( + s"Lake source accepted ${result.acceptedPredicates()}, " + + s"remaining ${result.remainingPredicates()}") + result + } + + def applyLakeFilters( + lakeSource: LakeSource[LakeSplit], + predicate: FlussPredicate): LakeSource.FilterPushDownResult = + applyLakeFilters(lakeSource, Collections.singletonList(predicate)) +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala index 7acdb041fa..369c45f748 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala @@ -20,6 +20,7 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.config.Configuration import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath +import org.apache.fluss.predicate.{Predicate => 
FlussPredicate} import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader} import org.apache.spark.sql.catalyst.InternalRow @@ -32,12 +33,15 @@ class FlussLakePartitionReaderFactory( tableProperties: util.Map[String, String], tablePath: TablePath, projection: Array[Int], + flussPredicate: Option[FlussPredicate], + logTailPredicate: Option[FlussPredicate], flussConfig: Configuration) extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath) source.withProject(FlussLakeUtils.lakeProjection(projection)) + flussPredicate.foreach(FlussLakeBatch.applyLakeFilters(source, _)) source } @@ -51,7 +55,12 @@ class FlussLakePartitionReaderFactory( projection, flussConfig) case logSplit: FlussAppendInputPartition => - new FlussAppendPartitionReader(tablePath, projection, None, logSplit, flussConfig) + new FlussAppendPartitionReader( + tablePath, + projection, + logTailPredicate, + logSplit, + flussConfig) case mixedSplit: FlussLakeUpsertInputPartition => new FlussLakeUpsertPartitionReader( tablePath, diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala index ef8127f430..2f99e39a57 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -23,6 +23,7 @@ import org.apache.fluss.config.Configuration import org.apache.fluss.exception.LakeTableSnapshotNotExistException import org.apache.fluss.lake.source.LakeSplit import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.predicate.{Predicate => FlussPredicate} import org.apache.fluss.spark.read._ import org.apache.fluss.utils.ExceptionUtils @@ -41,6 +42,7 @@ class FlussLakeUpsertBatch( tablePath: TablePath, tableInfo: TableInfo, readSchema: StructType, + pushedPredicate: Option[FlussPredicate], options: CaseInsensitiveStringMap, flussConfig: Configuration) extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { @@ -57,10 +59,13 @@ class FlussLakeUpsertBatch( if (isFallback) { new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) } else { + // PK kv-tail reader does not consume server-side log filters. 
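+      // (i.e. only the lake-side predicate is forwarded; the logTailPredicate argument below is None)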
new FlussLakePartitionReaderFactory( tableInfo.getProperties.toMap, tablePath, projection, + pushedPredicate, + None, flussConfig) } } @@ -83,6 +88,7 @@ class FlussLakeUpsertBatch( val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) + pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _)) val lakeSplits = lakeSource .createPlanner(() => lakeSnapshot.getSnapshotId) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala index fcf5f5b064..4ea5fd75f7 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala @@ -50,14 +50,21 @@ object SparkPredicateConverter { def convertPredicates( rowType: RowType, predicates: Seq[Predicate]): (Option[FlussPredicate], Seq[Predicate]) = { - val (accepted, converted) = - predicates.flatMap(p => convert(rowType, p).map((p, _))).unzip - val combined = converted match { - case Seq() => None - case Seq(single) => Some(single) - case many => Some(PredicateBuilder.and(many.asJava)) - } - (combined, accepted) + val pairs = convertPerPredicate(rowType, predicates) + (combineAnd(pairs.map(_._2)), pairs.map(_._1)) + } + + /** Returns the convertible (Spark, Fluss) predicate pairs in input order. */ + def convertPerPredicate( + rowType: RowType, + predicates: Seq[Predicate]): Seq[(Predicate, FlussPredicate)] = + predicates.flatMap(p => convert(rowType, p).map((p, _))) + + /** AND-combine a list of Fluss predicates; empty -> None, single -> as-is. 
*/ + def combineAnd(predicates: Seq[FlussPredicate]): Option[FlussPredicate] = predicates match { + case Seq() => None + case Seq(single) => Some(single) + case many => Some(PredicateBuilder.and(many.asJava)) } private def toFluss( diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala index 08ff77cd86..210b23e127 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala @@ -398,6 +398,131 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { } } + test("Spark Lake Read: filter pushdown — lake-only (all data in lake)") { + withTable("t_pd_lake") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_lake (id INT, amount INT, name STRING) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_lake VALUES + |(1, 100, 'alpha'), (2, 200, 'beta'), (3, 300, 'gamma') + |""".stripMargin) + tierToLake("t_pd_lake") + + val query = + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_pd_lake WHERE amount > 150 ORDER BY id") + checkAnswer(query, Row(2, 200, "beta") :: Row(3, 300, "gamma") :: Nil) + assertPushedNames(query, Set(">")) + } + } + + test("Spark Lake Read: filter pushdown — union (lake + log tail)") { + withTable("t_pd_union") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_union (id INT, amount INT, name STRING) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_union VALUES + |(1, 100, 'alpha'), (2, 200, 'beta'), (3, 300, 'gamma') + |""".stripMargin) + tierToLake("t_pd_union") + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_union VALUES + |(4, 400, 'delta'), (5, 500, 'epsilon') + |""".stripMargin) + + val query = + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_pd_union WHERE amount >= 200 ORDER BY id") + checkAnswer( + query, + Row(2, 200, "beta") :: + Row(3, 300, "gamma") :: + Row(4, 400, "delta") :: + Row(5, 500, "epsilon") :: Nil + ) + assertPushedNames(query, Set(">=")) + } + } + + test("Spark Lake Read: filter pushdown — fallback (no lake snapshot)") { + withTable("t_pd_fallback") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_fallback (id INT, amount INT, name STRING) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_fallback VALUES + |(1, 100, 'alpha'), (2, 200, 'beta'), (3, 300, 'gamma') + |""".stripMargin) + + val query = + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_pd_fallback WHERE amount = 200") + checkAnswer(query, Row(2, 200, "beta") :: Nil) + assertPushedNames(query, Set("=")) + } + } + + test("Spark Lake Read: filter pushdown — partitioned lake table") { + withTable("t_pd_partitioned") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_partitioned + | (id INT, amount INT, name STRING, dt STRING) + | PARTITIONED BY (dt) + | TBLPROPERTIES ( + | 
'${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_partitioned VALUES + |(1, 100, 'alpha', '2026-01-01'), + |(2, 200, 'beta', '2026-01-01'), + |(3, 300, 'gamma', '2026-01-02') + |""".stripMargin) + tierToLake("t_pd_partitioned") + + val query = sql(s""" + |SELECT id, amount, dt FROM $DEFAULT_DATABASE.t_pd_partitioned + |WHERE amount >= 200 ORDER BY id""".stripMargin) + checkAnswer(query, Row(2, 200, "2026-01-01") :: Row(3, 300, "2026-01-02") :: Nil) + assertPushedNames(query, Set(">=")) + } + } + + test("Spark Lake Read: filter pushdown — non-pushable predicate falls back to Spark") { + withTable("t_pd_nonpushable") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_nonpushable (id INT, amount INT, name STRING) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_nonpushable VALUES + |(1, 100, 'alpha'), (2, 200, 'beta'), (3, 300, 'gamma'), (4, 400, 'delta') + |""".stripMargin) + tierToLake("t_pd_nonpushable") + + val query = + sql(s"SELECT id FROM $DEFAULT_DATABASE.t_pd_nonpushable WHERE amount % 200 = 0 ORDER BY id") + checkAnswer(query, Row(2) :: Row(4) :: Nil) + // The modulo expression is not convertible; only the implicit IS_NOT_NULL Spark adds is pushed. + assertResult(Set("IS_NOT_NULL"))(pushedPredicates(query).map(_.name()).toSet) + } + } + test("Spark Lake Read: non-FULL startup mode skips lake path") { withTable("t_earliest") { sql(s""" @@ -427,6 +552,7 @@ abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { } } } + } class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTest { diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala index 22277445cd..4913ee29e6 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -328,6 +328,56 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTe } } + test("Spark Lake Read: pk filter pushdown — lake-only on non-pk column") { + withTable("t_pd_pk_lake") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_pk_lake (id INT, name STRING, score INT) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_pk_lake VALUES + |(1, 'alice', 90), (2, 'bob', 85), (3, 'charlie', 95), (4, 'dave', 70) + |""".stripMargin) + tierToLake("t_pd_pk_lake") + + val query = + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_pd_pk_lake WHERE score >= 90 ORDER BY id") + checkAnswer(query, Row(1, "alice", 90) :: Row(3, "charlie", 95) :: Nil) + assertPushedNames(query, Set(">=")) + } + } + + test("Spark Lake Read: pk filter pushdown — union (lake + kv tail)") { + withTable("t_pd_pk_union") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_pd_pk_union (id INT, name 
STRING, score INT) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_pk_union VALUES + |(1, 'alice', 90), (2, 'bob', 85), (3, 'charlie', 95) + |""".stripMargin) + tierToLake("t_pd_pk_union") + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_pd_pk_union VALUES + |(4, 'dave', 88), (5, 'eve', 92) + |""".stripMargin) + + val query = + sql(s"SELECT id, score FROM $DEFAULT_DATABASE.t_pd_pk_union WHERE score >= 90 ORDER BY id") + checkAnswer(query, Row(1, 90) :: Row(3, 95) :: Row(5, 92) :: Nil) + assertPushedNames(query, Set(">=")) + } + } + test("Spark Lake Read: primary key table projection with type-dependent columns") { withTable("t") { val tablePath = createTablePath("t") @@ -381,6 +431,7 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTe ) } } + } class SparkLakePaimonPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableReadTestBase { diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala index d3e4c743b4..9ca8d81a0d 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala @@ -22,9 +22,13 @@ import org.apache.fluss.flink.tiering.LakeTieringJobBuilder import org.apache.fluss.flink.tiering.source.TieringSourceOptions import org.apache.fluss.metadata.{DataLakeFormat, TableBucket} import org.apache.fluss.spark.FlussSparkTestBase +import org.apache.fluss.spark.read.FlussScan import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation} import java.time.Duration @@ -128,4 +132,23 @@ abstract class SparkLakeTableReadTestBase extends FlussSparkTestBase { } } + protected def pushedPredicates(df: DataFrame): Array[Predicate] = { + val scans = + df.queryExecution.executedPlan.collect { + case b: BatchScanExec => b.scan + } ++ df.queryExecution.optimizedPlan.collect { + case DataSourceV2ScanRelation(_, scan, _, _, _) => scan + } + scans + .collect { case f: FlussScan => f.pushedSparkPredicates } + .flatten + .toArray + } + + protected def assertPushedNames(df: DataFrame, expected: Set[String]): Unit = { + val pushed = pushedPredicates(df).map(_.name()).toSet + assert( + expected.exists(pushed.contains), + s"Expected any of $expected in pushed predicates, got $pushed") + } }
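
Reviewer note: a minimal, standalone sketch of the accept-by-identity matching that FlussLakeSupportsPushDownV2Filters relies on. The FlussPredicate/SparkPredicate stubs and the withFilters stand-in below are hypothetical simplifications, not the real Fluss or Spark types; only the IdentityHashMap-backed set and the unzip of accepted pairs mirror the change above.

import java.util.{Collections, IdentityHashMap, List => JList}

import scala.collection.JavaConverters._

object IdentityAcceptSketch {

  // Hypothetical stand-ins for the real types, for illustration only.
  final class FlussPredicate(repr: String) { override def toString: String = repr }
  final case class SparkPredicate(name: String)

  // Fake lake source: "accepts" every other predicate and returns the same instances it was given.
  def withFilters(predicates: JList[FlussPredicate]): JList[FlussPredicate] =
    predicates.asScala.zipWithIndex.collect { case (p, i) if i % 2 == 0 => p }.asJava

  def main(args: Array[String]): Unit = {
    val pairs: Seq[(SparkPredicate, FlussPredicate)] = Seq(
      SparkPredicate(">") -> new FlussPredicate("amount > 150"),
      SparkPredicate("=") -> new FlussPredicate("name = 'beta'"))

    val accepted = withFilters(pairs.map(_._2).asJava)

    // Identity-based membership: equal-but-distinct instances would not match, which is why the
    // builder assumes lake sources hand back the exact instances they received.
    val acceptedSet =
      Collections.newSetFromMap(new IdentityHashMap[FlussPredicate, java.lang.Boolean]())
    acceptedSet.addAll(accepted)

    // Keep only the pairs whose Fluss predicate was accepted; report the Spark half back to Spark
    // and AND-combine the Fluss half into the predicate handed to the scan.
    val (acceptedSpark, acceptedFluss) =
      pairs.collect { case (sp, fp) if acceptedSet.contains(fp) => (sp, fp) }.unzip
    println(s"report to Spark: $acceptedSpark; combine for scan: $acceptedFluss")
  }
}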