Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class ResolutionValidator {
validateRelation(multiInstanceRelation)
case supervisingCommand: SupervisingCommand =>
validateSupervisingCommand(supervisingCommand)
case setCommandBase: SetCommandBase =>
validateSetCommandBase(setCommandBase)
}

operator match {
Expand Down Expand Up @@ -351,6 +353,8 @@ class ResolutionValidator {

private def validateSupervisingCommand(supervisingCommand: SupervisingCommand): Unit = {}

private def validateSetCommandBase(setCommandBase: SetCommandBase): Unit = {}

private def handleOperatorOutput(operator: LogicalPlan): Unit = {
attributeScopeStack.overwriteCurrent(operator.output)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ class Resolver(
handleLeafOperator(hiveTableRelation)
case supervisingCommand: SupervisingCommand =>
resolveSupervisingCommand(supervisingCommand)
case setCommandBase: SetCommandBase =>
resolveSetCommand(setCommandBase)
case repartition: Repartition =>
resolveRepartition(repartition)
case sample: Sample =>
Expand Down Expand Up @@ -731,6 +733,14 @@ class Resolver(
supervisingCommand
}

/**
* [[SetCommandBase]] is a leaf command that is fully formed by the parser and requires no
* resolution.
*/
private def resolveSetCommand(setCommandBase: SetCommandBase): LogicalPlan = {
setCommandBase
}

/**
* Resolve [[Repartition]] operator. Its resolution doesn't require any specific logic (besides
* child resolution).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ class ResolverGuard(
checkSort(sort)
case supervisingCommand: SupervisingCommand =>
None
case _: SetCommandBase
if conf.getConf(
SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLE_SET_COMMAND_RESOLUTION
) =>
None
case repartition: Repartition =>
checkRepartition(repartition)
case having: UnresolvedHaving =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ trait SupervisingCommand extends LeafCommand {
*/
def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan
}

/**
* A base trait for the SET command that lives in `sql/core`. This trait is
* defined in `sql/catalyst` so that the single-pass analyzer can
* pattern-match on it without depending on `sql/core`.
*/
trait SetCommandBase extends Command
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ANALYZER_SINGLE_PASS_RESOLVER_ENABLE_SET_COMMAND_RESOLUTION =
buildConf("spark.sql.analyzer.singlePassResolver.enableSetCommandResolution")
.internal()
.doc(
"When true, enables SetCommand resolution in single-pass analyzer. Otherwise, " +
"resolution falls back to fixed-point analyzer."
)
.version("4.1.0")
.booleanConf
.createWithDefault(true)

val ANALYZER_SINGLE_PASS_RESOLVER_RUN_HEAVY_EXTENDED_RESOLUTION_CHECKS =
buildConf("spark.sql.analyzer.singlePassResolver.runHeavyExtendedResolutionChecks")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.VariableResolution
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.SetCommandBase
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
* }}}
*/
case class SetCommand(kv: Option[(String, Option[String])])
extends LeafRunnableCommand with Logging {
extends LeafRunnableCommand with Logging with SetCommandBase {

private def keyValueOutput: Seq[Attribute] = {
val schema = StructType(Array(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.resolver.{
}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class ResolverGuardSuite extends ResolverGuardSuiteBase {
Expand Down Expand Up @@ -259,6 +260,25 @@ class ResolverGuardSuite extends ResolverGuardSuiteBase {
checkResolverGuard("DESCRIBE QUERY SELECT * FROM VALUES (1)")
}

Seq(true, false).foreach { enabled =>
val expectedReason = if (enabled) {
None
} else {
Some("class org.apache.spark.sql.execution.command.SetCommand operator resolution")
}
test(s"SET command - enabled=$enabled") {
withSQLConf(
SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLE_SET_COMMAND_RESOLUTION.key ->
enabled.toString
) {
checkResolverGuard("SET", expectedReason)
checkResolverGuard("SET spark.sql.shuffle.partitions=10", expectedReason)
checkResolverGuard("SET spark.sql.shuffle.partitions", expectedReason)
checkResolverGuard("SET -v", expectedReason)
}
}
}

test("HAVING") {
checkResolverGuard(
"SELECT col1 FROM VALUES(1) GROUP BY col1 HAVING col1 > 1"
Expand Down