diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 3aaecdecb1..891ac6cd63 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -453,7 +453,7 @@ index f33432ddb6f..42eb9fd1cb7 100644 test("static scan metrics", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3313")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -476,7 +476,7 @@ index a206e97c353..79813d8e259 100644 - test("explain formatted - check presence of subquery in case of DPP") { + test("explain formatted - check presence of subquery in case of DPP", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3313")) { withTable("df1", "df2") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", @@ -536,7 +536,7 @@ index 93275487f29..510e3087e0f 100644 Seq("parquet", "orc").foreach { format => - test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { + test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" @@ -609,7 +609,7 @@ index 00000000000..1ee842b6f62 + */ +case class IgnoreComet(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeIcebergCompat(reason: String) 
extends Tag("DisableComet") -+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") +case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") + +/** @@ -1142,7 +1142,7 @@ index cfc8b2cc845..b7c234e1437 100644 import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, QueryTest} -+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} @@ -1154,7 +1154,7 @@ index cfc8b2cc845..b7c234e1437 100644 - test("Fallback Parquet V2 to V1") { + test("Fallback Parquet V2 to V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) { Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { val commands = ArrayBuffer.empty[(String, LogicalPlan)] @@ -1430,7 +1430,7 @@ index a1147c16cc8..c7a29496328 100644 import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} import org.apache.spark.sql.catalyst.parser.ParseException @@ -1440,7 +1440,7 @@ index a1147c16cc8..c7a29496328 100644 - test("alter temporary view should follow current storeAnalyzedPlanForView config") { + test("alter temporary view should follow current
storeAnalyzedPlanForView config", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3314")) { withTable("t") { Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") withView("v1") { @@ -2070,7 +2070,7 @@ index 8e88049f51e..49f2001dc6b 100644 - test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") { + test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3320")) { import testImplicits._ withAllParquetReaders { @@ -2091,7 +2091,7 @@ index 8e88049f51e..49f2001dc6b 100644 - test("Filters should be pushed down for Parquet readers at row group level") { + test("Filters should be pushed down for Parquet readers at row group level", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3320")) { import testImplicits._ withSQLConf( @@ -2133,7 +2133,7 @@ index 8e88049f51e..49f2001dc6b 100644 - test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { + test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { withTempPath { dir => val count = 10 val tableName = "spark_25207" @@ -2193,7 +2193,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ 
IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2203,7 +2203,7 @@ index 8ed9ef1630e..f312174b182 100644 - test("SPARK-35640: int as long should throw schema incompatible error") { + test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { val data = (1 to 4).map(i => Tuple1(i)) val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) @@ -2227,7 +2227,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2247,7 +2247,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2277,7 +2277,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { withTempPath { path => 
Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2393,7 +2393,7 @@ index 3f47c5e506f..92a5eafec84 100644 import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2413,7 +2413,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2423,7 +2423,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3311")) { import testImplicits._ withTempPath { dir => @@ -2939,7 +2939,7 @@ index b5cf13a9c12..ac17603fb7f 100644 import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} -+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode} ++import org.apache.spark.sql.{AnalysisException,
Column, DataFrame, Dataset, IgnoreComet, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} @@ -2949,7 +2949,7 @@ index b5cf13a9c12..ac17603fb7f 100644 - test("SPARK-41198: input row calculation with CTE") { + test("SPARK-41198: input row calculation with CTE", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) { withTable("parquet_tbl", "parquet_streaming_tbl") { spark.range(0, 10).selectExpr("id AS col1", "id AS col2") .write.format("parquet").saveAsTable("parquet_tbl") @@ -2959,7 +2959,7 @@ index b5cf13a9c12..ac17603fb7f 100644 - test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { + test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3315")) { withTable("parquet_streaming_tbl") { val streamInput = MemoryStream[Int] val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") @@ -2972,7 +2972,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644 import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.apache.spark.sql.SaveMode -+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode} ++import org.apache.spark.sql.{IgnoreComet, SaveMode} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog} @@ -2982,7 +2982,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644 - test("self-union, DSv1, read via DataStreamReader API") { + 
test("self-union, DSv1, read via DataStreamReader API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3401")) { withTempPath { dir => val dataLocation = dir.getAbsolutePath spark.range(1, 4).write.format("parquet").save(dataLocation) @@ -2992,7 +2992,7 @@ index 8f099c31e6b..ce4b7ad25b3 100644 - test("self-union, DSv1, read via table API") { + test("self-union, DSv1, read via table API", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) { ++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3401")) { withTable("parquet_streaming_tbl") { spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet") diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index c8e960a15e..d1104d96cd 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -63,6 +63,51 @@ cause Comet to fall back to Spark. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` +### Known Behavioral Differences with `native_datafusion` + +The `native_datafusion` scan has several known behavioral differences compared to Spark. These are tracked in +GitHub issues and cause some Spark SQL tests to be ignored when running with `native_datafusion` enabled. + +**Schema incompatibility handling ([#3311]).** +Spark throws a `SparkException` when reading Parquet files with certain schema mismatches (e.g., incompatible type +changes between writer and reader schemas). The `native_datafusion` implementation handles some of these cases +gracefully through DataFusion's type widening support, or throws errors with different exception types and messages. 
+This also affects INT96 timestamp detection for `TimestampLTZ`/`TimestampNTZ` types. + +**Dynamic Partition Pruning ([#3313]).** +Dynamic Partition Pruning (DPP) does not work correctly with the `native_datafusion` scan. The `CometNativeScan` +plan node produced by `native_datafusion` may bypass or be incompatible with Spark's DPP filter injection. This +can result in test failures and potentially suboptimal query performance when DPP would otherwise apply. + +**Missing files error handling ([#3314]).** +When files are deleted or become unavailable between planning and execution, the `native_datafusion` scan produces +different error behavior than Spark. The native reader throws an "Object at location ... not found" error instead of +the Spark-expected behavior (which may silently skip missing files depending on configuration). The fallback check +for `ignoreMissingFiles` may not trigger correctly in all cases. + +**Plan structure differences ([#3315]).** +The `native_datafusion` scan produces different plan nodes and partitioning information than Spark expects. +`CometNativeScan` is used instead of `CometScanExec` or `FileSourceScanExec`, and `UnknownPartitioning` is reported +instead of preserving the original partitioning information. This can cause failures in tests that assert on plan +structure, partitioning, or broadcast join selection. + +**Plan serialization crashes ([#3320]).** +Some query patterns can cause `CometNativeExec` to be executed without a serialized plan, resulting in a +`CometRuntimeException`. This occurs when `CometNativeScan` is included in a plan but plan serialization to the +native side did not happen, typically because the query is executed in a non-standard way that bypasses the normal +`CometSparkSessionExtensions` hooks (e.g., certain filter push-down test patterns). 
+ +**Streaming self-union ([#3401]).** +Streaming queries that perform a self-union using the DataSource V1 API return no results when using the +`native_datafusion` scan. This affects structured streaming workloads that union a stream with itself. + +[#3311]: https://github.com/apache/datafusion-comet/issues/3311 +[#3313]: https://github.com/apache/datafusion-comet/issues/3313 +[#3314]: https://github.com/apache/datafusion-comet/issues/3314 +[#3315]: https://github.com/apache/datafusion-comet/issues/3315 +[#3320]: https://github.com/apache/datafusion-comet/issues/3320 +[#3401]: https://github.com/apache/datafusion-comet/issues/3401 + The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 698b68777a..c0651c7f3e 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -20,17 +20,15 @@ package org.apache.comet.rules import java.net.URI - import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MetadataColumnHelper, sideBySide} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import 
org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec} @@ -40,8 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ - -import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport} +import org.apache.comet.{CometConf, CometExplainInfo, CometNativeException, DataTypeSupport} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos} import org.apache.comet.DataTypeSupport.isComplexType @@ -168,8 +165,13 @@ case class CometScanRule(session: SparkSession) COMET_NATIVE_SCAN_IMPL.get() match { case SCAN_AUTO => - // TODO add support for native_datafusion in the future - nativeIcebergCompatScan(session, scanExec, r, hadoopConf) + nativeDataFusionScan(plan, session, scanExec, r, hadoopConf) + .orElse { + // Clear explain info tags from the failed nativeDataFusionScan + // attempt so they don't leak into the fallback path + scanExec.unsetTagValue(CometExplainInfo.EXTENSION_INFO) + nativeIcebergCompatScan(session, scanExec, r, hadoopConf) + } .getOrElse(scanExec) case SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)