Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, OutputWriterFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -151,15 +151,17 @@ private[sql] object AvroUtils extends Logging {
case DEFLATE => Some(sqlConf.getConf(SQLConf.AVRO_DEFLATE_LEVEL), codecName)
case XZ => Some(sqlConf.getConf(SQLConf.AVRO_XZ_LEVEL), codecName)
case ZSTANDARD =>
jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY,
sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED))
DataSourceUtils.setConfIfAbsent(jobConf,
AvroOutputFormat.ZSTD_BUFFERPOOL_KEY,
sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED).toString)
Some(sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_LEVEL), "zstd")
case _ => None
}
levelAndCodecName.foreach { case (level, mapredCodecName) =>
val levelKey = s"avro.mapred.$mapredCodecName.level"
DataSourceUtils.setConfIfAbsent(jobConf, levelKey, level.toString)
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} " +
log"codec at level ${MDC(CODEC_LEVEL, level)}")
jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level.toInt)
log"codec at level ${MDC(CODEC_LEVEL, jobConf.get(levelKey))}")
}
} else {
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
Expand All @@ -30,7 +31,7 @@ import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.{sources, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util.{RebaseDateTime, TypeUtils}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, RebaseDateTime, TypeUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
Expand All @@ -42,6 +43,39 @@ import org.apache.spark.util.Utils


object DataSourceUtils extends PredicateHelper {

/**
 * Merges write options into a Hadoop configuration. The reserved location keys "path" and
 * "paths" are excluded — case-insensitively, since data source option keys are treated
 * case-insensitively. This ensures per-write options are present in the conf even if they
 * were added to the options map after the Hadoop conf was originally created.
 *
 * @param options per-write options supplied by the user (possibly a [[CaseInsensitiveMap]])
 * @param conf the Hadoop configuration to merge the options into (mutated in place)
 */
def mergeWriteOptionsIntoHadoopConf(
    options: Map[String, String], conf: Configuration): Unit = {
  // CaseInsensitiveMap.iterator yields lowercased keys, but Hadoop Configuration is
  // case-sensitive. Use the original map to preserve the user's key casing.
  val rawOptions = options match {
    case ci: CaseInsensitiveMap[String @unchecked] => ci.originalMap
    case other => other
  }
  rawOptions.foreach { case (k, v) =>
    // Skip null values and the reserved location keys. The key comparison must be
    // case-insensitive: the original map preserves the user's casing, so e.g. "Path"
    // would otherwise slip past a case-sensitive check even though the options map
    // treats it as the same key as "path".
    if ((v ne null) && !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")) {
      conf.set(k, v)
    }
  }
}

/**
 * Sets `key` in the Hadoop configuration only when the conf does not already contain it.
 * Callers should merge per-write options into the conf first (via
 * [[mergeWriteOptionsIntoHadoopConf]]); an existing non-null value then indicates an
 * explicit per-write option, which must win over the session-level SQLConf default.
 *
 * @param conf the Hadoop configuration to update in place
 * @param key the configuration key to populate
 * @param value the session-level default; by-name, so it is computed only when needed
 */
def setConfIfAbsent(conf: Configuration, key: String, value: => String): Unit = {
  // Configuration.get returns null for an absent key; wrap it in Option so the
  // intent ("only set when missing") reads directly. `value` is evaluated lazily
  // and only on the absent path.
  if (Option(conf.get(key)).isEmpty) {
    conf.set(key, value)
  }
}

/**
* The key to use for storing partitionBy columns as options.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ object FileFormatWriter extends Logging {
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

// Merge write options into the job's configuration so that per-write options
// take precedence over session-level defaults set later in prepareWrite.
DataSourceUtils.mergeWriteOptionsIntoHadoopConf(options, job.getConfiguration)

val partitionSet = AttributeSet(partitionColumns)
// cleanup the internal metadata information of
// the file source metadata attribute if any before write out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED
Expand Down Expand Up @@ -507,23 +507,15 @@ object ParquetUtils extends Logging {

// Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
// schema and writes actual rows to Parquet files.
conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sqlConf.writeLegacyParquetFormat.toString)

conf.set(
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sqlConf.parquetOutputTimestampType.toString)

conf.set(
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sqlConf.parquetFieldIdWriteEnabled.toString)

conf.set(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sqlConf.legacyParquetNanosAsLong.toString)

conf.set(
DataSourceUtils.setConfIfAbsent(conf,
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString)
DataSourceUtils.setConfIfAbsent(conf,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, sqlConf.parquetOutputTimestampType.toString)
DataSourceUtils.setConfIfAbsent(conf,
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, sqlConf.parquetFieldIdWriteEnabled.toString)
DataSourceUtils.setConfIfAbsent(conf,
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, sqlConf.legacyParquetNanosAsLong.toString)
DataSourceUtils.setConfIfAbsent(conf,
SQLConf.PARQUET_ANNOTATE_VARIANT_LOGICAL_TYPE.key,
sqlConf.parquetAnnotateVariantLogicalType.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.TestUtils
import org.apache.spark.memory.MemoryMode
Expand Down Expand Up @@ -236,4 +237,27 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
}
}
}

test("per-write options take precedence over session config") {
  val conf = spark.sessionState.newHadoopConf()
  withTempPath { dir =>
    val outputPath = s"${dir.getCanonicalPath}/test.parquet"
    // The session-level config asks for INT96 timestamps, but the per-write
    // option overrides that to TIMESTAMP_MICROS; the write should honor the option.
    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
      val df = spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts")
      df.write
        .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
        .mode("overwrite")
        .parquet(outputPath)
    }

    // Check the physical type recorded in every footer: TIMESTAMP_MICROS is
    // stored as INT64, whereas the INT96 setting would use the 96-bit physical type.
    readAllFootersWithoutSummaryFiles(new Path(outputPath), conf).foreach { footer =>
      val fileSchema = footer.getParquetMetadata.getFileMetaData.getSchema
      val tsColumn = fileSchema.getFields.asScala
        .find(_.getName == "ts")
        .get
        .asPrimitiveType()
      assert(tsColumn.getPrimitiveTypeName === PrimitiveTypeName.INT64)
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,44 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}

test("SPARK-56414: per-write options take precedence over session config in streaming sink") {
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

  val inputData = MemoryStream[java.sql.Timestamp]
  val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
  val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

  // The session-level config asks for INT96 timestamps, but the per-write option
  // on the streaming sink overrides that to TIMESTAMP_MICROS.
  withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
    var query: StreamingQuery = null
    try {
      query = inputData.toDF()
        .toDF("ts")
        .writeStream
        .option("checkpointLocation", checkpointDir)
        .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
        .format("parquet")
        .start(outputDir)
      inputData.addData(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"))
      query.processAllAvailable()
    } finally {
      // Always stop the query so the suite does not leak a running stream.
      if (query != null) query.stop()
    }
  }

  // Read back and verify the timestamp column is INT64 (TIMESTAMP_MICROS), not INT96.
  val writtenFiles = new java.io.File(outputDir).listFiles()
    .filter(_.getName.endsWith(".parquet"))
  assert(writtenFiles.nonEmpty, "Expected at least one parquet file")
  val footer = ParquetFileReader.readFooter(
    spark.sessionState.newHadoopConf(), new Path(writtenFiles.head.getAbsolutePath))
  val tsColumn = footer.getFileMetaData.getSchema.getFields.asScala
    .find(_.getName == "ts").get.asPrimitiveType()
  assert(tsColumn.getPrimitiveTypeName === PrimitiveTypeName.INT64)
}

test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
val fileFormat = new ParquetFileFormat() // any valid FileFormat
val partitionColumnNames = Seq.empty[String]
Expand Down