@@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
import org.apache.fluss.predicate.Predicate
import org.apache.fluss.spark.utils.SparkPartitionPredicate

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
@@ -78,6 +79,7 @@ class FlussAppendBatch(
tableInfo: TableInfo,
readSchema: StructType,
pushedPredicate: Option[Predicate],
partitionPredicate: Option[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
@@ -116,7 +118,9 @@ }
}

if (tableInfo.isPartitioned) {
partitionInfos.asScala
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
matching
.map {
partitionInfo =>
val startBucketOffsets = startOffsetsInitializer.getBucketOffsets(
@@ -172,6 +176,7 @@ class FlussUpsertBatch(
tablePath: TablePath,
tableInfo: TableInfo,
readSchema: StructType,
partitionPredicate: Option[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
@@ -234,7 +239,9 @@ }
}

if (tableInfo.isPartitioned) {
partitionInfos.asScala.flatMap {
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
matching.flatMap {
partitionInfo =>
val partitionName = partitionInfo.getPartitionName
val kvSnapshots =
@@ -41,6 +41,8 @@ trait FlussScan extends Scan {
/** Spark predicates that the scan reports as pushed down (used in [[description]]). */
def pushedSparkPredicates: Seq[Predicate] = Seq.empty

def partitionPredicate: Option[FlussPredicate] = None

protected def scanType: String

override def readSchema(): StructType = {
@@ -49,8 +51,13 @@

override def description(): String = {
val base = s"FlussScan: [$tablePath], Type: [$scanType]"
if (pushedSparkPredicates.isEmpty) base
else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
val withPushed =
if (pushedSparkPredicates.isEmpty) base
else s"$base [PushedPredicates: ${pushedSparkPredicates.mkString("[", ", ", "]")}]"
partitionPredicate match {
case Some(p) => s"$withPushed [PartitionFilter: $p]"
case None => withPushed
}
}

override def supportedCustomMetrics(): Array[CustomMetric] =
@@ -63,6 +70,7 @@ case class FlussAppendScan(
tableInfo: TableInfo,
requiredSchema: Option[StructType],
pushedPredicate: Option[FlussPredicate],
override val partitionPredicate: Option[FlussPredicate],
override val pushedSparkPredicates: Seq[Predicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
@@ -71,7 +79,14 @@ case class FlussAppendScan(
override protected val scanType: String = "Append"

override def toBatch: Batch = {
new FlussAppendBatch(tablePath, tableInfo, readSchema, pushedPredicate, options, flussConfig)
new FlussAppendBatch(
tablePath,
tableInfo,
readSchema,
pushedPredicate,
partitionPredicate,
options,
flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -124,14 +139,15 @@ case class FlussUpsertScan(
tablePath: TablePath,
tableInfo: TableInfo,
requiredSchema: Option[StructType],
override val partitionPredicate: Option[FlussPredicate],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {

override protected val scanType: String = "Upsert"

override def toBatch: Batch = {
new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
new FlussUpsertBatch(tablePath, tableInfo, readSchema, partitionPredicate, options, flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
@@ -21,7 +21,7 @@ import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{LogFormat, TableInfo, TablePath}
import org.apache.fluss.predicate.{Predicate => FlussPredicate}
import org.apache.fluss.spark.read.lake.{FlussLakeBatch, FlussLakeUtils}
import org.apache.fluss.spark.utils.SparkPredicateConverter
import org.apache.fluss.spark.utils.{SparkPartitionPredicate, SparkPredicateConverter}

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
@@ -42,11 +42,26 @@ trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns
}
}

/** Predicate pushdown mixin: converts what it can, returns all predicates as residual. */
trait FlussSupportsPushDownV2Filters extends FlussScanBuilder with SupportsPushDownV2Filters {
/** Extracts a partition-key predicate so the scan can skip partitions that can't match. */
trait FlussSupportsPushDownPartitionFilters
extends FlussScanBuilder
with SupportsPushDownV2Filters {

def tableInfo: TableInfo

protected var partitionPredicate: Option[FlussPredicate] = None

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
partitionPredicate = SparkPartitionPredicate.extract(tableInfo, predicates.toSeq)
predicates
}

override def pushedPredicates(): Array[Predicate] = Array.empty
}

/** Adds ARROW server-side log filtering on top of partition pushdown. */
trait FlussSupportsPushDownV2Filters extends FlussSupportsPushDownPartitionFilters {

protected var pushedPredicate: Option[FlussPredicate] = None
protected var acceptedPredicates: Array[Predicate] = Array.empty[Predicate]

@@ -58,6 +73,7 @@ trait FlussSupportsPushDownV2Filters extends FlussScanBuilder with SupportsPushD
}

override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
super.pushPredicates(predicates)
// Server-side batch filter only supports ARROW; other log formats reject it.
if (tableInfo.getTableConfig.getLogFormat == LogFormat.ARROW) {
convertAndStorePredicates(predicates)
@@ -112,6 +128,7 @@ class FlussAppendScanBuilder(
tableInfo,
requiredSchema,
pushedPredicate,
partitionPredicate,
acceptedPredicates.toSeq,
options,
flussConfig)
@@ -141,13 +158,13 @@
/** Fluss Upsert Scan Builder. */
class FlussUpsertScanBuilder(
tablePath: TablePath,
tableInfo: TableInfo,
val tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
extends FlussScanBuilder {
extends FlussSupportsPushDownPartitionFilters {

override def build(): Scan = {
FlussUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
FlussUpsertScan(tablePath, tableInfo, requiredSchema, partitionPredicate, options, flussConfig)
}
}

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.spark.utils

import org.apache.fluss.metadata.{PartitionInfo, TableInfo}
import org.apache.fluss.predicate.{CompoundPredicate, LeafPredicate, PartitionPredicateVisitor, Predicate => FlussPredicate, PredicateBuilder, PredicateVisitor}
import org.apache.fluss.row.{BinaryString, GenericRow}
import org.apache.fluss.types.{DataTypes, RowType}
import org.apache.fluss.utils.PartitionUtils

import org.apache.spark.sql.connector.expressions.filter.Predicate

import scala.jdk.CollectionConverters._

/** Extracts a partition-key predicate and prunes the partition list at planning time. */
object SparkPartitionPredicate {

def extract(tableInfo: TableInfo, predicates: Seq[Predicate]): Option[FlussPredicate] = {
val partitionKeys = tableInfo.getPartitionKeys
if (partitionKeys.isEmpty) return None

val rowType = partitionRowType(tableInfo)
val onlyPartitionKeys = new PartitionPredicateVisitor(partitionKeys)

val converted = predicates.flatMap {
Contributor

In this PR, toPartitionRow materializes all partition values as BinaryString, and stringifyLiterals rebuilds every LeafPredicate with DataTypes.STRING.
After that, predicate evaluation runs through fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java and fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java, which means comparisons are performed using the predicate type.
So for a non-string partition column, range predicates are no longer evaluated with the real type semantics, but with string lexicographic semantics.

A concrete incorrect case is an INT partition column. Suppose partitions include pt2=2 and pt2=10, and Spark pushes pt2 > 2. With correct integer semantics, partition 10 must be kept. With the current implementation, both the row value and the literal become strings, so "10" is compared to "2" lexicographically, and "10" < "2", causing partition 10 to be pruned incorrectly.
That is a correctness bug, not just a missed optimization, because the partition is skipped before the scan.

It seems Flink also has a similar issue.
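
A self-contained illustration of the comparison described above (plain String.compareTo standing in for the BinaryString / CompareUtils path):

// "pt2 > 2" against partition pt2=10, after both the row value and the literal are stringified:
val partitionValue = "10"
val pushedLiteral = "2"
partitionValue.compareTo(pushedLiteral) > 0 // false -- "10" < "2" lexicographically, so pt2=10 is pruned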

Member Author

Good observation. I'm thinking of lifting this into fluss-common and using PartitionUtils.parseValueOfType in both Spark and Flink.

Created an issue to fix this: #3292
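
A rough, purely illustrative sketch of that direction (the helper below is hypothetical; the real change would go through PartitionUtils with the column's actual type):

// Parse the partition's string value with the column's real type before evaluating the
// predicate, so an INT partition column is compared numerically, not lexicographically.
def keepsIntPartition(partitionValue: String, pushedLiteral: Int): Boolean =
  partitionValue.toInt > pushedLiteral

keepsIntPartition("10", 2) // true -- pt2=10 is correctly kept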

sparkPredicate =>
SparkPredicateConverter
.convert(rowType, sparkPredicate)
.filter(_.visit(onlyPartitionKeys))
.map(stringifyLiterals)
}

converted match {
case Seq() => None
case Seq(single) => Some(single)
case many => Some(PredicateBuilder.and(many.asJava))
}
}

def filterPartitions(
partitionInfos: Seq[PartitionInfo],
partitionPredicate: Option[FlussPredicate]): Seq[PartitionInfo] =
partitionPredicate match {
case None => partitionInfos
case Some(predicate) => partitionInfos.filter(p => predicate.test(toPartitionRow(p)))
}

private def partitionRowType(tableInfo: TableInfo): RowType = {
val schemaRowType = tableInfo.getRowType
val fieldNames = schemaRowType.getFieldNames
val partitionFieldIndexes = tableInfo.getPartitionKeys.asScala.map(fieldNames.indexOf).toArray
schemaRowType.project(partitionFieldIndexes)
}

private def toPartitionRow(partitionInfo: PartitionInfo): GenericRow = {
val values = partitionInfo.getResolvedPartitionSpec.getPartitionValues
val row = new GenericRow(values.size)
var i = 0
while (i < values.size) {
row.setField(i, BinaryString.fromString(values.get(i)))
i += 1
}
row
}

// Partition values are stored as strings; literals must be coerced before evaluation.
private val stringifier: PredicateVisitor[FlussPredicate] = new PredicateVisitor[FlussPredicate] {
override def visit(leaf: LeafPredicate): FlussPredicate = {
val converted: Seq[Object] = leaf.literals.asScala.toSeq.map {
case null => null
case literal =>
BinaryString.fromString(
PartitionUtils.convertValueOfType(literal, leaf.`type`.getTypeRoot))
}
new LeafPredicate(
leaf.function,
DataTypes.STRING,
leaf.index,
leaf.fieldName,
converted.asJava)
}

override def visit(compound: CompoundPredicate): FlussPredicate = {
val children = compound.children.asScala.map(_.visit(this)).asJava
new CompoundPredicate(compound.function, children)
}
}

private def stringifyLiterals(predicate: FlussPredicate): FlussPredicate =
predicate.visit(stringifier)
}