diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index 2ea7b9e554..e5a4803db1 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -55,6 +55,7 @@ import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.control.NonFatal /** Gluten's stats tracker with vectorized aggregation inside to produce statistics efficiently. */ private[stats] class GlutenDeltaJobStatsTracker(val delegate: DeltaJobStatisticsTracker) @@ -89,8 +90,18 @@ object GlutenDeltaJobStatsTracker extends Logging { def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker match { case tracker: BasicWriteJobStatsTracker => new GlutenDeltaJobStatsRowCountingTracker(tracker) - case tracker: DeltaJobStatisticsTracker => + case tracker: DeltaJobStatisticsTracker + if canOffloadStats(tracker.dataCols, tracker.statsColExpr) => new GlutenDeltaJobStatsTracker(tracker) + case tracker: DeltaJobStatisticsTracker => + // The per-file statistics aggregation could not be offloaded to Velox (for example min/max + // over TIMESTAMP_NTZ). The native stats tracker assumes the aggregation collapses into a + // WholeStageTransformer and would otherwise crash with a ClassCastException, so use the + // row-based fallback tracker (columnar-to-row + the original Delta tracker) instead. + logWarning( + "Gluten Delta: per-file statistics aggregation cannot be offloaded to Velox; " + + "falling back to row-based stats collection.") + new GlutenDeltaJobStatsFallbackTracker(tracker) case tracker => logWarning( "Gluten Delta: Creating fallback job stats tracker," + @@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging { new GlutenDeltaJobStatsFallbackTracker(tracker) } + /** + * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox + * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on + * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla + * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the + * native stats tracker must not be used. Evaluated once on the driver so the executors never + * allocate native resources for a plan that cannot run. + */ + private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = { + try { + val aggregates = statsColExpr.collect { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + ae + } + val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes) + val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes) + val aggOp = SortAggregateExec( + None, + isStreaming = false, + None, + Seq.empty, + aggregates, + statsAttrs, + 0, + statsResultAttrs, + StatisticsInputNode(dataCols)) + val projOp = ProjectExec(statsResultAttrs, aggOp) + val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) + val config = GlutenConfig.get + val transformRule = HeuristicTransform.WithRewrites( + Validators.newValidator(config, offloads), + Seq(PullOutPreProject), + offloads) + ColumnarCollapseTransformStages(config)( + transformRule(projOp)).isInstanceOf[WholeStageTransformer] + } catch { + case NonFatal(e) => + logWarning("Gluten Delta: failed to plan native stats aggregation; using fallback.", e) + false + } + } + /** A columnar-based statistics collection for Gluten + Delta Lake. */ private class GlutenDeltaTaskStatsTracker( dataCols: Seq[Attribute], diff --git a/backends-velox/src-delta33/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala new file mode 100644 index 0000000000..70da8cc317 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.test.SharedSparkSession + +import java.io.File + +/** + * Regression test for the Gluten Delta per-file statistics tracker. + * + * Writing a Delta table whose collected min/max statistics cannot be offloaded to Velox -- for + * example over a TIMESTAMP_NTZ column -- used to crash the write task with a ClassCastException + * (ProjectExec cannot be cast to WholeStageTransformer), because the native stats tracker assumed + * the statistics aggregation always collapses into a WholeStageTransformer. The tracker must now + * fall back to row-based statistics collection instead of crashing. + */ +class GlutenDeltaStatsSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest { + + import testImplicits._ + + test("TIMESTAMP_NTZ stats fall back instead of crashing the write") { + withTempDir { + dir => + val path = new File(dir, "ntz-stats").getCanonicalPath + // The maxValue statistic for a TIMESTAMP_NTZ near Long.MaxValue triggers the per-file + // statistics aggregation that cannot be offloaded to Velox. + val nearMaxMicros = Long.MaxValue - 999L + val data = Seq(nearMaxMicros) + .toDF("micros") + .selectExpr("micros AS id", "CAST(TIMESTAMP_MICROS(micros) AS TIMESTAMP_NTZ) AS ts") + + // Without the fix this write fails with a ClassCastException (ProjectExec cannot be cast + // to WholeStageTransformer) while collecting statistics. With the fix it succeeds via the + // row-based fallback tracker. A count avoids materializing the TIMESTAMP_NTZ column, which + // is an unrelated read-path limitation. + data.coalesce(1).write.format("delta").save(path) + + assert(spark.read.format("delta").load(path).count() === 1) + } + } +} diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala index ca6c7a6a7f..5ef85e6e45 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala @@ -55,6 +55,7 @@ import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.control.NonFatal /** Gluten's stats tracker with vectorized aggregation inside to produce statistics efficiently. */ private[stats] class GlutenDeltaJobStatsTracker(val delegate: DeltaJobStatisticsTracker) @@ -89,8 +90,18 @@ object GlutenDeltaJobStatsTracker extends Logging { def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker match { case tracker: BasicWriteJobStatsTracker => new GlutenDeltaJobStatsRowCountingTracker(tracker) - case tracker: DeltaJobStatisticsTracker => + case tracker: DeltaJobStatisticsTracker + if canOffloadStats(tracker.dataCols, tracker.statsColExpr) => new GlutenDeltaJobStatsTracker(tracker) + case tracker: DeltaJobStatisticsTracker => + // The per-file statistics aggregation could not be offloaded to Velox (for example min/max + // over TIMESTAMP_NTZ). The native stats tracker assumes the aggregation collapses into a + // WholeStageTransformer and would otherwise crash with a ClassCastException, so use the + // row-based fallback tracker (columnar-to-row + the original Delta tracker) instead. + logWarning( + "Gluten Delta: per-file statistics aggregation cannot be offloaded to Velox; " + + "falling back to row-based stats collection.") + new GlutenDeltaJobStatsFallbackTracker(tracker) case tracker => logWarning( "Gluten Delta: Creating fallback job stats tracker," + @@ -99,6 +110,48 @@ object GlutenDeltaJobStatsTracker extends Logging { new GlutenDeltaJobStatsFallbackTracker(tracker) } + /** + * Returns whether the Delta per-file statistics aggregation can be offloaded to a Velox + * whole-stage transformer. This mirrors the plan that [[GlutenDeltaTaskStatsTracker]] builds on + * the executors: if the aggregation/projection is not supported by Velox it stays a vanilla + * [[ProjectExec]] (i.e. does not collapse into a [[WholeStageTransformer]]), in which case the + * native stats tracker must not be used. Evaluated once on the driver so the executors never + * allocate native resources for a plan that cannot run. + */ + private def canOffloadStats(dataCols: Seq[Attribute], statsColExpr: Expression): Boolean = { + try { + val aggregates = statsColExpr.collect { + case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => + ae + } + val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes) + val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes) + val aggOp = SortAggregateExec( + None, + isStreaming = false, + None, + Seq.empty, + aggregates, + statsAttrs, + 0, + statsResultAttrs, + StatisticsInputNode(dataCols)) + val projOp = ProjectExec(statsResultAttrs, aggOp) + val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) + val config = GlutenConfig.get + val transformRule = HeuristicTransform.WithRewrites( + Validators.newValidator(config, offloads), + Seq(PullOutPreProject), + offloads) + ColumnarCollapseTransformStages(config)( + transformRule(projOp)).isInstanceOf[WholeStageTransformer] + } catch { + case NonFatal(e) => + logWarning("Gluten Delta: failed to plan native stats aggregation; using fallback.", e) + false + } + } + /** A columnar-based statistics collection for Gluten + Delta Lake. */ private class GlutenDeltaTaskStatsTracker( dataCols: Seq[Attribute], diff --git a/backends-velox/src-delta40/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala new file mode 100644 index 0000000000..70da8cc317 --- /dev/null +++ b/backends-velox/src-delta40/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.test.SharedSparkSession + +import java.io.File + +/** + * Regression test for the Gluten Delta per-file statistics tracker. + * + * Writing a Delta table whose collected min/max statistics cannot be offloaded to Velox -- for + * example over a TIMESTAMP_NTZ column -- used to crash the write task with a ClassCastException + * (ProjectExec cannot be cast to WholeStageTransformer), because the native stats tracker assumed + * the statistics aggregation always collapses into a WholeStageTransformer. The tracker must now + * fall back to row-based statistics collection instead of crashing. + */ +class GlutenDeltaStatsSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest { + + import testImplicits._ + + test("TIMESTAMP_NTZ stats fall back instead of crashing the write") { + withTempDir { + dir => + val path = new File(dir, "ntz-stats").getCanonicalPath + // The maxValue statistic for a TIMESTAMP_NTZ near Long.MaxValue triggers the per-file + // statistics aggregation that cannot be offloaded to Velox. + val nearMaxMicros = Long.MaxValue - 999L + val data = Seq(nearMaxMicros) + .toDF("micros") + .selectExpr("micros AS id", "CAST(TIMESTAMP_MICROS(micros) AS TIMESTAMP_NTZ) AS ts") + + // Without the fix this write fails with a ClassCastException (ProjectExec cannot be cast + // to WholeStageTransformer) while collecting statistics. With the fix it succeeds via the + // row-based fallback tracker. A count avoids materializing the TIMESTAMP_NTZ column, which + // is an unrelated read-path limitation. + data.coalesce(1).write.format("delta").save(path) + + assert(spark.read.format("delta").load(path).count() === 1) + } + } +}