Skip to content

[VL] Fix ClassCastException in Delta stats for non-offloadable aggregations#12292

Open
felipepessoto wants to merge 1 commit into
apache:mainfrom
felipepessoto:fix-delta-stats-classcast
Open

[VL] Fix ClassCastException in Delta stats for non-offloadable aggregations#12292
felipepessoto wants to merge 1 commit into
apache:mainfrom
felipepessoto:fix-delta-stats-classcast

Conversation

@felipepessoto

@felipepessoto felipepessoto commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

What changes are proposed in this pull request?

Uncovered by the Delta Spark UT pipeline (#12278).

GlutenDeltaJobStatsTracker builds the per-file statistics aggregation as a SortAggregateExec -> ProjectExec plan, runs Gluten's HeuristicTransform, then unconditionally casts the result to a WholeStageTransformer. When the statistics aggregation cannot be offloaded to Velox -- for example min/max over a TIMESTAMP_NTZ column, as exercised by Delta's DataSkippingDeltaV1Suite "data skipping on TIMESTAMP_NTZ near Long.MaxValue" -- the projection stays a vanilla ProjectExec and the cast throws:

java.lang.ClassCastException: org.apache.spark.sql.execution.ProjectExec cannot be cast to org.apache.gluten.execution.WholeStageTransformer

in the per-task tracker constructor (GlutenDeltaJobStatsTracker.scala), failing the write.

This PR decides on the driver whether the aggregation actually offloads: a new canOffloadStats() dry-runs the same transform pipeline once and checks whether it collapses into a WholeStageTransformer. If it does not, the DeltaJobStatisticsTracker is routed to the existing GlutenDeltaJobStatsFallbackTracker (columnar-to-row + the original Delta tracker, which produces correct statistics for any type) instead of the native tracker. Evaluating this on the driver also avoids the per-task constructor allocating a single-thread executor and a NativePlanEvaluator before the cast. The fix is applied to both the Delta 3.x (src-delta33) and Delta 4.x (src-delta40) copies.

How was this patch tested?

Added GlutenDeltaStatsSuite, which writes a Delta table whose TIMESTAMP_NTZ min/max statistics cannot be offloaded to Velox. Before this change the write crashes with the ClassCastException above; after it, the write succeeds via the row-based fallback tracker.

Locally verified (Spark 3.5, Scala 2.12): the new suite fails without the fix (Tests: succeeded 0, failed 1, ClassCastException) and passes with it (succeeded 1, failed 0). A companion test-only PR (#12293) demonstrates the same red/green contrast on CI. Also confirmed end-to-end against Delta's DataSkippingDeltaV1Suite "TIMESTAMP_NTZ near Long.MaxValue" (succeeded 2, failed 0 with the fix).

scalafmt/spotless report no changes.

PR/CI unit test before the fix: #12293 / https://github.com/apache/gluten/actions/runs/27509527767/job/81307865870?pr=12293

image

Was this patch authored or co-authored using generative AI tooling?

Generated-by: GitHub Copilot CLI (claude-opus-4.8)

Copilot AI review requested due to automatic review settings June 14, 2026 15:18
@github-actions github-actions Bot added the VELOX label Jun 14, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Prevent Delta writes from crashing when per-file statistics aggregation cannot be offloaded to Velox by detecting non-offloadable plans on the driver and falling back to the row-based tracker.

Changes:

  • Added a driver-side canOffloadStats check to decide whether to use the native stats tracker or a safe fallback.
  • Added a warning + fallback path for non-offloadable per-file statistics aggregations.
  • Added a regression test covering TIMESTAMP_NTZ stats write behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala Adds offloadability detection and fallback selection to avoid WholeStageTransformer cast failures.
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala Same offloadability detection + fallback behavior for the delta33 variant.
backends-velox/src-delta/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala Adds regression coverage ensuring writes succeed when TIMESTAMP_NTZ stats aren’t offloadable.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +123 to +126
val aggregates = statsColExpr.collect {
case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
ae
}
statsResultAttrs,
StatisticsInputNode(dataCols))
val projOp = ProjectExec(statsResultAttrs, aggOp)
val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
Comment on lines +123 to +126
val aggregates = statsColExpr.collect {
case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
ae
}
statsResultAttrs,
StatisticsInputNode(dataCols))
val projOp = ProjectExec(statsResultAttrs, aggOp)
val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
Comment on lines +42 to +47
// The maxValue statistic for a TIMESTAMP_NTZ near Long.MaxValue triggers the per-file
// statistics aggregation that cannot be offloaded to Velox.
val nearMaxMicros = Long.MaxValue - 999L
val data = Seq(nearMaxMicros)
.toDF("micros")
.selectExpr("micros AS id", "CAST(TIMESTAMP_MICROS(micros) AS TIMESTAMP_NTZ) AS ts")
…ations

GlutenDeltaJobStatsTracker builds a SortAggregateExec -> ProjectExec plan for
the per-file statistics aggregation, runs Gluten's HeuristicTransform, then
unconditionally casts the result to a WholeStageTransformer. When the stats
aggregation cannot be offloaded to Velox -- e.g. min/max over TIMESTAMP_NTZ, as
exercised by Delta's DataSkippingDeltaV1Suite "data skipping on TIMESTAMP_NTZ
near Long.MaxValue" -- the projection stays a vanilla ProjectExec and the cast
throws java.lang.ClassCastException: ProjectExec cannot be cast to
WholeStageTransformer in the per-task tracker constructor, failing the write.

Decide on the driver whether the aggregation actually offloads: add
canOffloadStats(), which dry-runs the same transform pipeline once and checks
whether it collapses into a WholeStageTransformer. If it does not, route the
DeltaJobStatisticsTracker to the existing GlutenDeltaJobStatsFallbackTracker
(columnar-to-row + the original Delta tracker, which produces correct stats for
any type) instead of the native tracker. Evaluating this on the driver also
avoids the per-task constructor allocating a single-thread executor and a
NativePlanEvaluator before the cast. Applied to both the Delta 3.x (src-delta33)
and Delta 4.x (src-delta40) copies.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@felipepessoto felipepessoto force-pushed the fix-delta-stats-classcast branch from 626cc18 to e8a84f3 Compare June 14, 2026 19:26
@felipepessoto felipepessoto changed the title [VL] Fix ClassCastException in Delta stats for non-offloadable aggregations [WIP] [VL] Fix ClassCastException in Delta stats for non-offloadable aggregations Jun 14, 2026
@felipepessoto

Copy link
Copy Markdown
Contributor Author

@zhztheplayer @philo-he similar to #12290, I found the issue when running the Delta CI #12278.

@felipepessoto felipepessoto changed the title [WIP] [VL] Fix ClassCastException in Delta stats for non-offloadable aggregations [VL] Fix ClassCastException in Delta stats for non-offloadable aggregations Jun 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants