Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {

def enableFallbackReport: Boolean = getConf(FALLBACK_REPORTER_ENABLED)

def failOnFallback: Boolean = getConf(FALLBACK_FAIL_ON_FALLBACK)

def debug: Boolean = getConf(DEBUG_ENABLED)

/** Full scan SQL metrics; also enabled when [[debug]] is true. */
Expand Down Expand Up @@ -1430,6 +1432,15 @@ object GlutenConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)

val FALLBACK_FAIL_ON_FALLBACK =
buildConf("spark.gluten.sql.columnar.failOnFallback")
.internal()
.doc(
"When true, throw an exception if any operator falls back to Spark" +
" instead of running on the native engine.")
.booleanConf

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add .internal() to mark this config as not intended for end users.

.createWithDefault(false)

val TEXT_INPUT_ROW_MAX_BLOCK_SIZE =
buildConf("spark.gluten.sql.text.input.max.block.size")
.doc("the max block size for text input rows")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,39 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
}
}

private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]
/**
* Walks `plan` and classifies each operator into one of three buckets:
* - native Gluten operator: invokes `onGluten`
* - vanilla Spark operator considered a fallback: invokes `onFallback` with the reason resolved
* from physical or logical [[FallbackTags]], or a synthetic default when no tag is present
* - structural / non-operator nodes (commands, transitions, codegen wrappers, query stages, AQE
* shuffle reads, reused exchanges, etc.): no callback
*
* Recurses into AQE subqueries and query stages, and into each visited operator's
* `innerChildren`. Subqueries reached purely via expressions on a vanilla Spark plan are not
* visited here; callers that need full subquery coverage should use [[processPlan]], which
* performs an explicit subquery walk.
*/
def visitFallbackNodes(
plan: QueryPlan[_],
onFallback: (SparkPlan, String) => Unit,
onGluten: SparkPlan => Unit = _ => ()): Unit = {

def collect(tmp: QueryPlan[_]): Unit = {
def fallbackReason(p: SparkPlan): String =
FallbackTags
.getOption(p)
.orElse(p.logicalLink.flatMap(FallbackTags.getOption))
.map(_.reason())
.getOrElse("Gluten does not touch it or does not support it")

def visit(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
case cmd: CommandResultExec => visit(cmd.commandPhysicalPlan)
case p: V2CommandExec
if FallbackTags.nonEmpty(p) ||
p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
onFallback(p, fallbackReason(p))
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
Expand All @@ -108,26 +129,36 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
case _: ReusedExchangeExec =>
case _: NoopLeaf =>
case w: WriteFilesExec if w.child.isInstanceOf[NoopLeaf] =>
case sub: AdaptiveSparkPlanExec if sub.isSubquery => collect(sub.executedPlan)
case sub: AdaptiveSparkPlanExec if sub.isSubquery => visit(sub.executedPlan)
case _: AdaptiveSparkPlanExec =>
case p: QueryStageExec => collect(p.plan)
case p: QueryStageExec => visit(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
p.innerChildren.foreach(collect)
onGluten(p)
p.innerChildren.foreach(visit)
case i: InMemoryTableScanExec =>
if (PlanUtil.isGlutenTableCache(i)) {
numGlutenNodes += 1
onGluten(i)
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
onFallback(i, "Columnar table cache is disabled")
}
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
onFallback(p, fallbackReason(p))
p.innerChildren.foreach(visit)
case _ =>
}
}
collect(plan)
visit(plan)
}

/**
 * Summarises `plan` by delegating the traversal to `visitFallbackNodes`.
 *
 * Every operator reported through `onGluten` bumps a counter; every operator reported through
 * `onFallback` is handed, with its reason and the accumulator map, to
 * `addFallbackNodeWithReason`.
 *
 * @param plan
 *   the plan to traverse
 * @return
 *   the Gluten operator count paired with an immutable copy of the accumulated reasons map
 */
private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
  val reasonsByNode = mutable.HashMap.empty[String, String]
  var glutenNodeCount = 0

  // Callbacks are named locally so the traversal call below reads as a single line.
  def recordGluten(node: SparkPlan): Unit = glutenNodeCount += 1
  def recordFallback(node: SparkPlan, reason: String): Unit =
    addFallbackNodeWithReason(node, reason, reasonsByNode)

  visitFallbackNodes(plan, onFallback = recordFallback, onGluten = recordGluten)

  (glutenNodeCount, reasonsByNode.toMap)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.logging.LogLevelUtil

Expand All @@ -27,20 +27,33 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.ui.GlutenUIUtils

import scala.collection.mutable.ArrayBuffer

/**
* This rule is used to collect all fallback reason.
* 1. print fallback reason for each plan node 2. post all fallback reason using one event
* Collects fallback information from a finalized columnar plan and, depending on configuration:
*   - logs the fallback reason for every non-Gluten operator
*   - throws a [[GlutenException]] when `spark.gluten.sql.columnar.failOnFallback` is enabled and
*     any fallback is present
*   - posts a single [[GlutenPlanFallbackEvent]] for the SQL UI
*
* The set of operators that count as a fallback is shared with [[GlutenExplainUtils]] via
* [[GlutenExplainUtils.visitFallbackNodes]], so the reporter, the SQL UI, and any future consumers
* always agree on what "fallback" means.
*/
case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)
extends Rule[SparkPlan]
with LogLevelUtil {

override def apply(plan: SparkPlan): SparkPlan = {
if (!glutenConf.enableFallbackReport) {
val report = glutenConf.enableFallbackReport
val fail = glutenConf.failOnFallback
if (!report && !fail) {
return plan
}
printFallbackReason(plan)
if (GlutenUIUtils.uiEnabled(spark.sparkContext)) {
val fallbacks = collectAndLogFallbacks(plan, log = report)
if (fail) {
throwIfFallback(fallbacks)
}
if (report && GlutenUIUtils.uiEnabled(spark.sparkContext)) {
postFallbackReason(plan)
}
plan
Expand All @@ -53,26 +66,35 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)
logOnLevel(logLevel, s"Validation failed for plan: $nodeName$executionIdInfo, due to: $reason")
}

private def printFallbackReason(plan: SparkPlan): Unit = {
private def collectAndLogFallbacks(plan: SparkPlan, log: Boolean): Seq[(String, String)] = {
val validationLogLevel = glutenConf.validationLogLevel
def printPlan(p: SparkPlan): Unit = {
p.foreachUp {
case _: GlutenPlan => // ignore
case cmd: CommandResultExec =>
printPlan(cmd.commandPhysicalPlan)
case p: SparkPlan if FallbackTags.nonEmpty(p) =>
val tag = FallbackTags.get(p)
logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
// With in next round stage in AQE, the physical plan would be a new instance that
// can not preserve the tag, so we need to set the fallback reason to logical plan.
// Then we can be aware of the fallback reason for the whole plan.
// If a logical plan mapping to several physical plan, we add all reason into
// that logical plan to make sure we do not lose any fallback reason.
p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, tag))
case _ =>
val fallbacks = ArrayBuffer[(String, String)]()
GlutenExplainUtils.visitFallbackNodes(
plan,
onFallback = (node, reason) => {
if (log) {
logFallbackReason(validationLogLevel, node.nodeName, reason)
// Within the next AQE stage, the physical plan would be a new instance that does not
// preserve the tag, so we propagate the fallback reason to the logical plan.
// If a logical plan maps to several physical plans, we add all reasons onto that
// logical plan to make sure we do not lose any fallback reason.
FallbackTags
.getOption(node)
.foreach(tag => node.logicalLink.foreach(FallbackTags.add(_, tag)))
}
fallbacks += ((node.nodeName, reason))
}
)
fallbacks.toSeq
}

/**
 * Aborts with a [[GlutenException]] when at least one fallback was collected; does nothing for
 * an empty sequence.
 *
 * The exception message names the config key that enabled the check, states how many operators
 * fell back, and lists one `name: reason` line per entry.
 *
 * @param fallbacks
 *   (operator node name, fallback reason) pairs
 * @throws GlutenException
 *   if `fallbacks` is non-empty
 */
private def throwIfFallback(fallbacks: Seq[(String, String)]): Unit = {
  if (fallbacks.nonEmpty) {
    val reasons = fallbacks.map { case (name, reason) => s" $name: $reason" }.mkString("\n")
    throw new GlutenException(
      s"Gluten fallback detected (${GlutenConfig.FALLBACK_FAIL_ON_FALLBACK.key} is enabled). " +
        s"${fallbacks.size} operator(s) fell back to Spark:\n$reasons")
  }
  // This method only decides whether to fail; it has no plan to traverse, so the leftover
  // `printPlan(plan)` call (neither name is in scope here) is dropped.
}

private def postFallbackReason(plan: SparkPlan): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.GlutenException

import java.util.Locale

/**
 * Exercises `GlutenConfig.FALLBACK_FAIL_ON_FALLBACK` end to end against a small parquet table:
 * the query must succeed when the flag is off or when nothing falls back, and must fail with a
 * [[GlutenException]] when the flag is on and the file scan is forced off the native path.
 */
class GlutenFailOnFallbackSuite extends WholeStageTransformerSuite {

  // NOTE(review): presumably required members of the parent suite that this test has no use
  // for, hence null — confirm against WholeStageTransformerSuite.
  protected val resourcePath: String = null
  protected val fileFormat: String = null

  test("failOnFallback throws only when a fallback is present") {
    val failOnFallbackKey = GlutenConfig.FALLBACK_FAIL_ON_FALLBACK.key
    val fileScanKey = GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key

    // The same query is used for every scenario; only the session configuration differs.
    def scanTable() = sql("SELECT * FROM t").collect()

    withTable("t") {
      spark.range(10).write.format("parquet").saveAsTable("t")

      // Flag off + forced fallback: the query must complete without throwing.
      withSQLConf(failOnFallbackKey -> "false", fileScanKey -> "false") {
        scanTable()
      }

      // Flag on + no fallback-forcing config: the query must complete without throwing.
      withSQLConf(failOnFallbackKey -> "true") {
        scanTable()
      }

      // Flag on + forced fallback: the query must fail, and the message must carry the config
      // key, the fixed "fell back to Spark" wording, and a mention of the scan operator.
      withSQLConf(failOnFallbackKey -> "true", fileScanKey -> "false") {
        val thrown = intercept[GlutenException] {
          scanTable()
        }
        val message = thrown.getMessage
        assert(message.contains(failOnFallbackKey))
        assert(message.contains("fell back to Spark"))
        assert(message.toLowerCase(Locale.ROOT).contains("scan"))
      }
    }
  }
}
Loading