Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ index f33432ddb6f..42eb9fd1cb7 100644
test("static scan metrics",
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3313")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
Expand All @@ -476,7 +476,7 @@ index a206e97c353..79813d8e259 100644

- test("explain formatted - check presence of subquery in case of DPP") {
+ test("explain formatted - check presence of subquery in case of DPP",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3313")) {
withTable("df1", "df2") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
Expand Down Expand Up @@ -536,7 +536,7 @@ index 93275487f29..510e3087e0f 100644
Seq("parquet", "orc").foreach { format =>
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
Expand Down Expand Up @@ -609,7 +609,7 @@ index 00000000000..1ee842b6f62
+ */
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
+
+/**
Expand Down Expand Up @@ -1142,7 +1142,7 @@ index cfc8b2cc845..b7c234e1437 100644

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
Expand All @@ -1154,7 +1154,7 @@ index cfc8b2cc845..b7c234e1437 100644

- test("Fallback Parquet V2 to V1") {
+ test("Fallback Parquet V2 to V1",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) {
Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
Expand Down Expand Up @@ -1430,7 +1430,7 @@ index a1147c16cc8..c7a29496328 100644

import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException}
import org.apache.spark.sql._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
import org.apache.spark.sql.catalyst.parser.ParseException
Expand All @@ -1440,7 +1440,7 @@ index a1147c16cc8..c7a29496328 100644

- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3314")) {
withTable("t") {
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
withView("v1") {
Expand Down Expand Up @@ -2070,7 +2070,7 @@ index 8e88049f51e..49f2001dc6b 100644

- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
+ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3320")) {
import testImplicits._

withAllParquetReaders {
Expand All @@ -2091,7 +2091,7 @@ index 8e88049f51e..49f2001dc6b 100644

- test("Filters should be pushed down for Parquet readers at row group level") {
+ test("Filters should be pushed down for Parquet readers at row group level",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3320")) {
import testImplicits._

withSQLConf(
Expand Down Expand Up @@ -2133,7 +2133,7 @@ index 8e88049f51e..49f2001dc6b 100644

- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempPath { dir =>
val count = 10
val tableName = "spark_25207"
Expand Down Expand Up @@ -2193,7 +2193,7 @@ index 8ed9ef1630e..f312174b182 100644

- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))

Expand All @@ -2203,7 +2203,7 @@ index 8ed9ef1630e..f312174b182 100644

- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))

Expand All @@ -2227,7 +2227,7 @@ index f6472ba3d9d..ce39ebb52e6 100644

- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
val data = (1 to 1000).map { i =>
val ts = new java.sql.Timestamp(i)
Row(ts)
Expand All @@ -2247,7 +2247,7 @@ index f6472ba3d9d..ce39ebb52e6 100644

- test("SPARK-34212 Parquet should read decimals correctly") {
+ test("SPARK-34212 Parquet should read decimals correctly",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
def readParquet(schema: String, path: File): DataFrame = {
spark.read.schema(schema).parquet(path.toString)
}
Expand Down Expand Up @@ -2277,7 +2277,7 @@ index f6472ba3d9d..ce39ebb52e6 100644

- test("row group skipping doesn't overflow when reading into larger type") {
+ test("row group skipping doesn't overflow when reading into larger type",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
Expand Down Expand Up @@ -2393,7 +2393,7 @@ index 3f47c5e506f..92a5eafec84 100644
import org.apache.parquet.schema.Type._

import org.apache.spark.SparkException
+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
Expand All @@ -2413,7 +2413,7 @@ index 3f47c5e506f..92a5eafec84 100644

- test("schema mismatch failure error message for parquet vectorized reader") {
+ test("schema mismatch failure error message for parquet vectorized reader",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SparkException])
Expand All @@ -2423,7 +2423,7 @@ index 3f47c5e506f..92a5eafec84 100644

- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>") {
+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) {
import testImplicits._

withTempPath { dir =>
Expand Down Expand Up @@ -2939,7 +2939,7 @@ index b5cf13a9c12..ac17603fb7f 100644
import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreComet, Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
Expand All @@ -2949,7 +2949,7 @@ index b5cf13a9c12..ac17603fb7f 100644

- test("SPARK-41198: input row calculation with CTE") {
+ test("SPARK-41198: input row calculation with CTE",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) {
withTable("parquet_tbl", "parquet_streaming_tbl") {
spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
.write.format("parquet").saveAsTable("parquet_tbl")
Expand All @@ -2959,7 +2959,7 @@ index b5cf13a9c12..ac17603fb7f 100644

- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) {
withTable("parquet_streaming_tbl") {
val streamInput = MemoryStream[Int]
val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
Expand All @@ -2972,7 +2972,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644
import org.scalatest.concurrent.PatienceConfiguration.Timeout

-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode}
+import org.apache.spark.sql.{IgnoreComet, SaveMode}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
Expand All @@ -2982,7 +2982,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644

- test("self-union, DSv1, read via DataStreamReader API") {
+ test("self-union, DSv1, read via DataStreamReader API",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3401")) {
withTempPath { dir =>
val dataLocation = dir.getAbsolutePath
spark.range(1, 4).write.format("parquet").save(dataLocation)
Expand All @@ -2992,7 +2992,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644

- test("self-union, DSv1, read via table API") {
+ test("self-union, DSv1, read via table API",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3401")) {
withTable("parquet_streaming_tbl") {
spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")

Expand Down
45 changes: 45 additions & 0 deletions docs/source/contributor-guide/parquet_scans.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,51 @@ cause Comet to fall back to Spark.
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`

### Known Behavioral Differences with `native_datafusion`

The `native_datafusion` scan has several known behavioral differences compared to Spark. These are tracked in
GitHub issues and cause some Spark SQL tests to be ignored when running with `native_datafusion` enabled.

**Schema incompatibility handling ([#3311]).**
Spark throws a `SparkException` when reading Parquet files with certain schema mismatches (e.g., incompatible type
changes between writer and reader schemas). The `native_datafusion` implementation handles some of these cases
gracefully through DataFusion's type widening support, or throws errors with different exception types and messages.
This also affects INT96 timestamp detection for `TimestampLTZ`/`TimestampNTZ` types.

**Dynamic Partition Pruning ([#3313]).**
Dynamic Partition Pruning (DPP) does not work correctly with the `native_datafusion` scan. The `CometNativeScan`
plan node produced by `native_datafusion` may bypass or be incompatible with Spark's DPP filter injection. This
can result in test failures and potentially suboptimal query performance when DPP would otherwise apply.

**Missing files error handling ([#3314]).**
When files are deleted or become unavailable between planning and execution, the `native_datafusion` scan produces
different error behavior than Spark. The native reader throws an "Object at location ... not found" error instead of
the Spark-expected behavior (which may silently skip missing files depending on configuration). The fallback check
for `ignoreMissingFiles` may not trigger correctly in all cases.

**Plan structure differences ([#3315]).**
The `native_datafusion` scan produces different plan nodes and partitioning information than Spark expects.
`CometNativeScan` is used instead of `CometScanExec` or `FileSourceScanExec`, and `UnknownPartitioning` is reported
instead of preserving the original partitioning information. This can cause failures in tests that assert on plan
structure, partitioning, or broadcast join selection.

**Plan serialization crashes ([#3320]).**
Some query patterns can cause `CometNativeExec` to be executed without a serialized plan, resulting in a
`CometRuntimeException`. This occurs when `CometNativeScan` is included in a plan but plan serialization to the
native side did not happen, typically because the query is executed in a non-standard way that bypasses the normal
`CometSparkSessionExtensions` hooks (e.g., certain filter push-down test patterns).

**Streaming self-union ([#3401]).**
Streaming queries that perform a self-union using the DataSource V1 API return no results when using the
`native_datafusion` scan. This affects structured streaming workloads that union a stream with itself.

[#3311]: https://github.com/apache/datafusion-comet/issues/3311
[#3313]: https://github.com/apache/datafusion-comet/issues/3313
[#3314]: https://github.com/apache/datafusion-comet/issues/3314
[#3315]: https://github.com/apache/datafusion-comet/issues/3315
[#3320]: https://github.com/apache/datafusion-comet/issues/3320
[#3401]: https://github.com/apache/datafusion-comet/issues/3401

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:

Expand Down
16 changes: 9 additions & 7 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
package org.apache.comet.rules

import java.net.URI

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MetadataColumnHelper, sideBySide}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec}
Expand All @@ -40,8 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
import org.apache.comet.{CometConf, CometExplainInfo, CometNativeException, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
Expand Down Expand Up @@ -168,8 +165,13 @@ case class CometScanRule(session: SparkSession)

COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
// TODO add support for native_datafusion in the future
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf)
.orElse {
// Clear explain info tags from the failed nativeDataFusionScan
// attempt so they don't leak into the fallback path
scanExec.unsetTagValue(CometExplainInfo.EXTENSION_INFO)
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
}
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
Expand Down
Loading