Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 7 additions & 46 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -502,35 +502,18 @@ index a206e97c353..79813d8e259 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 93275487f29..510e3087e0f 100644
index 93275487f29..ca79ad8b6d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}

import scala.collection.mutable

+import org.apache.comet.CometConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}

@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
case "" => "_LEGACY_ERROR_TEMP_2062"
case _ => "_LEGACY_ERROR_TEMP_2055"
}
+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
checkErrorMatchPVals(
exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
}

Seq("parquet", "orc").foreach { format =>
Expand All @@ -540,31 +523,31 @@ index 93275487f29..510e3087e0f 100644
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
+ case smJoin: CometSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
Expand Down Expand Up @@ -1409,28 +1392,6 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index a1147c16cc8..c7a29496328 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException}
import org.apache.spark.sql._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}

- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
withTable("t") {
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
withView("v1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
Expand Down
44 changes: 31 additions & 13 deletions native/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
// Fall back to plain SparkError (no context)
throw_spark_error_as_json(env, spark_error)
} else {
// Not a SparkError, use generic exception
// Check for file-not-found errors from object store
let error_msg = e.to_string();
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
// Not a SparkError, use generic exception
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
exception.class,
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
),
_ => env.throw_new(exception.class, exception.msg),
}
}
}
}
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
_ => {
// Check for file-not-found errors that may arrive through other wrapping paths
let error_msg = error.to_string();
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
Expand All @@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
}
}
}
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
_ => {
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
exception.class,
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
),
_ => env.throw_new(exception.class, exception.msg),
}
}
}
.expect("Thrown exception")
}
Expand Down
15 changes: 15 additions & 0 deletions native/spark-expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub enum SparkError {
#[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")]
ScalarSubqueryTooManyRows,

#[error("{message}")]
FileNotFound { message: String },

#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),

Expand Down Expand Up @@ -236,6 +239,7 @@ impl SparkError {
SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex",
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
SparkError::FileNotFound { .. } => "FileNotFound",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
Expand Down Expand Up @@ -421,6 +425,11 @@ impl SparkError {
"dataType": data_type,
})
}
SparkError::FileNotFound { message } => {
serde_json::json!({
"message": message,
})
}
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
Expand Down Expand Up @@ -487,6 +496,9 @@ impl SparkError {
SparkError::DatatypeCannotOrder { .. }
| SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException",

// FileNotFound - will be converted to SparkFileNotFoundException by the shim
SparkError::FileNotFound { .. } => "org/apache/spark/SparkException",

// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
}
Expand Down Expand Up @@ -559,6 +571,9 @@ impl SparkError {
// Subquery errors
SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),

// File not found
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),

// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
Expand Down
15 changes: 1 addition & 14 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package org.apache.comet

import java.io.FileNotFoundException
import java.lang.management.ManagementFactory

import scala.util.matching.Regex

import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
Expand Down Expand Up @@ -163,19 +160,9 @@ class CometExecIterator(
// threw the exception, so we log the exception with taskAttemptId here
logError(s"Native execution for task $taskAttemptId failed", e)

val fileNotFoundPattern: Regex =
("""^External: Object at location (.+?) not found: No such file or directory """ +
"""\(os error \d+\)$""").r
val parquetError: Regex =
val parquetError: scala.util.matching.Regex =
"""^Parquet error: (?:.*)$""".r
e.getMessage match {
case fileNotFoundPattern(filePath) =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
throw new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2055",
messageParameters = Map("message" -> e.getMessage),
cause = new FileNotFoundException(filePath)
) // Can't use SparkFileNotFoundException because it's private.
case parquetError() =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
// See org.apache.parquet.hadoop.ParquetFileReader for error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@

package org.apache.spark.sql.comet.shims

import java.io.FileNotFoundException

import scala.util.matching.Regex

import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * Shared constants for the Spark 3.4 error-converter shim.
 */
object ShimSparkErrorConverter {
  // Matches native object-store "not found" error messages
  // (e.g. "Object at location /tmp/f.parquet not found: ..."); group 1 captures the path.
  val ObjectLocationPattern: Regex = new Regex("Object at location (.+?) not found")
}

/**
* Spark 3.4 implementation for converting error types to proper Spark exceptions.
*
Expand Down Expand Up @@ -243,6 +251,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = ShimSparkErrorConverter.ObjectLocationPattern
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@

package org.apache.spark.sql.comet.shims

import java.io.FileNotFoundException

import scala.util.matching.Regex

import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * Shared constants for the Spark 3.5 error-converter shim.
 */
object ShimSparkErrorConverter {
  // Source text for the path-extraction pattern; group 1 is the object path.
  private val objectLocationRegexSource = "Object at location (.+?) not found"

  // Matches native object-store "not found" error messages; group 1 captures the path.
  val ObjectLocationPattern: Regex = objectLocationRegexSource.r
}

/**
* Spark 3.5 implementation for converting error types to proper Spark exceptions.
*
Expand Down Expand Up @@ -239,6 +247,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = ShimSparkErrorConverter.ObjectLocationPattern
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@

package org.apache.spark.sql.comet.shims

import scala.util.matching.Regex

import org.apache.spark.QueryContext
import org.apache.spark.SparkException
import org.apache.spark.SparkFileNotFoundException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
 * Shared constants for the Spark 4.0 error-converter shim.
 */
object ShimSparkErrorConverter {
  // Extracts the object path (capture group 1) from a native object-store
  // "not found" error message.
  val ObjectLocationPattern: Regex = new Regex("Object at location (.+?) not found")
}

/**
* Spark 4.0-specific implementation for converting error types to proper Spark exceptions.
*/
Expand Down Expand Up @@ -251,6 +258,20 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
context.headOption.orNull))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = ShimSparkErrorConverter.ObjectLocationPattern
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
// readCurrentFileNotFoundError was removed in Spark 4.0; construct directly
Some(
new SparkFileNotFoundException(
errorClass = "_LEGACY_ERROR_TEMP_2055",
messageParameters = Map("message" -> s"File $path does not exist")))

case _ =>
// Unknown error type - return None to trigger fallback
None
Expand Down
Loading