From 6538d47057ce0293401b2a05e2b028f878004d1f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 9 Apr 2026 13:50:01 +0000 Subject: [PATCH 1/4] [SPARK-56414][SQL] Per-write options should take precedence over session config in Parquet and Avro Co-authored-by: Isaac --- .../org/apache/spark/sql/avro/AvroUtils.scala | 12 +++--- .../datasources/DataSourceUtils.scala | 14 +++++++ .../datasources/parquet/ParquetUtils.scala | 31 ++++++-------- .../parquet/ParquetEncodingSuite.scala | 42 +++++++++++++++++++ 4 files changed, 76 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index f9e15322c031b..02220e6c85688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.avro.AvroCompressionCodec._ import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.execution.datasources.OutputWriterFactory +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriterFactory} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -151,15 +151,17 @@ private[sql] object AvroUtils extends Logging { case DEFLATE => Some(sqlConf.getConf(SQLConf.AVRO_DEFLATE_LEVEL), codecName) case XZ => Some(sqlConf.getConf(SQLConf.AVRO_XZ_LEVEL), codecName) case ZSTANDARD => - jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, - sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)) + DataSourceUtils.setConfIfAbsent(jobConf, + AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, + sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED).toString) Some(sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_LEVEL), 
"zstd") case _ => None } levelAndCodecName.foreach { case (level, mapredCodecName) => + val levelKey = s"avro.mapred.$mapredCodecName.level" + DataSourceUtils.setConfIfAbsent(jobConf, levelKey, level.toString) logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} " + - log"codec at level ${MDC(CODEC_LEVEL, level)}") - jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level.toInt) + log"codec at level ${MDC(CODEC_LEVEL, jobConf.get(levelKey))}") } } else { logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index d43c0355017f6..fb488bc779ab3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -22,6 +22,7 @@ import java.util.Locale import scala.jdk.CollectionConverters._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization @@ -42,6 +43,19 @@ import org.apache.spark.util.Utils object DataSourceUtils extends PredicateHelper { + + /** + * Sets a key in the Hadoop configuration only if it is not already present. Write options + * are merged into the conf upstream via `newHadoopConfWithOptions`, so a non-null value + * means the user explicitly set a per-write option which should take precedence over the + * session-level SQLConf default. + */ + def setConfIfAbsent(conf: Configuration, key: String, value: => String): Unit = { + if (conf.get(key) == null) { + conf.set(key, value) + } + } + /** * The key to use for storing partitionBy columns as options. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 1f11a67b08fff..93603ee9503a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} -import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED @@ -507,23 +507,18 @@ object ParquetUtils extends Logging { // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet // schema and writes actual rows to Parquet files. - conf.set( - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - sqlConf.writeLegacyParquetFormat.toString) - - conf.set( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, - sqlConf.parquetOutputTimestampType.toString) - - conf.set( - SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, - sqlConf.parquetFieldIdWriteEnabled.toString) - - conf.set( - SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, - sqlConf.legacyParquetNanosAsLong.toString) - - conf.set( + // For each key, only set from SQLConf if not already present in the conf. 
Write options + // are already merged into the conf upstream via `newHadoopConfWithOptions`, so a non-null + // value means the user explicitly set a per-write option which should take precedence. + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, sqlConf.parquetOutputTimestampType.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, sqlConf.parquetFieldIdWriteEnabled.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, sqlConf.legacyParquetNanosAsLong.toString) + DataSourceUtils.setConfIfAbsent(conf, SQLConf.PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE.key, sqlConf.parquetAnnotateVariantLogicalType.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index cd6f41b4ef45e..c434411023799 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.column.{Encoding, ParquetProperties} import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.TestUtils import org.apache.spark.memory.MemoryMode @@ -236,4 +237,45 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess } } } + + test("per-write options take precedence over session config") { + val hadoopConf = spark.sessionState.newHadoopConf() + // Test outputTimestampType: session sets INT96, write option overrides to 
TIMESTAMP_MICROS. + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts") + .write + .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS") + .mode("overwrite") + .parquet(path) + } + + for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) { + val schema = footer.getParquetMetadata.getFileMetaData.getSchema + val tsField = schema.getFields.asScala.find(_.getName == "ts").get + .asPrimitiveType() + // TIMESTAMP_MICROS is stored as INT64, not INT96 + assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64) + } + } + + // Test writerVersion: write option sets PARQUET_2_0 which uses delta encoding. + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + spark.range(1, 100).toDF("id") + .write + .option(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_2_0.toString) + .mode("overwrite") + .parquet(path) + + for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) { + for (blockMetadata <- footer.getParquetMetadata.getBlocks.asScala) { + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + assert(columnChunkMetadata.getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + } + } + } + } } From 4ef5b366c63c108ff3894699ef9d37d2af42c0d0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 9 Apr 2026 13:54:15 +0000 Subject: [PATCH 2/4] Remove redundant comment in ParquetUtils Co-authored-by: Isaac --- .../spark/sql/execution/datasources/parquet/ParquetUtils.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 93603ee9503a4..b4e0fe5fddb81 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -507,9 +507,6 @@ object ParquetUtils extends Logging { // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet // schema and writes actual rows to Parquet files. - // For each key, only set from SQLConf if not already present in the conf. Write options - // are already merged into the conf upstream via `newHadoopConfWithOptions`, so a non-null - // value means the user explicitly set a per-write option which should take precedence. DataSourceUtils.setConfIfAbsent(conf, SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString) DataSourceUtils.setConfIfAbsent(conf, From 11f9cf991138ffe309890c05e1e322a7aa0ac43e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 9 Apr 2026 15:15:38 +0000 Subject: [PATCH 3/4] Remove writerVersion test case that doesn't test the fix Co-authored-by: Isaac --- .../parquet/ParquetEncodingSuite.scala | 20 +------------------ 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index c434411023799..a2391a3ec22f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -240,9 +240,9 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess test("per-write options take precedence over session config") { val hadoopConf = spark.sessionState.newHadoopConf() - // Test outputTimestampType: session sets INT96, write option overrides to TIMESTAMP_MICROS. 
withTempPath { dir => val path = s"${dir.getCanonicalPath}/test.parquet" + // Session sets INT96, but the per-write option overrides to TIMESTAMP_MICROS. withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts") .write @@ -259,23 +259,5 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64) } } - - // Test writerVersion: write option sets PARQUET_2_0 which uses delta encoding. - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/test.parquet" - spark.range(1, 100).toDF("id") - .write - .option(ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_2_0.toString) - .mode("overwrite") - .parquet(path) - - for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) { - for (blockMetadata <- footer.getParquetMetadata.getBlocks.asScala) { - val columnChunkMetadata = blockMetadata.getColumns.asScala.head - assert(columnChunkMetadata.getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) - } - } - } } } From d00d6210b53e0c004472e54841a8ce23cee32e57 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 9 Apr 2026 17:01:46 +0000 Subject: [PATCH 4/4] Merge write options into Job conf in FileFormatWriter and add streaming test Co-authored-by: Isaac --- .../datasources/DataSourceUtils.scala | 30 ++++++++++++--- .../datasources/FileFormatWriter.scala | 4 ++ .../sql/streaming/FileStreamSinkSuite.scala | 38 +++++++++++++++++++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index fb488bc779ab3..fe7e8f3a89c0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException} import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper} -import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, RebaseDateTime, TypeUtils} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions @@ -45,10 +45,30 @@ import org.apache.spark.util.Utils object DataSourceUtils extends PredicateHelper { /** - * Sets a key in the Hadoop configuration only if it is not already present. Write options - * are merged into the conf upstream via `newHadoopConfWithOptions`, so a non-null value - * means the user explicitly set a per-write option which should take precedence over the - * session-level SQLConf default. + * Merges write options into a Hadoop configuration. Keys "path" and "paths" are excluded. + * This ensures per-write options are present in the conf even if they were added to the + * options map after the Hadoop conf was originally created. + */ + def mergeWriteOptionsIntoHadoopConf( + options: Map[String, String], conf: Configuration): Unit = { + // CaseInsensitiveMap.iterator yields lowercased keys, but Hadoop Configuration is + // case-sensitive. Use the original map to preserve the user's key casing. 
+ val rawOptions = options match { + case ci: CaseInsensitiveMap[String @unchecked] => ci.originalMap + case other => other + } + rawOptions.foreach { case (k, v) => + if ((v ne null) && k != "path" && k != "paths") { + conf.set(k, v) + } + } + } + + /** + * Sets a key in the Hadoop configuration only if it is not already present. Per-write + * options should be merged into the conf first (via [[mergeWriteOptionsIntoHadoopConf]]), + * so a non-null value means the user explicitly set a per-write option which should take + * precedence over the session-level SQLConf default. */ def setConfIfAbsent(conf: Configuration, key: String, value: => String): Unit = { if (conf.get(key) == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 55e2271dc058b..59c103577e136 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -102,6 +102,10 @@ object FileFormatWriter extends Logging { job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)) + // Merge write options into the job's configuration so that per-write options + // take precedence over session-level defaults set later in prepareWrite. 
+ DataSourceUtils.mergeWriteOptionsIntoHadoopConf(options, job.getConfiguration) + val partitionSet = AttributeSet(partitionColumns) // cleanup the internal metadata information of // the file source metadata attribute if any before write out diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 4c06a0109e347..87ec48c3743d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -697,6 +697,44 @@ abstract class FileStreamSinkSuite extends StreamTest { } } + test("SPARK-56414: per-write options take precedence over session config in streaming sink") { + val inputData = MemoryStream[java.sql.Timestamp] + + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath + + // Session sets INT96, but the per-write option overrides to TIMESTAMP_MICROS. + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + var query: StreamingQuery = null + try { + query = inputData.toDF() + .toDF("ts") + .writeStream + .option("checkpointLocation", checkpointDir) + .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS") + .format("parquet") + .start(outputDir) + inputData.addData(java.sql.Timestamp.valueOf("2024-01-01 12:00:00")) + query.processAllAvailable() + } finally { + if (query != null) query.stop() + } + } + + // Read back and verify the timestamp column is INT64 (TIMESTAMP_MICROS), not INT96. 
+ import org.apache.parquet.hadoop.ParquetFileReader + import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName + val hadoopConf = spark.sessionState.newHadoopConf() + val parquetFiles = new java.io.File(outputDir).listFiles() + .filter(_.getName.endsWith(".parquet")) + assert(parquetFiles.nonEmpty, "Expected at least one parquet file") + val footer = ParquetFileReader.readFooter(hadoopConf, + new Path(parquetFiles.head.getAbsolutePath)) + val tsField = footer.getFileMetaData.getSchema.getFields.asScala + .find(_.getName == "ts").get.asPrimitiveType() + assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64) + } + test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") { val fileFormat = new ParquetFileFormat() // any valid FileFormat val partitionColumnNames = Seq.empty[String]