From 3dc6f11bdaf37c9d47200e17e451aad67875563d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 4 Jun 2026 21:30:32 +0800 Subject: [PATCH 1/3] [SPARK-57261][SQL] Allow to disable HashAggregateExec by config --- .../org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ .../spark/sql/execution/aggregate/AggUtils.scala | 16 +++------------- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- .../sql/execution/WholeStageCodegenSuite.scala | 2 +- .../sql/execution/metric/SQLMetricsSuite.scala | 2 +- 5 files changed, 15 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 30db7bbbd59d4..9fb17c1c811ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3803,6 +3803,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val USE_HASH_AGG = buildConf("spark.sql.execution.useHashAggregateExec") + .internal() + .doc("Decides if we use HashAggregateExec") + .version("4.3.0") + .booleanConf + .createWithDefault(true) + val JSON_GENERATOR_IGNORE_NULL_FIELDS = buildConf("spark.sql.jsonGenerator.ignoreNullFields") .doc("Whether to ignore null fields when generating JSON objects in JSON data source and " + @@ -8189,6 +8196,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG) + def useHashAggregation: Boolean = getConf(USE_HASH_AGG) + def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD) def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index 58055fa6129a5..c2a7024367afd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -75,13 +75,12 @@ object AggUtils { initialInputBufferOffset: Int = 0, resultExpressions: Seq[NamedExpression] = Nil, child: SparkPlan): SparkPlan = { - val useHash = Aggregate.supportsHashAggregate( + val useHash = child.conf.useHashAggregation && Aggregate.supportsHashAggregate( aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes), groupingExpressions) val forceObjHashAggregate = forceApplyObjectHashAggregate(child.conf) - val forceSortAggregate = forceApplySortAggregate(child.conf) - if (useHash && !forceSortAggregate && !forceObjHashAggregate) { + if (useHash && !forceObjHashAggregate) { HashAggregateExec( requiredChildDistributionExpressions = requiredChildDistributionExpressions, isStreaming = isStreaming, @@ -97,7 +96,7 @@ object AggUtils { val useObjectHash = Aggregate.supportsObjectHashAggregate( aggregateExpressions, groupingExpressions) - if (forceObjHashAggregate || (objectHashEnabled && useObjectHash && !forceSortAggregate)) { + if (forceObjHashAggregate || (objectHashEnabled && useObjectHash)) { ObjectHashAggregateExec( requiredChildDistributionExpressions = requiredChildDistributionExpressions, isStreaming = isStreaming, @@ -584,15 +583,6 @@ object AggUtils { } } - /** - * Returns whether a sort aggregate should be force applied. - * The config key is hard-coded because it's testing only and should not be exposed. - */ - private def forceApplySortAggregate(conf: SQLConf): Boolean = { - Utils.isTesting && - conf.getConfString("spark.sql.test.forceApplySortAggregate", "false") == "true" - } - /** * Returns whether a object hash aggregate should be force applied. * The config key is hard-coded because it's testing only and should not be exposed. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 694b087d6c3fd..c0b4dfcefda8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -3322,7 +3322,7 @@ class DataFrameAggregateSuite extends SharedSparkSession Seq( "spark.sql.test.forceApplyObjectHashAggregate" -> "true", SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "1"), - Seq("spark.sql.test.forceApplySortAggregate" -> "true") + Seq(SQLConf.USE_HASH_AGG -> "false") ) // Make tests faster diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index a83d5c99bb5d1..c0bee566b8ba6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -59,7 +59,7 @@ class WholeStageCodegenSuite extends SharedSparkSession test("SortAggregate should be included in WholeStageCodegen") { val df = spark.range(10).agg(max(col("id")), avg(col("id"))) - withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") { + withSQLConf(SQLConf.USE_HASH_AGG -> "false") { val plan = df.queryExecution.executedPlan assert(plan.exists(p => p.isInstanceOf[WholeStageCodegenExec] && diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index e2eb37e7a433e..6f06a73480801 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -329,7 +329,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("SortAggregate metrics") { // Force use SortAggregateExec instead of HashAggregateExec - withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") { + withSQLConf(SQLConf.USE_HASH_AGG.key -> "false") { // Assume the execution plan is // -> SortAggregate(nodeId = 0) // -> Sort(nodeId = 1) From 31a601f6bac46573684b05c736d08a3848049b91 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 5 Jun 2026 02:23:12 +0800 Subject: [PATCH 2/3] fix --- .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 2 +- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index c0b4dfcefda8d..234300baa52cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -3322,7 +3322,7 @@ class DataFrameAggregateSuite extends SharedSparkSession Seq( "spark.sql.test.forceApplyObjectHashAggregate" -> "true", SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "1"), - Seq(SQLConf.USE_HASH_AGG -> "false") + Seq(SQLConf.USE_HASH_AGG.key -> "false") ) // Make tests faster diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index c0bee566b8ba6..886df9184aca4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -59,7 +59,7 @@ class WholeStageCodegenSuite extends SharedSparkSession test("SortAggregate should be included in WholeStageCodegen") { val df = spark.range(10).agg(max(col("id")), avg(col("id"))) - withSQLConf(SQLConf.USE_HASH_AGG -> "false") { + withSQLConf(SQLConf.USE_HASH_AGG.key -> "false") { val plan = df.queryExecution.executedPlan assert(plan.exists(p => p.isInstanceOf[WholeStageCodegenExec] && From d61d0be3684539346686df333e34ef5d5a1fcdfe Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 5 Jun 2026 10:30:58 +0800 Subject: [PATCH 3/3] fix --- .../configs-without-binding-policy-exceptions | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index de94d0127931d..26f29ba24d100 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -603,6 +603,7 @@ spark.sql.execution.replaceHashWithSortAgg spark.sql.execution.reuseSubquery spark.sql.execution.sortBeforeRepartition spark.sql.execution.topKSortFallbackThreshold +spark.sql.execution.useHashAggregateExec spark.sql.execution.useObjectHashAggregateExec spark.sql.execution.usePartitionEvaluator spark.sql.extendedExplainProviders