@@ -38,14 +38,20 @@ trait FlussScan extends Scan {

def requiredSchema: Option[StructType]

/** Spark predicates that the scan reports as pushed down (used in [[description]]). */
def pushedSparkPredicates: Seq[Predicate] = Seq.empty

protected def scanType: String

override def readSchema(): StructType = {
requiredSchema.getOrElse(SparkConversions.toSparkDataType(tableInfo.getRowType))
}

- override def description(): String =
-   s"FlussScan: [$tablePath], Type: [$scanType]"
+ override def description(): String = {
+   val base = s"FlussScan: [$tablePath], Type: [$scanType]"
+   if (pushedSparkPredicates.isEmpty) base
+   else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
+ }

override def supportedCustomMetrics(): Array[CustomMetric] =
Array(FlussNumRowsReadMetric())
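For illustration, with the new base implementation a scan that reports two pushed predicates would describe itself roughly as follows (the table path and predicate rendering are assumed values, not taken from this PR):

FlussScan: [mydb.orders], Type: [Append] [PushedPredicates: [amount > 100, status = 'OPEN']]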
@@ -57,7 +63,7 @@ case class FlussAppendScan(
tableInfo: TableInfo,
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
- pushedSparkPredicates: Seq[Predicate],
+ override val pushedSparkPredicates: Seq[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {
@@ -77,27 +83,29 @@ case class FlussAppendScan(
flussConfig,
checkpointLocation)
}

- override def description(): String = {
-   val base = super.description()
-   if (pushedSparkPredicates.isEmpty) base
-   else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
- }
}

/** Fluss Lake Append Scan. */
case class FlussLakeAppendScan(
tablePath: TablePath,
tableInfo: TableInfo,
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {

override protected val scanType: String = "LakeAppend"

override def toBatch: Batch = {
- new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ new FlussLakeAppendBatch(
+   tablePath,
+   tableInfo,
+   readSchema,
+   pushedPredicate,
+   options,
+   flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -142,14 +150,22 @@ case class FlussLakeUpsertScan(
tablePath: TablePath,
tableInfo: TableInfo,
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {

override protected val scanType: String = "LakeUpsert"

override def toBatch: Batch = {
- new FlussLakeUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ new FlussLakeUpsertBatch(
+   tablePath,
+   tableInfo,
+   readSchema,
+   pushedPredicate,
+   options,
+   flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -20,13 +20,18 @@ package org.apache.fluss.spark.read
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{LogFormat, TableInfo, TablePath}
import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils}
import org.apache.fluss.spark.utils.SparkPredicateConverter

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util.{Collections, IdentityHashMap, Set => JSet}

import scala.collection.JavaConverters._

/** An interface that extends from Spark [[ScanBuilder]]. */
trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {

@@ -45,21 +50,54 @@ trait FlussSupportsPushDownV2Filters extends FlussScanBuilder with SupportsPushDownV2Filters {
protected var pushedPredicate: Option[FlussPredicate] = None
protected var acceptedPredicates: Array[Predicate] = Array.empty[Predicate]

protected def convertAndStorePredicates(predicates: Array[Predicate]): Unit = {
val (predicate, accepted) =
SparkPredicateConverter.convertPredicates(tableInfo.getRowType, predicates.toSeq)
pushedPredicate = predicate
acceptedPredicates = accepted.toArray
}

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
// Server-side batch filter only supports ARROW; other log formats reject it.
if (tableInfo.getTableConfig.getLogFormat == LogFormat.ARROW) {
- val (predicate, accepted) =
-   SparkPredicateConverter.convertPredicates(tableInfo.getRowType, predicates.toSeq)
- pushedPredicate = predicate
- acceptedPredicates = accepted.toArray
+ convertAndStorePredicates(predicates)
}
// Server-side filter is batch-level only; Spark must re-apply for row-exact results.
predicates
}

override def pushedPredicates(): Array[Predicate] = acceptedPredicates
}
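A toy sketch of why pushPredicates returns every predicate even after a successful conversion: batch-level filtering on the server can only drop whole batches, so non-matching rows can survive inside a kept batch, and Spark must re-apply the predicate for row-exact results. The types below are simplified stand-ins, not the Fluss API.

object BatchFilterSketch extends App {
  // Log data grouped into batches, plus the predicate the client pushed.
  val batches = Seq(Seq(1, 5), Seq(20, 30), Seq(2, 3))
  val predicate = (v: Int) => v > 4

  // Server side: keep any batch that contains at least one matching row.
  val kept = batches.filter(_.exists(predicate))
  // kept == Seq(Seq(1, 5), Seq(20, 30)); the non-matching 1 survives.

  // Spark side: a row-level pass is still needed for exact results.
  println(kept.flatten.filter(predicate)) // List(5, 20, 30)
}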

/**
* Lake reads push to the lake source regardless of log format. Each convertible predicate is
* offered to the lake source individually; only the lake-accepted subset is reported back to Spark
* and combined into the predicate handed to the scan.
*/
trait FlussLakeSupportsPushDownV2Filters extends FlussSupportsPushDownV2Filters {

def tablePath: TablePath

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
val pairs =
SparkPredicateConverter.convertPerPredicate(tableInfo.getRowType, predicates.toSeq)
val (acceptedSpark, acceptedFluss) = if (pairs.isEmpty) {
(Seq.empty[Predicate], Seq.empty[FlussPredicate])
} else {
val lakeSource =
FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
val result = FlussLakeBatch.applyLakeFilters(lakeSource, pairs.map(_._2).asJava)
// Identity-match: lake sources are expected to return the same instances they received.
val acceptedSet: JSet[FlussPredicate] =
Collections.newSetFromMap(new IdentityHashMap())
acceptedSet.addAll(result.acceptedPredicates())
pairs.collect { case (sp, fp) if acceptedSet.contains(fp) => (sp, fp) }.unzip
}
pushedPredicate = SparkPredicateConverter.combineAnd(acceptedFluss)
acceptedPredicates = acceptedSpark.toArray
predicates
}
}
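As a standalone illustration of the identity matching above (Pred is a hypothetical stand-in for FlussPredicate): the accepted set honors instance identity rather than equals/hashCode, so two structurally equal predicates are still told apart.

import java.util.{Collections, IdentityHashMap, Set => JSet}

object IdentityMatchSketch extends App {
  case class Pred(expr: String) // hypothetical stand-in for FlussPredicate

  val p1 = Pred("a > 1")
  val p2 = Pred("a > 1") // equal by value, but a different instance

  val accepted: JSet[Pred] = Collections.newSetFromMap(new IdentityHashMap())
  accepted.add(p1)

  println(accepted.contains(p1)) // true: the exact instance that was offered
  println(accepted.contains(p2)) // false: value equality is not enough
}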

/** Fluss Append Scan Builder. */
class FlussAppendScanBuilder(
tablePath: TablePath,
@@ -82,14 +120,21 @@ class FlussAppendScanBuilder(

/** Fluss Lake Append Scan Builder. */
class FlussLakeAppendScanBuilder(
- tablePath: TablePath,
- tableInfo: TableInfo,
+ val tablePath: TablePath,
+ val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
- extends FlussScanBuilder {
+ extends FlussLakeSupportsPushDownV2Filters {

override def build(): Scan = {
- FlussLakeAppendScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
+ FlussLakeAppendScan(
+   tablePath,
+   tableInfo,
+   requiredSchema,
+   pushedPredicate,
+   acceptedPredicates.toSeq,
+   options,
+   flussConfig)
}
}

@@ -108,13 +153,20 @@ class FlussUpsertScanBuilder(

/** Fluss Lake Upsert Scan Builder for lake-enabled primary key tables. */
class FlussLakeUpsertScanBuilder(
- tablePath: TablePath,
- tableInfo: TableInfo,
+ val tablePath: TablePath,
+ val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
- extends FlussScanBuilder {
+ extends FlussLakeSupportsPushDownV2Filters {

override def build(): Scan = {
- FlussLakeUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
+ FlussLakeUpsertScan(
+   tablePath,
+   tableInfo,
+   requiredSchema,
+   pushedPredicate,
+   acceptedPredicates.toSeq,
+   options,
+   flussConfig)
}
}
@@ -22,7 +22,8 @@ import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.Configuration
import org.apache.fluss.exception.LakeTableSnapshotNotExistException
import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
- import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath}
+ import org.apache.fluss.metadata.{LogFormat, ResolvedPartitionSpec, TableBucket, TableInfo, TablePath}
+ import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read._
import org.apache.fluss.utils.ExceptionUtils

@@ -38,21 +39,33 @@ class FlussLakeAppendBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
pushedPredicate: Option[FlussPredicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {

// Required by FlussLakeBatch but unused; the lake snapshot determines start offsets.
override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()

// Server-side log filter requires ARROW format.
private val logTailPredicate: Option[FlussPredicate] =
if (tableInfo.getTableConfig.getLogFormat == LogFormat.ARROW) pushedPredicate else None

override def createReaderFactory(): PartitionReaderFactory = {
if (isFallback) {
- new FlussAppendPartitionReaderFactory(tablePath, projection, None, options, flussConfig)
+ new FlussAppendPartitionReaderFactory(
+   tablePath,
+   projection,
+   logTailPredicate,
+   options,
+   flussConfig)
} else {
new FlussLakePartitionReaderFactory(
tableInfo.getProperties.toMap,
tablePath,
projection,
pushedPredicate,
logTailPredicate,
flussConfig)
}
}
@@ -75,6 +88,7 @@ class FlussLakeAppendBatch(

val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeUtils.lakeProjection(projection))
pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _))

val lakeSplits = lakeSource
.createPlanner(new LakeSource.PlannerContext {
@@ -19,13 +19,18 @@ package org.apache.fluss.spark.read.lake

import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
import org.apache.fluss.config.Configuration
import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util.Collections

import scala.collection.JavaConverters._

abstract class FlussLakeBatch(
@@ -62,3 +67,21 @@ abstract class FlussLakeBatch(
.toMap
}
}

object FlussLakeBatch extends Logging {

def applyLakeFilters(
lakeSource: LakeSource[LakeSplit],
predicates: java.util.List[FlussPredicate]): LakeSource.FilterPushDownResult = {
val result = lakeSource.withFilters(predicates)
logInfo(
s"Lake source accepted ${result.acceptedPredicates()}, " +
s"remaining ${result.remainingPredicates()}")
result
}

def applyLakeFilters(
lakeSource: LakeSource[LakeSplit],
predicate: FlussPredicate): LakeSource.FilterPushDownResult =
applyLakeFilters(lakeSource, Collections.singletonList(predicate))
}
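For intuition about the contract applyLakeFilters relies on, here is a hedged mock of the accepted/remaining split. PushDownResult and the supported check are hypothetical stand-ins for LakeSource.FilterPushDownResult and the lake source's own capability test; the real source is additionally expected to hand back the same predicate instances it received.

object FilterSplitSketch extends App {
  // Hypothetical stand-in for LakeSource.FilterPushDownResult.
  final case class PushDownResult[P](accepted: Seq[P], remaining: Seq[P])

  // Mock withFilters: partition by what the source claims to support and
  // return the received instances unchanged, as identity matching requires.
  def withFilters[P](supported: P => Boolean)(predicates: Seq[P]): PushDownResult[P] = {
    val (accepted, remaining) = predicates.partition(supported)
    PushDownResult(accepted, remaining)
  }

  val supported = (p: String) => !p.contains("(")
  println(withFilters(supported)(Seq("a > 1", "lower(b) = 'x'")))
  // PushDownResult(List(a > 1),List(lower(b) = 'x'))
}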
@@ -20,6 +20,7 @@ package org.apache.fluss.spark.read.lake
import org.apache.fluss.config.Configuration
import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
import org.apache.fluss.metadata.TablePath
import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader}

import org.apache.spark.sql.catalyst.InternalRow
@@ -32,12 +33,15 @@ class FlussLakePartitionReaderFactory(
tableProperties: util.Map[String, String],
tablePath: TablePath,
projection: Array[Int],
flussPredicate: Option[FlussPredicate],
logTailPredicate: Option[FlussPredicate],
flussConfig: Configuration)
extends PartitionReaderFactory {

@transient private lazy val lakeSource: LakeSource[LakeSplit] = {
val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath)
source.withProject(FlussLakeUtils.lakeProjection(projection))
flussPredicate.foreach(FlussLakeBatch.applyLakeFilters(source, _))
source
}

@@ -51,7 +55,12 @@ class FlussLakePartitionReaderFactory(
projection,
flussConfig)
case logSplit: FlussAppendInputPartition =>
- new FlussAppendPartitionReader(tablePath, projection, None, logSplit, flussConfig)
+ new FlussAppendPartitionReader(
+   tablePath,
+   projection,
+   logTailPredicate,
+   logSplit,
+   flussConfig)
case mixedSplit: FlussLakeUpsertInputPartition =>
new FlussLakeUpsertPartitionReader(
tablePath,
@@ -23,6 +23,7 @@ import org.apache.fluss.config.Configuration
import org.apache.fluss.exception.LakeTableSnapshotNotExistException
import org.apache.fluss.lake.source.LakeSplit
import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath}
import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read._
import org.apache.fluss.utils.ExceptionUtils

@@ -41,6 +42,7 @@ class FlussLakeUpsertBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
pushedPredicate: Option[FlussPredicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {
@@ -57,10 +59,13 @@ class FlussLakeUpsertBatch(
if (isFallback) {
new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
} else {
// PK kv-tail reader does not consume server-side log filters.
new FlussLakePartitionReaderFactory(
tableInfo.getProperties.toMap,
tablePath,
projection,
pushedPredicate,
None,
flussConfig)
}
}
@@ -83,6 +88,7 @@ class FlussLakeUpsertBatch(

val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeUtils.lakeProjection(projection))
pushedPredicate.foreach(FlussLakeBatch.applyLakeFilters(lakeSource, _))

val lakeSplits = lakeSource
.createPlanner(() => lakeSnapshot.getSnapshotId)