From b97571eafdbc59bde13a02917c55c5af16ea85a6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 09:07:43 -0600 Subject: [PATCH 01/11] fix: route file-not-found errors through SparkError JSON path Detect file-not-found errors from DataFusion's object store on the native side and convert them to SparkError::FileNotFound, which is serialized as JSON via CometQueryExecutionException. The shim layer then creates a proper SparkFileNotFoundException using QueryExecutionErrors.readCurrentFileNotFoundError(), producing the exact exception type that Spark tests expect. Previously, file-not-found errors arrived as CometNativeException and were pattern-matched in CometExecIterator to create a SparkException with a plain FileNotFoundException cause. Tests that cast the cause to SparkFileNotFoundException (which is private[spark]) would fail. Closes #3314 --- native/core/src/errors.rs | 49 ++++++++++++++----- native/spark-expr/src/error.rs | 15 ++++++ .../org/apache/comet/CometExecIterator.scala | 15 +----- .../comet/shims/ShimSparkErrorConverter.scala | 7 +++ .../comet/shims/ShimSparkErrorConverter.scala | 7 +++ .../comet/shims/ShimSparkErrorConverter.scala | 7 +++ 6 files changed, 73 insertions(+), 27 deletions(-) diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 7c8957dba7..4925f77129 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -413,7 +413,42 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option env.throw_new( + exception.class, + to_stacktrace_string(exception.msg, backtrace_string).unwrap(), + ), + _ => env.throw_new(exception.class, exception.msg), + } + } + } + } + // Handle direct SparkError - serialize to JSON + CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error), + _ => { + // Check for file-not-found errors that may arrive through other wrapping paths + let error_msg = error.to_string(); + if error_msg.contains("not found") + && error_msg.contains("No such file or directory") + { + let spark_error = SparkError::FileNotFound { + message: error_msg, + }; + throw_spark_error_as_json(env, &spark_error) + } else { let exception = error.to_exception(); match backtrace { Some(backtrace_string) => env.throw_new( @@ -424,18 +459,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option throw_spark_error_as_json(env, spark_error), - _ => { - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - exception.class, - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - _ => env.throw_new(exception.class, exception.msg), - } - } } .expect("Thrown exception") } diff --git a/native/spark-expr/src/error.rs b/native/spark-expr/src/error.rs index ae3b5c0eda..592ed8b443 100644 --- a/native/spark-expr/src/error.rs +++ b/native/spark-expr/src/error.rs @@ -166,6 +166,9 @@ pub enum SparkError { #[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")] ScalarSubqueryTooManyRows, + #[error("{message}")] + FileNotFound { message: String }, + #[error("ArrowError: {0}.")] Arrow(Arc), @@ -236,6 +239,7 @@ impl SparkError { SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex", SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder", SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", + SparkError::FileNotFound { .. } => "FileNotFound", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", } @@ -421,6 +425,11 @@ impl SparkError { "dataType": data_type, }) } + SparkError::FileNotFound { message } => { + serde_json::json!({ + "message": message, + }) + } SparkError::Arrow(e) => { serde_json::json!({ "message": e.to_string(), @@ -487,6 +496,9 @@ impl SparkError { SparkError::DatatypeCannotOrder { .. } | SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException", + // FileNotFound - will be converted to SparkFileNotFoundException by the shim + SparkError::FileNotFound { .. } => "org/apache/spark/SparkException", + // Generic errors SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException", } @@ -559,6 +571,9 @@ impl SparkError { // Subquery errors SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"), + // File not found + SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"), + // Generic errors (no error class) SparkError::Arrow(_) | SparkError::Internal(_) => None, } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 28c1645718..44ebf7e36e 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -19,11 +19,8 @@ package org.apache.comet -import java.io.FileNotFoundException import java.lang.management.ManagementFactory -import scala.util.matching.Regex - import org.apache.hadoop.conf.Configuration import org.apache.spark._ import org.apache.spark.broadcast.Broadcast @@ -163,19 +160,9 @@ class CometExecIterator( // threw the exception, so we log the exception with taskAttemptId here logError(s"Native execution for task $taskAttemptId failed", e) - val fileNotFoundPattern: Regex = - ("""^External: Object at location (.+?) not found: No such file or directory """ + - """\(os error \d+\)$""").r - val parquetError: Regex = + val parquetError: scala.util.matching.Regex = """^Parquet error: (?:.*)$""".r e.getMessage match { - case fileNotFoundPattern(filePath) => - // See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError - throw new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2055", - messageParameters = Map("message" -> e.getMessage), - cause = new FileNotFoundException(filePath) - ) // Can't use SparkFileNotFoundException because it's private. case parquetError() => // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError // See org.apache.parquet.hadoop.ParquetFileReader for error message. diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 8e6ed1a927..a6f1500d3c 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.comet.shims +import java.io.FileNotFoundException + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors @@ -243,6 +245,11 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "FileNotFound" => + Some( + QueryExecutionErrors.readCurrentFileNotFoundError( + new FileNotFoundException(params("message").toString))) + case _ => None } diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 9bd8c7dba1..8874181826 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.comet.shims +import java.io.FileNotFoundException + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors @@ -239,6 +241,11 @@ trait ShimSparkErrorConverter { QueryExecutionErrors .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) + case "FileNotFound" => + Some( + QueryExecutionErrors.readCurrentFileNotFoundError( + new FileNotFoundException(params("message").toString))) + case _ => None } diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index e49c789a77..40eeffa840 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.comet.shims +import java.io.FileNotFoundException + import org.apache.spark.QueryContext import org.apache.spark.SparkException import org.apache.spark.sql.errors.QueryExecutionErrors @@ -251,6 +253,11 @@ trait ShimSparkErrorConverter { QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError( context.headOption.orNull)) + case "FileNotFound" => + Some( + QueryExecutionErrors.readCurrentFileNotFoundError( + new FileNotFoundException(params("message").toString))) + case _ => // Unknown error type - return None to trigger fallback None From 90ecaa24a80377af4448d56952d194eb70a44706 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 09:12:52 -0600 Subject: [PATCH 02/11] fix: remove test skips for file-not-found in 3.5.8 diff Remove the assume() skip for native_datafusion in FileBasedDataSourceSuite and the IgnoreCometNativeDataFusion tag in SimpleSQLViewSuite, since file-not-found errors now produce the correct SparkFileNotFoundException type. --- dev/diffs/3.5.8.diff | 55 +++++++++++--------------------------------- 1 file changed, 13 insertions(+), 42 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 3aaecdecb1..d2a3d66ced 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -502,7 +502,7 @@ index a206e97c353..79813d8e259 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..510e3087e0f 100644 +index 93275487f29..55902457a16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption} @@ -521,16 +521,7 @@ index 93275487f29..510e3087e0f 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest - case "" => "_LEGACY_ERROR_TEMP_2062" - case _ => "_LEGACY_ERROR_TEMP_2055" - } -+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException -+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION) - checkErrorMatchPVals( - exception = intercept[SparkException] { - testIgnoreMissingFiles(options) -@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -639,7 +641,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -540,7 +531,7 @@ index 93275487f29..510e3087e0f 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -955,6 +958,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -548,7 +539,7 @@ index 93275487f29..510e3087e0f 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1019,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -556,7 +547,7 @@ index 93275487f29..510e3087e0f 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1061,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -564,7 +555,7 @@ index 93275487f29..510e3087e0f 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1246,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -965,7 +956,7 @@ index 3cf2bfd17ab..49728c35c42 100644 SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..1d2e215d6a3 100644 +index fa1a64460fc..134f0db1fb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1423,7 +1414,7 @@ index 47679ed7865..9ffbaecb98e 100644 assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index a1147c16cc8..c7a29496328 100644 +index a1147c16cc8..90085de90d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution @@ -1434,16 +1425,6 @@ index a1147c16cc8..c7a29496328 100644 import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} import org.apache.spark.sql.catalyst.parser.ParseException -@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { - } - } - -- test("alter temporary view should follow current storeAnalyzedPlanForView config") { -+ test("alter temporary view should follow current storeAnalyzedPlanForView config", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { - withTable("t") { - Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") - withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index eec396b2e39..bf3f1c769d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala @@ -2038,7 +2019,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..49f2001dc6b 100644 +index 8e88049f51e..e21a5797996 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2127,17 +2108,7 @@ index 8e88049f51e..49f2001dc6b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } - -- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { -+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withTempPath { dir => - val count = 10 - val tableName = "spark_25207" -@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2000,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2147,7 +2118,7 @@ index 8e88049f51e..49f2001dc6b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2061,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2157,7 +2128,7 @@ index 8e88049f51e..49f2001dc6b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2294,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2170,7 +2141,7 @@ index 8e88049f51e..49f2001dc6b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2358,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") From 64b63c585fa55d115c5c5f8fb27423912965739f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 09:20:14 -0600 Subject: [PATCH 03/11] fix: remove unused imports from 3.5.8 diff Remove CometConf import from FileBasedDataSourceSuite and IgnoreCometNativeDataFusion import from SQLViewSuite that became unused after removing the test skips. --- dev/diffs/3.5.8.diff | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d2a3d66ced..fc5607e3eb 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -502,18 +502,10 @@ index a206e97c353..79813d8e259 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 93275487f29..55902457a16 100644 +index 93275487f29..ca79ad8b6d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption} - - import scala.collection.mutable - -+import org.apache.comet.CometConf - import org.apache.hadoop.conf.Configuration - import org.apache.hadoop.fs.{LocalFileSystem, Path} - -@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -521,7 +513,7 @@ index 93275487f29..55902457a16 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -639,7 +641,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -531,7 +523,7 @@ index 93275487f29..55902457a16 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -955,6 +958,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -539,7 +531,7 @@ index 93275487f29..55902457a16 100644 } assert(smJoinExec.nonEmpty) } -@@ -1015,6 +1019,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -547,7 +539,7 @@ index 93275487f29..55902457a16 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1056,6 +1061,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -555,7 +547,7 @@ index 93275487f29..55902457a16 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1240,6 +1246,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -1413,18 +1405,6 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -index a1147c16cc8..90085de90d4 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala -@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution - - import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.TableIdentifier - import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} - import org.apache.spark.sql.catalyst.parser.ParseException diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala index eec396b2e39..bf3f1c769d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala From b0d2b708ac46a8c3ef26077d218daf7fb882580a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 09:35:36 -0600 Subject: [PATCH 04/11] fix: format file-not-found message to match Hadoop convention Extract file path from native error message and format it as "File does not exist" to match the Hadoop FileNotFoundException message format that Spark tests expect. --- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 9 ++++++++- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 9 ++++++++- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 9 ++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index a6f1500d3c..a35d32520f 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -246,9 +246,16 @@ trait ShimSparkErrorConverter { .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = "Object at location (.+?) not found".r + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) Some( QueryExecutionErrors.readCurrentFileNotFoundError( - new FileNotFoundException(params("message").toString))) + new FileNotFoundException(s"File $path does not exist"))) case _ => None diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 8874181826..a9038af14a 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -242,9 +242,16 @@ trait ShimSparkErrorConverter { .intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context))) case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = "Object at location (.+?) not found".r + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) Some( QueryExecutionErrors.readCurrentFileNotFoundError( - new FileNotFoundException(params("message").toString))) + new FileNotFoundException(s"File $path does not exist"))) case _ => None diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 40eeffa840..eb98f188f8 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -254,9 +254,16 @@ trait ShimSparkErrorConverter { context.headOption.orNull)) case "FileNotFound" => + val msg = params("message").toString + // Extract file path from native error message and format like Hadoop's + // FileNotFoundException: "File does not exist" + val path = "Object at location (.+?) not found".r + .findFirstMatchIn(msg) + .map(_.group(1)) + .getOrElse(msg) Some( QueryExecutionErrors.readCurrentFileNotFoundError( - new FileNotFoundException(params("message").toString))) + new FileNotFoundException(s"File $path does not exist"))) case _ => // Unknown error type - return None to trigger fallback From 030105fcd37a1dd799c8c61c040deaf685762eeb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 09:37:16 -0600 Subject: [PATCH 05/11] style: apply cargo fmt --- native/core/src/errors.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 4925f77129..d4582da63f 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -418,10 +418,7 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option Date: Sat, 14 Mar 2026 09:57:27 -0600 Subject: [PATCH 06/11] fix: use SparkFileNotFoundException directly for Spark 4.0 readCurrentFileNotFoundError was removed in Spark 4.0. Construct SparkFileNotFoundException directly instead, which is accessible from the shim package. --- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index eb98f188f8..06a5cff3f2 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -23,6 +23,7 @@ import java.io.FileNotFoundException import org.apache.spark.QueryContext import org.apache.spark.SparkException +import org.apache.spark.SparkFileNotFoundException import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -261,9 +262,11 @@ trait ShimSparkErrorConverter { .findFirstMatchIn(msg) .map(_.group(1)) .getOrElse(msg) + // readCurrentFileNotFoundError was removed in Spark 4.0; construct directly Some( - QueryExecutionErrors.readCurrentFileNotFoundError( - new FileNotFoundException(s"File $path does not exist"))) + new SparkFileNotFoundException( + errorClass = "_LEGACY_ERROR_TEMP_2055", + messageParameters = Map("message" -> s"File $path does not exist"))) case _ => // Unknown error type - return None to trigger fallback From e630c718170681b2c7de3122595636ad2353b017 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 10:41:20 -0600 Subject: [PATCH 07/11] fix: remove unused FileNotFoundException import in Spark 4.0 shim --- .../apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 06a5cff3f2..1432681bda 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.comet.shims -import java.io.FileNotFoundException - import org.apache.spark.QueryContext import org.apache.spark.SparkException import org.apache.spark.SparkFileNotFoundException From 03328cded370bdafac8db3904d95fb301811b965 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 11:38:52 -0600 Subject: [PATCH 08/11] fix: accept native DataFusion error message for duplicate fields in 3.5.8 diff The SPARK-25207 test expects a specific error message for duplicate fields in case-insensitive mode, but native DataFusion produces a different schema error. Update the test to accept either message format. --- dev/diffs/3.5.8.diff | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index fc5607e3eb..46b57e52a9 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1999,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..e21a5797996 100644 +index 8e88049f51e..6150a556f9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2088,7 +2088,24 @@ index 8e88049f51e..e21a5797996 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1984,7 +2000,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + val e = intercept[SparkException] { + sql(s"select a from $tableName where b > 0").collect() + } +- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( +- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) ++ assert(e.getCause.isInstanceOf[RuntimeException]) ++ val msg = e.getCause.getMessage ++ // native_datafusion produces a different error message for duplicate fields ++ assert( ++ msg.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || ++ msg.contains("Unable to get field named"), ++ s"Unexpected error message: $msg") + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2098,7 +2115,7 @@ index 8e88049f51e..e21a5797996 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2061,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2108,7 +2125,7 @@ index 8e88049f51e..e21a5797996 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2294,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2121,7 +2138,7 @@ index 8e88049f51e..e21a5797996 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2358,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") From 37d10e3efd47d074a6889cfd8e43b31908fb2214 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 11:40:14 -0600 Subject: [PATCH 09/11] Revert "fix: accept native DataFusion error message for duplicate fields in 3.5.8 diff" This reverts commit 03328cded370bdafac8db3904d95fb301811b965. --- dev/diffs/3.5.8.diff | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 46b57e52a9..fc5607e3eb 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1999,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..6150a556f9b 100644 +index 8e88049f51e..e21a5797996 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2088,24 +2088,7 @@ index 8e88049f51e..6150a556f9b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - val e = intercept[SparkException] { - sql(s"select a from $tableName where b > 0").collect() - } -- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( -- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) -+ assert(e.getCause.isInstanceOf[RuntimeException]) -+ val msg = e.getCause.getMessage -+ // native_datafusion produces a different error message for duplicate fields -+ assert( -+ msg.contains( -+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || -+ msg.contains("Unable to get field named"), -+ s"Unexpected error message: $msg") - } - - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { -@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1984,7 +2000,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2115,7 +2098,7 @@ index 8e88049f51e..6150a556f9b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2061,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2125,7 +2108,7 @@ index 8e88049f51e..6150a556f9b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2294,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2138,7 +2121,7 @@ index 8e88049f51e..6150a556f9b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2358,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") From 27469e0ca6b72c78a043ae20bde4692115bb59b7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 14:37:24 -0600 Subject: [PATCH 10/11] fix: skip duplicate fields test for native DataFusion in 3.5.8 diff Add IgnoreCometNativeDataFusion tag to SPARK-25207 test instead of trying to accept both error messages. A separate PR will fix the underlying issue. --- dev/diffs/3.5.8.diff | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index fc5607e3eb..1f16765e12 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1999,7 +1999,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..e21a5797996 100644 +index 8e88049f51e..49f2001dc6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2088,7 +2088,17 @@ index 8e88049f51e..e21a5797996 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1984,7 +2000,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2098,7 +2108,7 @@ index 8e88049f51e..e21a5797996 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2061,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2108,7 +2118,7 @@ index 8e88049f51e..e21a5797996 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2294,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2121,7 +2131,7 @@ index 8e88049f51e..e21a5797996 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2358,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") From f4c43f385c1c0a7502d6b9e0425309d0296236c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 15 Mar 2026 08:53:32 -0600 Subject: [PATCH 11/11] address feedback --- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 8 +++++++- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 8 +++++++- .../spark/sql/comet/shims/ShimSparkErrorConverter.scala | 8 +++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index a35d32520f..da65b1eb49 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -21,12 +21,18 @@ package org.apache.spark.sql.comet.shims import java.io.FileNotFoundException +import scala.util.matching.Regex + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 3.4 implementation for converting error types to proper Spark exceptions. * @@ -249,7 +255,7 @@ trait ShimSparkErrorConverter { val msg = params("message").toString // Extract file path from native error message and format like Hadoop's // FileNotFoundException: "File does not exist" - val path = "Object at location (.+?) not found".r + val path = ShimSparkErrorConverter.ObjectLocationPattern .findFirstMatchIn(msg) .map(_.group(1)) .getOrElse(msg) diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index a9038af14a..ae21d12765 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -21,12 +21,18 @@ package org.apache.spark.sql.comet.shims import java.io.FileNotFoundException +import scala.util.matching.Regex + import org.apache.spark.{QueryContext, SparkException} import org.apache.spark.sql.catalyst.trees.SQLQueryContext import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 3.5 implementation for converting error types to proper Spark exceptions. * @@ -245,7 +251,7 @@ trait ShimSparkErrorConverter { val msg = params("message").toString // Extract file path from native error message and format like Hadoop's // FileNotFoundException: "File does not exist" - val path = "Object at location (.+?) not found".r + val path = ShimSparkErrorConverter.ObjectLocationPattern .findFirstMatchIn(msg) .map(_.group(1)) .getOrElse(msg) diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 1432681bda..01d4eac4b6 100644 --- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.comet.shims +import scala.util.matching.Regex + import org.apache.spark.QueryContext import org.apache.spark.SparkException import org.apache.spark.SparkFileNotFoundException @@ -26,6 +28,10 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +object ShimSparkErrorConverter { + val ObjectLocationPattern: Regex = "Object at location (.+?) not found".r +} + /** * Spark 4.0-specific implementation for converting error types to proper Spark exceptions. */ @@ -256,7 +262,7 @@ trait ShimSparkErrorConverter { val msg = params("message").toString // Extract file path from native error message and format like Hadoop's // FileNotFoundException: "File does not exist" - val path = "Object at location (.+?) not found".r + val path = ShimSparkErrorConverter.ObjectLocationPattern .findFirstMatchIn(msg) .map(_.group(1)) .getOrElse(msg)