From 73acaf2305d95eeddc03da5e94f5f570d8b10bb0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 12:18:33 -0600 Subject: [PATCH 1/4] fix: fall back from native_datafusion scan for duplicate fields in case-insensitive mode When Parquet files contain duplicate column names that only differ by case, DataFusion produces a different error than Spark in case-insensitive mode. Add a check in CometScanRule to detect duplicate fields in both the read and data schemas and fall back to Spark's reader when case-insensitive mode is enabled. Physical-file-only duplicates (not reflected in the table schema) cannot be detected at plan time, so the SPARK-25207 test is updated to accept either error message format. Closes #3311 --- dev/diffs/3.5.8.diff | 91 ++++++------------- .../source/contributor-guide/parquet_scans.md | 4 + .../apache/comet/rules/CometScanRule.scala | 16 ++++ 3 files changed, 46 insertions(+), 65 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 3aaecdecb1..4eeef0910b 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644 SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..1d2e215d6a3 100644 +index fa1a64460fc..134f0db1fb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1134,31 +1134,18 @@ index d269290e616..13726a31e07 100644 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -index cfc8b2cc845..b7c234e1437 100644 +index cfc8b2cc845..c4be7eb3731 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector - import scala.collection.mutable.ArrayBuffer - +@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf --import org.apache.spark.sql.{AnalysisException, QueryTest} -+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest} + import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { - } - } - -- test("Fallback Parquet V2 to V1") { -+ test("Fallback Parquet V2 to V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { - val commands = ArrayBuffer.empty[(String, LogicalPlan)] -@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { +@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { val df = spark.read.format(format).load(path.getCanonicalPath) checkAnswer(df, inputData.toDF()) assert( @@ -2038,7 +2025,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 8e88049f51e..49f2001dc6b 100644 +index 8e88049f51e..6150a556f9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -2127,17 +2114,24 @@ index 8e88049f51e..49f2001dc6b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared - } - } +@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + val e = intercept[SparkException] { + sql(s"select a from $tableName where b > 0").collect() + } +- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( +- """Found duplicate field(s) "B": [B, b] in case-insensitive mode""")) ++ assert(e.getCause.isInstanceOf[RuntimeException]) ++ val msg = e.getCause.getMessage ++ // native_datafusion produces a different error message for duplicate fields ++ assert( ++ msg.contains( ++ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") || ++ msg.contains("Unable to get field named"), ++ s"Unexpected error message: $msg") + } -- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { -+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withTempPath { dir => - val count = 10 - val tableName = "spark_25207" -@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { +@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2147,7 +2141,7 @@ index 8e88049f51e..49f2001dc6b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2157,7 +2151,7 @@ index 8e88049f51e..49f2001dc6b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2170,7 +2164,7 @@ index 8e88049f51e..49f2001dc6b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2930,39 +2924,6 @@ index aad91601758..201083bd621 100644 }) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -index b5cf13a9c12..ac17603fb7f 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar - - import org.apache.spark.{SparkException, TestUtils} - import org.apache.spark.internal.Logging --import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} -+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode} - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} - import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} -@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi - ) - } - -- test("SPARK-41198: input row calculation with CTE") { -+ test("SPARK-41198: input row calculation with CTE", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - withTable("parquet_tbl", "parquet_streaming_tbl") { - spark.range(0, 10).selectExpr("id AS col1", "id AS col2") - .write.format("parquet").saveAsTable("parquet_tbl") -@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi - } - } - -- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { -+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { - withTable("parquet_streaming_tbl") { - val streamInput = MemoryStream[Int] - val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala index 8f099c31e6b..ce4b7ad25b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index c8e960a15e..2a10bb111d 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -62,6 +62,10 @@ cause Comet to fall back to Spark. - No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions. The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values. - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true` +- No support for duplicate field names in case-insensitive mode. When the required or data schema contains + field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates + in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time, + so DataFusion may produce a different error message than Spark in that case. The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results without falling back to Spark: diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 698b68777a..c004d77283 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -222,6 +222,22 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") return None } + // Case-insensitive mode with duplicate field names produces different errors + // in DataFusion vs Spark, so fall back to avoid incompatible error messages + if (!session.sessionState.conf.caseSensitiveAnalysis) { + val schemas = Seq(scanExec.requiredSchema, r.dataSchema) + for (schema <- schemas) { + val fieldNames = + schema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT)) + if (fieldNames.length != fieldNames.distinct.length) { + withInfo( + scanExec, + "Native DataFusion scan does not support " + + "duplicate field names in case-insensitive mode") + return None + } + } + } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } From 4f6beaf3968c97a294bf5387a9d3a641437e18c1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 13:11:31 -0600 Subject: [PATCH 2/4] fix: revert FileDataSourceV2FallBackSuite changes from Spark diff --- dev/diffs/3.5.8.diff | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 4eeef0910b..a4aa1f600e 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1133,31 +1133,6 @@ index d269290e616..13726a31e07 100644 }.isDefined === sortExpected) } } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -index cfc8b2cc845..c4be7eb3731 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer - import org.apache.spark.SparkConf - import org.apache.spark.sql.{AnalysisException, QueryTest} - import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} - import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} - import org.apache.spark.sql.connector.read.ScanBuilder - import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { - val df = spark.read.format(format).load(path.getCanonicalPath) - checkAnswer(df, inputData.toDF()) - assert( -- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec])) -+ df.queryExecution.executedPlan.exists { -+ case _: FileSourceScanExec | _: CometScanExec | _: CometNativeScanExec => true -+ case _ => false -+ } -+ ) - } - } finally { - spark.listenerManager.unregister(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 71e030f535e..d5ae6cbf3d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala From c3caf3718d5be32507ee06d0e0e3a6f09e8f33e6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 13:19:23 -0600 Subject: [PATCH 3/4] Revert "fix: revert FileDataSourceV2FallBackSuite changes from Spark diff" This reverts commit 4f6beaf3968c97a294bf5387a9d3a641437e18c1. --- dev/diffs/3.5.8.diff | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index a4aa1f600e..4eeef0910b 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1133,6 +1133,31 @@ index d269290e616..13726a31e07 100644 }.isDefined === sortExpected) } } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +index cfc8b2cc845..c4be7eb3731 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer + import org.apache.spark.SparkConf + import org.apache.spark.sql.{AnalysisException, QueryTest} + import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} + import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} + import org.apache.spark.sql.connector.read.ScanBuilder + import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { + val df = spark.read.format(format).load(path.getCanonicalPath) + checkAnswer(df, inputData.toDF()) + assert( +- df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec])) ++ df.queryExecution.executedPlan.exists { ++ case _: FileSourceScanExec | _: CometScanExec | _: CometNativeScanExec => true ++ case _ => false ++ } ++ ) + } + } finally { + spark.listenerManager.unregister(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 71e030f535e..d5ae6cbf3d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala From e1c8b9e3d6a1019298895f298e1f2d4f33898256 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 14 Mar 2026 13:23:04 -0600 Subject: [PATCH 4/4] fix diff --- dev/diffs/3.5.8.diff | 56 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 4eeef0910b..60d24f59df 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644 SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index fa1a64460fc..134f0db1fb8 100644 +index fa1a64460fc..1d2e215d6a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1134,18 +1134,31 @@ index d269290e616..13726a31e07 100644 } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -index cfc8b2cc845..c4be7eb3731 100644 +index cfc8b2cc845..b7c234e1437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala -@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer +@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector + import scala.collection.mutable.ArrayBuffer + import org.apache.spark.SparkConf - import org.apache.spark.sql.{AnalysisException, QueryTest} +-import org.apache.spark.sql.{AnalysisException, QueryTest} ++import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { +@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { + } + } + +- test("Fallback Parquet V2 to V1") { ++ test("Fallback Parquet V2 to V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { + val commands = ArrayBuffer.empty[(String, LogicalPlan)] +@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { val df = spark.read.format(format).load(path.getCanonicalPath) checkAnswer(df, inputData.toDF()) assert( @@ -2924,6 +2937,39 @@ index aad91601758..201083bd621 100644 }) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +index b5cf13a9c12..ac17603fb7f 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar + + import org.apache.spark.{SparkException, TestUtils} + import org.apache.spark.internal.Logging +-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} ++import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode} + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} + import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation} +@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi + ) + } + +- test("SPARK-41198: input row calculation with CTE") { ++ test("SPARK-41198: input row calculation with CTE", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + withTable("parquet_tbl", "parquet_streaming_tbl") { + spark.range(0, 10).selectExpr("id AS col1", "id AS col2") + .write.format("parquet").saveAsTable("parquet_tbl") +@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi + } + } + +- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { ++ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) { + withTable("parquet_streaming_tbl") { + val streamInput = MemoryStream[Int] + val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala index 8f099c31e6b..ce4b7ad25b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala