diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 08b69a6b82..568b33e758 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -502,18 +502,10 @@ index a206e97c353..79813d8e259 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..510e3087e0f 100644 +index 93275487f29..ca79ad8b6d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption} - - import scala.collection.mutable - -+import org.apache.comet.CometConf - import org.apache.hadoop.conf.Configuration - import org.apache.hadoop.fs.{LocalFileSystem, Path} - -@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -521,16 +513,7 @@ index 93275487f29..510e3087e0f 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest - case "" => "_LEGACY_ERROR_TEMP_2062" - case _ => "_LEGACY_ERROR_TEMP_2055" - } -+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException -+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) - checkErrorMatchPVals( - exception = intercept[SparkException] { - testIgnoreMissingFiles(options) -@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -540,7 +523,7 @@ index 93275487f29..510e3087e0f 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -548,7 +531,7 @@ index 93275487f29..510e3087e0f 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -556,7 +539,7 @@ index 93275487f29..510e3087e0f 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -564,7 +547,7 @@ index 93275487f29..510e3087e0f 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1409,28 +1392,6 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index a1147c16cc8..c7a29496328 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution - - import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.TableIdentifier - import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} - import org.apache.spark.sql.catalyst.parser.ParseException -@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { - } - } - -- test("alter temporary view should follow current storeAnalyzedPlanForView config") { -+ test("alter temporary view should follow current storeAnalyzedPlanForView config", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - withTable("t") { - Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") - withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index eec396b2e39..bf3f1c769d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 7c8957dba7..d4582da63f 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option env.throw_new( + exception.class, + to_stacktrace_string(exception.msg, backtrace_string).unwrap(), + ), + _ => env.throw_new(exception.class, exception.msg), + } + } + } + } + // Handle direct SparkError - serialize to JSON + CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), + _ => { + // Check for file-not-found errors that may arrive through other wrapping paths + let error_msg = error.to_string(); + if error_msg.contains("not found") + && error_msg.contains("No such file or directory") + { + let spark_error = SparkError::FileNotFound { message: error_msg }; + throw_spark_error_as_json(env, &spark_error) + } else { let exception = error.to_exception(); match backtrace { Some(backtrace_string) => env.throw_new( @@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option throw_spark_error_as_json(env, spark_error), - _ => { - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - exception.class, - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - _ => env.throw_new(exception.class, exception.msg), - } - } } .expect("Thrown exception") } diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs index ae3b5c0eda..592ed8b443 100644 --- a/native/spark-expr/src/error.rs +++ b/native/spark-expr/src/error.rs @@ -166,6 +166,9 @@ pub enum SparkError { #[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")] ScalarSubqueryTooManyRows, + #[error("{message}")] + FileNotFound { message: String }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -236,6 +239,7 @@ impl SparkError { SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex", SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder", SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", + SparkError::FileNotFound { .. } => "FileNotFound", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -421,6 +425,11 @@ impl SparkError { "dataType": data_type, }) } + SparkError::FileNotFound { message } => { + serde_json::json!({ + "message": message, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -487,6 +496,9 @@ impl SparkError { SparkError::DatatypeCannotOrder { .. } | SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException", + // FileNotFound - will be converted to SparkFileNotFoundException by the shim + SparkError::FileNotFound { .. } => "org/apache/spark/SparkException", + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -559,6 +571,9 @@ impl SparkError { // Subquery errors SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"), + // File not found + SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"), + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 28c1645718..44ebf7e36e 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -19,11 +19,8 @@ package org.apache.comet -import java.io.FileNotFoundException import java.lang.management.ManagementFactory -import scala.util.matching.Regex - import org.apache.hadoop.conf.Configuration import org.apache.spark._ import org.apache.spark.broadcast.Broadcast @@ -163,19 +160,9 @@ class CometExecIterator( // threw the exception, so we log the exception with taskAttemptId here logError(s"Native execution for task $taskAttemptId failed", e) - val fileNotFoundPattern: Regex = - ("""^External: Object at location (.+?) not found: No such file or directory """ + - """\(os error \d+\)$""").r - val parquetError: Regex = + val parquetError: scala.util.matching.Regex = """^Parquet error: (?:.*)$""".r e.getMessage match { - case fileNotFoundPattern(filePath) => - // See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError - throw new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2055", - messageParameters = Map("message" -> e.getMessage), - cause = new FileNotFoundException(filePath) - ) // Can't use SparkFileNotFoundException because it's private. case parquetError() => // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError // See org.apache.parquet.hadoop.ParquetFileReader for error message. diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 8e6ed1a927..da65b1eb49 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,12 +19,20 @@ package org.apache.spark.sql.comet.shims +import java.io.FileNotFoundException + +import scala.util.matching.Regex + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 3.4 implementation for converting error types to proper Spark exceptions. * @@ -243,6 +251,18 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = ShimSparkErrorConverter.ObjectLocationPattern + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) + Some( + QueryExecutionErrors.readCurrentFileNotFoundError( + new FileNotFoundException(s"File $path does not exist"))) + case _ => None } diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 9bd8c7dba1..ae21d12765 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,12 +19,20 @@ package org.apache.spark.sql.comet.shims +import java.io.FileNotFoundException + +import scala.util.matching.Regex + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 3.5 implementation for converting error types to proper Spark exceptions. * @@ -239,6 +247,18 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = ShimSparkErrorConverter.ObjectLocationPattern + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) + Some( + QueryExecutionErrors.readCurrentFileNotFoundError( + new FileNotFoundException(s"File $path does not exist"))) + case _ => None } diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index e49c789a77..01d4eac4b6 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,12 +19,19 @@ package org.apache.spark.sql.comet.shims +import scala.util.matching.Regex + import org.apache.spark.QueryContext import org.apache.spark.SparkException +import org.apache.spark.SparkFileNotFoundException import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 4.0-specific implementation for converting error types to proper Spark exceptions. */ @@ -251,6 +258,20 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError( context.headOption.orNull)) + case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = ShimSparkErrorConverter.ObjectLocationPattern + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) + // readCurrentFileNotFoundError was removed in Spark 4.0; construct directly + Some( + new SparkFileNotFoundException( + errorClass = "_LEGACY_ERROR_TEMP_2055", + messageParameters = Map("message" -> s"File $path does not exist"))) + case _ => // Unknown error type - return None to trigger fallback None