From d55d399a1d9b7c5c3767cd6fc9979dc62b344e07 Mon Sep 17 00:00:00 2001 From: Divjot Arora Date: Thu, 9 Apr 2026 11:52:12 +0000 Subject: [PATCH] [SPARK-XXXXX][SQL] Add writer version and output timestamp type write option plumbing Add WRITER_VERSION and OUTPUT_TIMESTAMP_TYPE options to ParquetOptions so callers (e.g., Delta Lake) can control the Parquet writer version and timestamp encoding through the write options map. Wire both through ParquetUtils.prepareWrite() to the Hadoop Configuration. Co-authored-by: Isaac --- .../datasources/parquet/ParquetOptions.scala | 13 ++++++ .../datasources/parquet/ParquetUtils.scala | 8 ++-- .../parquet/ParquetEncodingSuite.scala | 42 +++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 36658156cb8b..79009f1f38a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -62,6 +62,17 @@ class ParquetOptions( shortParquetCompressionCodecNames(codecName).name() } + /** + * Parquet writer version to use. If this returns None, the default format should be used. + */ + val writerVersion: Option[String] = parameters.get(WRITER_VERSION) + + /** + * The Parquet physical type to use for timestamp columns. If None, the default type from the + * SQL conf should be used. + */ + val outputTimestampType: Option[String] = parameters.get(OUTPUT_TIMESTAMP_TYPE) + /** * Whether it merges schemas or not. When the given Parquet files have different schemas, * the schemas can be merged. By default use the value specified in SQLConf. @@ -108,6 +119,8 @@ object ParquetOptions extends DataSourceOptions { val MERGE_SCHEMA = newOption("mergeSchema") val PARQUET_COMPRESSION = newOption(ParquetOutputFormat.COMPRESSION) val COMPRESSION = newOption("compression") + val WRITER_VERSION = newOption(ParquetOutputFormat.WRITER_VERSION) + val OUTPUT_TIMESTAMP_TYPE = newOption(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) // The option controls rebasing of the DATE and TIMESTAMP values between // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 1f11a67b08ff..9360dc444ffd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -511,9 +511,9 @@ object ParquetUtils extends Logging { SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, sqlConf.writeLegacyParquetFormat.toString) - conf.set( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, - sqlConf.parquetOutputTimestampType.toString) + val outputTimestampType = parquetOptions.outputTimestampType + .getOrElse(sqlConf.parquetOutputTimestampType.toString) + conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType) conf.set( SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, @@ -530,6 +530,8 @@ object ParquetUtils extends Logging { // Sets compression scheme conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) + parquetOptions.writerVersion.foreach(conf.set(ParquetOptions.WRITER_VERSION, _)) + // SPARK-15719: Disables writing Parquet summary files by default. if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index cd6f41b4ef45..a50b52f5311a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.column.{Encoding, ParquetProperties} import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.TestUtils import org.apache.spark.memory.MemoryMode @@ -236,4 +237,45 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess } } } + + test("writer version write option produces v2 data pages") { + val hadoopConf = spark.sessionState.newHadoopConf() + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + spark.range(1, 100).toDF("id") + .write + .option(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_2_0.toString) + .mode("overwrite") + .parquet(path) + + for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) { + for (blockMetadata <- footer.getParquetMetadata.getBlocks.asScala) { + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + assert(columnChunkMetadata.getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + } + } + } + } + + test("output timestamp type write option overrides session default") { + val hadoopConf = spark.sessionState.newHadoopConf() + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts") + .write + .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS") + .mode("overwrite") + .parquet(path) + } + + for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) { + val schema = footer.getParquetMetadata.getFileMetaData.getSchema + val tsField = schema.getFields.asScala.find(_.getName == "ts").get + .asPrimitiveType() + assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64) + } + } + } }