Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ index cfc8b2cc845..b7c234e1437 100644
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest}
Expand All @@ -1151,7 +1151,7 @@ index cfc8b2cc845..b7c234e1437 100644
@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
}
}

- test("Fallback Parquet V2 to V1") {
+ test("Fallback Parquet V2 to V1",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
Expand Down Expand Up @@ -2038,7 +2038,7 @@ index 07e2849ce6f..3e73645b638 100644
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..49f2001dc6b 100644
index 8e88049f51e..6150a556f9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
Expand Down Expand Up @@ -2127,17 +2127,24 @@ index 8e88049f51e..49f2001dc6b 100644
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false)
))
@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
val e = intercept[SparkException] {
sql(s"select a from $tableName where b > 0").collect()
}
- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
- """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
+ assert(e.getCause.isInstanceOf[RuntimeException])
+ val msg = e.getCause.getMessage
+ // native_datafusion produces a different error message for duplicate fields
+ assert(
+ msg.contains(
+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") ||
+ msg.contains("Unable to get field named"),
+ s"Unexpected error message: $msg")
}

- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempPath { dir =>
val count = 10
val tableName = "spark_25207"
@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2147,7 +2154,7 @@ index 8e88049f51e..49f2001dc6b 100644
// block 1:
// null count min max
// page-0 0 0 99
@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}

Expand All @@ -2157,7 +2164,7 @@ index 8e88049f51e..49f2001dc6b 100644
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(100).selectExpr("id * 2 AS id")
@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand All @@ -2170,7 +2177,7 @@ index 8e88049f51e..49f2001dc6b 100644
} else {
assert(selectedFilters.isEmpty, "There is filter pushed down")
}
@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
assert(pushedParquetFilters.exists(_.getClass === filterClass),
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")

Expand Down Expand Up @@ -2935,7 +2942,7 @@ index b5cf13a9c12..ac17603fb7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar

import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
Expand All @@ -2946,7 +2953,7 @@ index b5cf13a9c12..ac17603fb7f 100644
@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

- test("SPARK-41198: input row calculation with CTE") {
+ test("SPARK-41198: input row calculation with CTE",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
Expand All @@ -2956,7 +2963,7 @@ index b5cf13a9c12..ac17603fb7f 100644
@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}

- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
Expand Down
4 changes: 4 additions & 0 deletions docs/source/contributor-guide/parquet_scans.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ cause Comet to fall back to Spark.
- No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
- No support for duplicate field names in case-insensitive mode. When the required or data schema contains
field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates
in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time,
so DataFusion may produce a different error message than Spark in that case.

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:
Expand Down
16 changes: 16 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,22 @@ case class CometScanRule(session: SparkSession)
withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
return None
}
// Case-insensitive mode with duplicate field names produces different errors
// in DataFusion vs Spark, so fall back to avoid incompatible error messages
if (!session.sessionState.conf.caseSensitiveAnalysis) {
val schemas = Seq(scanExec.requiredSchema, r.dataSchema)
for (schema <- schemas) {
val fieldNames =
schema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT))
if (fieldNames.length != fieldNames.distinct.length) {
withInfo(
scanExec,
"Native DataFusion scan does not support " +
"duplicate field names in case-insensitive mode")
return None
}
}
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
Expand Down
Loading