diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index f9e15322c031b..02220e6c85688 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.avro.AvroCompressionCodec._ import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.execution.datasources.OutputWriterFactory +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriterFactory} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -151,15 +151,17 @@ private[sql] object AvroUtils extends Logging { case DEFLATE => Some(sqlConf.getConf(SQLConf.AVRO_DEFLATE_LEVEL), codecName) case XZ => Some(sqlConf.getConf(SQLConf.AVRO_XZ_LEVEL), codecName) case ZSTANDARD => - jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, - sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)) + DataSourceUtils.setConfIfAbsent(jobConf, + AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, + sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED).toString) Some(sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_LEVEL), "zstd") case _ => None } levelAndCodecName.foreach { case (level, mapredCodecName) => + val levelKey = s"avro.mapred.$mapredCodecName.level" + DataSourceUtils.setConfIfAbsent(jobConf, levelKey, level.toString) logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} " + - log"codec at level ${MDC(CODEC_LEVEL, level)}") - jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level.toInt) + log"codec at level ${MDC(CODEC_LEVEL, jobConf.get(levelKey))}") } } else { logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} 
codec")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index d43c0355017f6..fe7e8f3a89c0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
@@ -30,7 +31,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, RebaseDateTime, TypeUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -42,6 +43,50 @@ import org.apache.spark.util.Utils
 
 
 object DataSourceUtils extends PredicateHelper {
+
+  /**
+   * Merges write options into a Hadoop configuration. The keys "path" and "paths" are
+   * excluded, compared case-insensitively. This ensures per-write options are present in
+   * the conf even if they were added to the options map after the Hadoop conf was
+   * originally created.
+   *
+   * @param options the per-write options; entries with a null value are skipped
+   * @param conf the Hadoop configuration that is updated in place
+   */
+  def mergeWriteOptionsIntoHadoopConf(
+      options: Map[String, String], conf: Configuration): Unit = {
+    // CaseInsensitiveMap.iterator yields lowercased keys, but Hadoop Configuration is
+    // case-sensitive. Use the original map to preserve the user's key casing.
+    val rawOptions = options match {
+      case ci: CaseInsensitiveMap[String @unchecked] => ci.originalMap
+      case other => other
+    }
+    rawOptions.foreach { case (k, v) =>
+      // The original map keeps the user's casing, so "Path" or "PATHS" must be excluded
+      // just like their lowercase forms. `equalsIgnoreCase` is null-safe on its argument.
+      if ((v ne null) && !"path".equalsIgnoreCase(k) && !"paths".equalsIgnoreCase(k)) {
+        conf.set(k, v)
+      }
+    }
+  }
+
+  /**
+   * Sets a key in the Hadoop configuration only if it is not already present. Per-write
+   * options should be merged into the conf first (via [[mergeWriteOptionsIntoHadoopConf]]),
+   * so a non-null value means the user explicitly set a per-write option which should take
+   * precedence over the session-level SQLConf default.
+   *
+   * @param conf the Hadoop configuration that is updated in place
+   * @param key the configuration key to set
+   * @param value the fallback value; passed by name, so it is evaluated only when the key
+   *              is absent
+   */
+  def setConfIfAbsent(conf: Configuration, key: String, value: => String): Unit = {
+    if (conf.get(key) == null) {
+      conf.set(key, value)
+    }
+  }
+
   /**
    * The key to use for storing partitionBy columns as options.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 55e2271dc058b..59c103577e136 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -102,6 +102,10 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
+    // Merge write options into the job's configuration so that per-write options
+    // take precedence over session-level defaults set later in prepareWrite.
+ DataSourceUtils.mergeWriteOptionsIntoHadoopConf(options, job.getConfiguration) + val partitionSet = AttributeSet(partitionColumns) // cleanup the internal metadata information of // the file source metadata attribute if any before write out diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 1f11a67b08fff..b4e0fe5fddb81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} -import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED @@ -507,23 +507,15 @@ object ParquetUtils extends Logging { // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet // schema and writes actual rows to Parquet files. 
- conf.set( - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - sqlConf.writeLegacyParquetFormat.toString) - - conf.set( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, - sqlConf.parquetOutputTimestampType.toString) - - conf.set( - SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, - sqlConf.parquetFieldIdWriteEnabled.toString) - - conf.set( - SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, - sqlConf.legacyParquetNanosAsLong.toString) - - conf.set( + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, sqlConf.parquetOutputTimestampType.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, sqlConf.parquetFieldIdWriteEnabled.toString) + DataSourceUtils.setConfIfAbsent(conf, + SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, sqlConf.legacyParquetNanosAsLong.toString) + DataSourceUtils.setConfIfAbsent(conf, SQLConf.PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE.key, sqlConf.parquetAnnotateVariantLogicalType.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index cd6f41b4ef45e..a2391a3ec22f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.column.{Encoding, ParquetProperties} import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.TestUtils import org.apache.spark.memory.MemoryMode @@ -236,4 +237,27 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with 
SharedSparkSess
       }
     }
   }
+
+  test("SPARK-56414: per-write options take precedence over session config") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/test.parquet"
+      // Session sets INT96, but the per-write option overrides to TIMESTAMP_MICROS.
+      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+        spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts")
+          .write
+          .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
+          .mode("overwrite")
+          .parquet(path)
+      }
+
+      for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) {
+        val schema = footer.getParquetMetadata.getFileMetaData.getSchema
+        val tsField = schema.getFields.asScala.find(_.getName == "ts").get
+          .asPrimitiveType()
+        // TIMESTAMP_MICROS is stored as INT64, not INT96
+        assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 4c06a0109e347..87ec48c3743d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -697,6 +697,48 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("SPARK-56414: per-write options take precedence over session config in streaming sink") {
+    val inputData = MemoryStream[java.sql.Timestamp]
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    // Session sets INT96, but the per-write option overrides to TIMESTAMP_MICROS.
+    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+      var query: StreamingQuery = null
+      try {
+        query = inputData.toDF()
+          .toDF("ts")
+          .writeStream
+          .option("checkpointLocation", checkpointDir)
+          .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
+          .format("parquet")
+          .start(outputDir)
+        inputData.addData(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"))
+        query.processAllAvailable()
+      } finally {
+        if (query != null) query.stop()
+      }
+    }
+
+    // Read back and verify the timestamp column is INT64 (TIMESTAMP_MICROS), not INT96.
+    import org.apache.parquet.hadoop.ParquetFileReader
+    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    // listFiles() returns null, not an empty array, when the directory cannot be listed.
+    val parquetFiles = Option(new java.io.File(outputDir).listFiles())
+      .getOrElse(Array.empty[java.io.File])
+      .filter(_.getName.endsWith(".parquet"))
+    assert(parquetFiles.nonEmpty, "Expected at least one parquet file")
+    // Check every data file that was written, not just the first one.
+    parquetFiles.foreach { file =>
+      val footer = ParquetFileReader.readFooter(hadoopConf, new Path(file.getAbsolutePath))
+      val tsField = footer.getFileMetaData.getSchema.getFields.asScala
+        .find(_.getName == "ts").get.asPrimitiveType()
+      assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64)
+    }
+  }
+
   test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
     val fileFormat = new ParquetFileFormat() // any valid FileFormat
     val partitionColumnNames = Seq.empty[String]