diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 36658156cb8b..79009f1f38a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -62,6 +62,19 @@ class ParquetOptions(
     shortParquetCompressionCodecNames(codecName).name()
   }
 
+  /**
+   * Parquet writer version to use. If this returns None, the default format should be used.
+   */
+  val writerVersion: Option[String] = parameters.get(WRITER_VERSION)
+
+  /**
+   * The Parquet physical type to use for timestamp columns. If None, the default type from the
+   * SQL conf should be used. The value is upper-cased so that the write option is
+   * case-insensitive, like the `spark.sql.parquet.outputTimestampType` SQL conf.
+   */
+  val outputTimestampType: Option[String] =
+    parameters.get(OUTPUT_TIMESTAMP_TYPE).map(_.toUpperCase(java.util.Locale.ROOT))
+
   /**
    * Whether it merges schemas or not. When the given Parquet files have different schemas,
    * the schemas can be merged. By default use the value specified in SQLConf.
@@ -108,6 +121,8 @@ object ParquetOptions extends DataSourceOptions {
   val MERGE_SCHEMA = newOption("mergeSchema")
   val PARQUET_COMPRESSION = newOption(ParquetOutputFormat.COMPRESSION)
   val COMPRESSION = newOption("compression")
+  val WRITER_VERSION = newOption(ParquetOutputFormat.WRITER_VERSION)
+  val OUTPUT_TIMESTAMP_TYPE = newOption(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
 
   // The option controls rebasing of the DATE and TIMESTAMP values between
   // Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 1f11a67b08ff..9360dc444ffd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -511,9 +511,9 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
       sqlConf.writeLegacyParquetFormat.toString)
 
-    conf.set(
-      SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
-      sqlConf.parquetOutputTimestampType.toString)
+    val outputTimestampType = parquetOptions.outputTimestampType
+      .getOrElse(sqlConf.parquetOutputTimestampType.toString)
+    conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType)
 
     conf.set(
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
@@ -530,6 +530,8 @@ object ParquetUtils extends Logging {
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
+    parquetOptions.writerVersion.foreach(conf.set(ParquetOptions.WRITER_VERSION, _))
+
     // SPARK-15719: Disables writing Parquet summary files by default.
     if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null
       && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index cd6f41b4ef45..a50b52f5311a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.TestUtils
 import org.apache.spark.memory.MemoryMode
@@ -236,4 +237,45 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       }
     }
   }
+
+  test("writer version write option produces v2 data pages") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/test.parquet"
+      spark.range(1, 100).toDF("id")
+        .write
+        .option(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+        .mode("overwrite")
+        .parquet(path)
+
+      for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) {
+        for (blockMetadata <- footer.getParquetMetadata.getBlocks.asScala) {
+          val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+          assert(columnChunkMetadata.getEncodings.contains(Encoding.DELTA_BINARY_PACKED))
+        }
+      }
+    }
+  }
+
+  test("output timestamp type write option overrides session default") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/test.parquet"
+      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+        spark.sql("SELECT TIMESTAMP '2024-01-01 12:00:00' AS ts")
+          .write
+          .option(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MICROS")
+          .mode("overwrite")
+          .parquet(path)
+      }
+
+      for (footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)) {
+        val schema = footer.getParquetMetadata.getFileMetaData.getSchema
+        val tsField = schema.getFields.asScala.find(_.getName == "ts").get
+          .asPrimitiveType()
+        assert(tsField.getPrimitiveTypeName === PrimitiveTypeName.INT64)
+      }
+    }
+  }
 }