diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 71c117ab36ad..4daa581b589c 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -135,6 +135,10 @@ private TableProperties() {} "write.delete.parquet.page-size-bytes"; public static final int PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1 MB + public static final String PARQUET_FORMAT_VERSION = "write.parquet.format-version"; + public static final String DELETE_PARQUET_FORMAT_VERSION = "write.delete.parquet.format-version"; + public static final String PARQUET_FORMAT_VERSION_DEFAULT = "v1"; + public static final String PARQUET_PAGE_VERSION = "write.parquet.page-version"; public static final String DELETE_PARQUET_PAGE_VERSION = "write.delete.parquet.page-version"; public static final String PARQUET_PAGE_VERSION_DEFAULT = "v1"; diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index c9458ca4b1c6..d80b568890d1 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -46,7 +46,10 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size | | write.parquet.row-group-size-track-uncompressed | false | Track raw data size to enforce the row group size target accurately with compressing codecs | | write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size | -| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) | +| write.parquet.format-version | v1 | Parquet writer format version: v1 (PARQUET_1_0) or v2 (PARQUET_2_0) | +| write.delete.parquet.format-version | data file format version | Parquet writer format version for delete files: v1 or v2 | +| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) | +| write.delete.parquet.page-version | data file page version | Parquet data page version for delete files: v1 (DataPage V1) or v2 (DataPage V2) | | write.parquet.page-row-limit | 20000 | Parquet page row limit | | write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size | | write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed | diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md index 2f70cbf576d8..de466eb19cc0 100644 --- a/docs/docs/flink-configuration.md +++ b/docs/docs/flink-configuration.md @@ -160,6 +160,7 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */ | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | write-parallelism | Upstream operator parallelism | Overrides the writer parallelism | | uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table | +| parquet-format-version | Table write.parquet.format-version | Overrides this table's Parquet writer format version for this write (v1 or v2) | | shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write | | variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size for this write | diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md index 8d9dd6dd2756..27287958bc7e 100644 --- a/docs/docs/spark-configuration.md +++ b/docs/docs/spark-configuration.md @@ -196,6 +196,7 @@ val spark = SparkSession.builder() | spark.sql.iceberg.compression-codec | Table default | Write compression codec (e.g., `zstd`, `snappy`) | | spark.sql.iceberg.compression-level | Table default | Compression level for Parquet/Avro | | spark.sql.iceberg.compression-strategy | Table default | Compression strategy for ORC | +| spark.sql.iceberg.parquet-format-version | Table default | Parquet writer format version for writes (v1 or v2) | | spark.sql.iceberg.data-planning-mode | AUTO | Scan planning mode for data files (`AUTO`, `LOCAL`, `DISTRIBUTED`) | | spark.sql.iceberg.delete-planning-mode | AUTO | Scan planning mode for delete files (`AUTO`, `LOCAL`, `DISTRIBUTED`) | | spark.sql.iceberg.advisory-partition-size | Table default | Advisory size (bytes) used for writing to the Table when Spark's Adaptive Query Execution is enabled. Used to size output files | @@ -266,6 +267,7 @@ df.writeTo("catalog.db.table") | compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write | | distribution-mode | See [Spark Writes](spark-writes.md#writing-distribution-modes) for defaults | Override this table's distribution mode for this write | | delete-granularity | file | Override this table's delete granularity for this write | +| parquet-format-version | Table write.parquet.format-version | Overrides this table's Parquet writer format version for this write (v1 or v2) | | shred-variants | false | Overrides this table's write.parquet.shred-variants for this write | | variant-inference-buffer-size | 100 | Overrides this table's write.parquet.variant-inference-buffer-size for this write | diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 990d23f2aaff..8b432f27acda 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -262,4 +262,13 @@ public Duration tableRefreshInterval() { .flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL) .parseOptional(); } + + public String parquetFormatVersion() { + return confParser + .stringConf() + .option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key()) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index ee2aeaa45007..dc96eeb43623 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -105,4 +105,7 @@ private FlinkWriteOptions() {} // specify the uidSuffix to be used for the underlying IcebergSink public static final ConfigOption UID_SUFFIX = ConfigOptions.key("uid-suffix").stringType().defaultValue(""); + + public static final ConfigOption PARQUET_FORMAT_VERSION = + ConfigOptions.key("parquet-format-version").stringType().noDefaultValue(); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index b3a9ac6ba2eb..76044d7619a7 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import java.util.List; import java.util.Map; @@ -128,6 +129,8 @@ public static Map writeProperties( writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } + writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion()); + break; case AVRO: writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 990d23f2aaff..8b432f27acda 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -262,4 +262,13 @@ public Duration tableRefreshInterval() { .flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL) .parseOptional(); } + + public String parquetFormatVersion() { + return confParser + .stringConf() + .option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key()) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index ee2aeaa45007..dc96eeb43623 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -105,4 +105,7 @@ private FlinkWriteOptions() {} // specify the uidSuffix to be used for the underlying IcebergSink public static final ConfigOption UID_SUFFIX = ConfigOptions.key("uid-suffix").stringType().defaultValue(""); + + public static final ConfigOption PARQUET_FORMAT_VERSION = + ConfigOptions.key("parquet-format-version").stringType().noDefaultValue(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index b3a9ac6ba2eb..76044d7619a7 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import java.util.List; import java.util.Map; @@ -128,6 +129,8 @@ public static Map writeProperties( writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } + writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion()); + break; case AVRO: writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index fd3fccb224a2..6c4e595714bb 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -280,4 +280,13 @@ public int parquetVariantInferenceBufferSize() { .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) .parse(); } + + public String parquetFormatVersion() { + return confParser + .stringConf() + .option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key()) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index 1fdd6df8d753..777d24378a55 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -111,4 +111,7 @@ private FlinkWriteOptions() {} public static final ConfigOption VARIANT_INFERENCE_BUFFER_SIZE = ConfigOptions.key("variant-inference-buffer-size").intType().noDefaultValue(); + + public static final ConfigOption PARQUET_FORMAT_VERSION = + ConfigOptions.key("parquet-format-version").stringType().noDefaultValue(); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java index d4c3d3beb80f..530862b91653 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE; @@ -133,6 +134,7 @@ public static Map writeProperties( writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.parquetShredVariants())); writeProperties.put( PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.parquetVariantInferenceBufferSize())); + writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion()); break; case AVRO: diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index bcaf394c4e8b..fd654be3a885 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_VERSION; @@ -39,12 +40,13 @@ import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_PAGE_VERSION; -import static org.apache.iceberg.TableProperties.PARQUET_PAGE_VERSION_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; @@ -274,10 +276,14 @@ public WriteBuilder writerVersion(WriterVersion version) { version == WriterVersion.PARQUET_1_0 || version == WriterVersion.PARQUET_2_0, "Unsupported writer version: %s", version); - config.put(PARQUET_PAGE_VERSION, version.name()); + config.put(PARQUET_FORMAT_VERSION, writerVersionToFormatVersion(version)); return this; } + private static String writerVersionToFormatVersion(WriterVersion version) { + return version == WriterVersion.PARQUET_2_0 ? "v2" : "v1"; + } + public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { this.fileEncryptionKey = encryptionKey; return this; @@ -572,9 +578,7 @@ static Context dataContext(Map config) { config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT); Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); - WriterVersion writerVersion = - toWriterVersion( - config.getOrDefault(PARQUET_PAGE_VERSION, PARQUET_PAGE_VERSION_DEFAULT)); + WriterVersion writerVersion = resolveWriterVersion(config); String codecAsString = config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT); @@ -671,11 +675,7 @@ static Context deleteContext(Map config) { config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize()); Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0"); - String deletePageVersion = config.get(DELETE_PARQUET_PAGE_VERSION); - WriterVersion writerVersion = - deletePageVersion != null - ? toWriterVersion(deletePageVersion) - : dataContext.writerVersion(); + WriterVersion writerVersion = resolveDeleteWriterVersion(config, dataContext); String codecAsString = config.get(DELETE_PARQUET_COMPRESSION); CompressionCodecName codec = @@ -733,15 +733,55 @@ private static CompressionCodecName toCodec(String codecAsString) { } } - private static WriterVersion toWriterVersion(String pageVersion) { + private static WriterVersion resolveWriterVersion(Map config) { + String formatVersion = config.get(PARQUET_FORMAT_VERSION); + if (formatVersion != null) { + return toWriterVersion(formatVersion, "format version"); + } + + String pageVersion = config.get(PARQUET_PAGE_VERSION); + if (pageVersion != null) { + return toWriterVersion(pageVersion, "page version"); + } + + return toWriterVersion(PARQUET_FORMAT_VERSION_DEFAULT, "format version"); + } + + private static WriterVersion resolveDeleteWriterVersion( + Map config, Context dataContext) { + String deleteFormatVersion = config.get(DELETE_PARQUET_FORMAT_VERSION); + if (deleteFormatVersion != null) { + return toWriterVersion(deleteFormatVersion, "format version"); + } + + String deletePageVersion = config.get(DELETE_PARQUET_PAGE_VERSION); + if (deletePageVersion != null) { + return toWriterVersion(deletePageVersion, "page version"); + } + + return dataContext.writerVersion(); + } + + private static WriterVersion toWriterVersion(String version, String versionType) { try { - return WriterVersion.fromString(pageVersion); + return WriterVersion.fromString(normalizeWriterVersion(version)); } catch (IllegalArgumentException e) { + String validValues = "v1 or v2"; throw new IllegalArgumentException( - "Unsupported Parquet page version: " + pageVersion + " (must be v1 or v2)"); + "Unsupported Parquet " + + versionType + + ": " + + version + + " (must be " + + validValues + + ")"); } } + private static String normalizeWriterVersion(String version) { + return version; + } + int rowGroupSize() { return rowGroupSize; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFormatVersion.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFormatVersion.java new file mode 100644 index 000000000000..88228ea97319 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFormatVersion.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestParquetFormatVersion { + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + private List records; + + @TempDir private Path temp; + + @BeforeEach + void createRecords() { + GenericRecord record = GenericRecord.create(SCHEMA); + + this.records = + ImmutableList.of( + record.copy(ImmutableMap.of("id", 1L, "data", "a")), + record.copy(ImmutableMap.of("id", 2L, "data", "b")), + record.copy(ImmutableMap.of("id", 3L, "data", "c")), + record.copy(ImmutableMap.of("id", 4L, "data", "d")), + record.copy(ImmutableMap.of("id", 5L, "data", "e"))); + } + + @Test + void testWriterDefaultsToFormatVersion1() throws IOException { + OutputFile outputFile = newOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .build()) { + writer.addAll(records); + } + + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV1.class); + } + + @Test + void testWriterUsesConfiguredFormatVersion() throws IOException { + OutputFile outputFile = newOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .set(TableProperties.PARQUET_FORMAT_VERSION, "v2") + .createWriterFunc(GenericParquetWriter::create) + .build()) { + writer.addAll(records); + } + + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV2.class); + } + + @Test + void testDeleteWriterUsesDeleteSpecificFormatVersion() throws IOException { + OutputFile outputFile = newOutputFile(); + + EqualityDeleteWriter deleteWriter = + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .set(TableProperties.PARQUET_FORMAT_VERSION, "v1") + .set(TableProperties.DELETE_PARQUET_FORMAT_VERSION, "v2") + .overwrite() + .rowSchema(SCHEMA) + .withSpec(PartitionSpec.unpartitioned()) + .equalityFieldIds(1) + .buildEqualityWriter(); + + try (EqualityDeleteWriter writer = deleteWriter) { + writer.write(records); + } + + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV2.class); + } + + @Test + void testFormatVersionOverridesPageVersionProperty() throws IOException { + OutputFile outputFile = newOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .set(TableProperties.PARQUET_PAGE_VERSION, "v1") + .set(TableProperties.PARQUET_FORMAT_VERSION, "v2") + .createWriterFunc(GenericParquetWriter::create) + .build()) { + writer.addAll(records); + } + + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV2.class); + } + + @Test + void testFormatVersionPropertyAfterWriterVersionSetsVersion() throws IOException { + OutputFile outputFile = newOutputFile(); + + try (FileAppender writer = + Parquet.write(outputFile) + .schema(SCHEMA) + .writerVersion(WriterVersion.PARQUET_1_0) + .set(TableProperties.PARQUET_FORMAT_VERSION, "v2") + .createWriterFunc(GenericParquetWriter::create) + .build()) { + writer.addAll(records); + } + + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV2.class); + } + + @Test + void testInvalidNumericFormatVersionFails() { + OutputFile outputFile = newOutputFile(); + + assertThatThrownBy( + () -> + Parquet.write(outputFile) + .schema(SCHEMA) + .set(TableProperties.PARQUET_FORMAT_VERSION, "2.0") + .createWriterFunc(GenericParquetWriter::create) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported Parquet format version: 2.0 (must be v1 or v2)"); + } + + @Test + void testInvalidFormatVersionFails() { + OutputFile outputFile = newOutputFile(); + + assertThatThrownBy( + () -> + Parquet.write(outputFile) + .schema(SCHEMA) + .set(TableProperties.PARQUET_FORMAT_VERSION, "3") + .createWriterFunc(GenericParquetWriter::create) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported Parquet format version: 3 (must be v1 or v2)"); + } + + @Test + void testInvalidDeleteFormatVersionFails() { + OutputFile outputFile = newOutputFile(); + + assertThatThrownBy( + () -> + Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .set(TableProperties.DELETE_PARQUET_FORMAT_VERSION, "3") + .overwrite() + .rowSchema(SCHEMA) + .withSpec(PartitionSpec.unpartitioned()) + .equalityFieldIds(1) + .buildEqualityWriter()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported Parquet format version: 3 (must be v1 or v2)"); + } + + private OutputFile newOutputFile() { + try { + return Files.localOutput(createTempFile(temp)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private DataPage firstDataPage(OutputFile outputFile) throws IOException { + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetIO.file(outputFile.toInputFile()))) { + PageReadStore rowGroup = reader.readNextRowGroup(); + assertThat(rowGroup).isNotNull(); + + DataPage dataPage = + rowGroup + .getPageReader( + reader.getFileMetaData().getSchema().getColumnDescription(new String[] {"id"})) + .readPage(); + assertThat(dataPage).isNotNull(); + return dataPage; + } + } +} diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPageVersion.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPageVersion.java index 3be1dce4d9ea..8df5d19e2d0e 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPageVersion.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPageVersion.java @@ -180,7 +180,7 @@ void testExplicitWriterVersion1OverridesPageVersionProperty() throws IOException } @Test - void testPageVersionPropertyAfterWriterVersionSetsVersion() throws IOException { + void testPageVersionDoesNotOverrideWriterVersionFormatVersion() throws IOException { OutputFile outputFile = newOutputFile(); try (FileAppender writer = @@ -193,7 +193,22 @@ void testPageVersionPropertyAfterWriterVersionSetsVersion() throws IOException { writer.addAll(records); } - assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV2.class); + assertThat(firstDataPage(outputFile)).isInstanceOf(DataPageV1.class); + } + + @Test + void testInvalidNumericPageVersionFails() throws IOException { + OutputFile outputFile = newOutputFile(); + + assertThatThrownBy( + () -> + Parquet.write(outputFile) + .schema(SCHEMA) + .set(TableProperties.PARQUET_PAGE_VERSION, "2.0") + .createWriterFunc(GenericParquetWriter::create) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported Parquet page version: 2.0 (must be v1 or v2)"); } @Test diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 3d3287e471e4..66a9c0e8b906 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -117,4 +117,6 @@ private SparkSQLProperties() {} // defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions). public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM = "spark.sql.iceberg.read.adaptive-split-size.parallelism"; + + public static final String PARQUET_FORMAT_VERSION = "spark.sql.iceberg.parquet-format-version"; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 9da48ae51e5c..450a26d4590b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -29,10 +29,12 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE; import java.util.Locale; @@ -525,6 +527,7 @@ private Map dataWriteProperties() { if (parquetCompressionLevel != null) { writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); } + writeProperties.put(PARQUET_FORMAT_VERSION, parquetFormatVersion()); break; case AVRO: @@ -558,6 +561,7 @@ private Map deleteWriteProperties() { if (deleteParquetCompressionLevel != null) { writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); } + writeProperties.put(DELETE_PARQUET_FORMAT_VERSION, deleteParquetFormatVersion()); break; case AVRO: @@ -745,4 +749,24 @@ public DeleteGranularity deleteGranularity() { .defaultValue(DeleteGranularity.FILE) .parse(); } + + private String parquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } + + private String deleteParquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.DELETE_PARQUET_FORMAT_VERSION) + .defaultValue(parquetFormatVersion()) + .parse(); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 1be02feaf0c0..e13662f8f848 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -86,4 +86,6 @@ private SparkWriteOptions() {} // Overrides the delete granularity public static final String DELETE_GRANULARITY = "delete-granularity"; + + public static final String PARQUET_FORMAT_VERSION = "parquet-format-version"; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 89daf195ca73..5aa846bd5b76 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -29,11 +29,13 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH; @@ -342,6 +344,10 @@ public void testSparkConfOverride() { ImmutableMap.of( DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -415,6 +421,10 @@ public void testDataPropsDefaultsAsDeleteProps() { ImmutableMap.of( DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -486,6 +496,10 @@ public void testDeleteFileWriteConf() { ImmutableMap.of( DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index b0ac0c055bbe..17e56627146a 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -128,4 +128,6 @@ private SparkSQLProperties() {} // This determines how many rows are buffered before inferring shredded schema public static final String VARIANT_INFERENCE_BUFFER_SIZE = "spark.sql.iceberg.variant-inference-buffer-size"; + + public static final String PARQUET_FORMAT_VERSION = "spark.sql.iceberg.parquet-format-version"; } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index add12e6040b0..90c82314122a 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -29,10 +29,12 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE; @@ -539,6 +541,7 @@ private Map dataWriteProperties() { writeProperties.put( PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(variantInferenceBufferSize())); } + writeProperties.put(PARQUET_FORMAT_VERSION, parquetFormatVersion()); break; case AVRO: @@ -572,6 +575,7 @@ private Map deleteWriteProperties() { if (deleteParquetCompressionLevel != null) { writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); } + writeProperties.put(DELETE_PARQUET_FORMAT_VERSION, deleteParquetFormatVersion()); break; case AVRO: @@ -779,4 +783,24 @@ public int variantInferenceBufferSize() { .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) .parse(); } + + private String parquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } + + private String deleteParquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.DELETE_PARQUET_FORMAT_VERSION) + .defaultValue(parquetFormatVersion()) + .parse(); + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 6c76b5c873c5..dd81afdbd7b3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -92,4 +92,6 @@ private SparkWriteOptions() {} // Controls the buffer size for variant schema inference during writes public static final String VARIANT_INFERENCE_BUFFER_SIZE = "variant-inference-buffer-size"; + + public static final String PARQUET_FORMAT_VERSION = "parquet-format-version"; } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index c5cfbe62b1be..25177c268b5f 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -29,11 +29,13 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; @@ -346,6 +348,10 @@ public void testSparkConfOverride() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -469,6 +475,10 @@ public void testDataPropsDefaultsAsDeleteProps() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -542,6 +552,10 @@ public void testDeleteFileWriteConf() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index ddedc36c7126..e0b88f263b5c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -131,4 +131,6 @@ private SparkSQLProperties() {} // This determines how many rows are buffered before inferring shredded schema public static final String VARIANT_INFERENCE_BUFFER_SIZE = "spark.sql.iceberg.variant-inference-buffer-size"; + + public static final String PARQUET_FORMAT_VERSION = "spark.sql.iceberg.parquet-format-version"; } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 80f93427805a..64849a6d68c4 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -29,10 +29,12 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE; import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE; @@ -514,6 +516,7 @@ private Map dataWriteProperties() { writeProperties.put( PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(variantInferenceBufferSize())); } + writeProperties.put(PARQUET_FORMAT_VERSION, parquetFormatVersion()); break; case AVRO: @@ -547,6 +550,7 @@ private Map deleteWriteProperties() { if (deleteParquetCompressionLevel != null) { writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel); } + writeProperties.put(DELETE_PARQUET_FORMAT_VERSION, deleteParquetFormatVersion()); break; case AVRO: @@ -754,4 +758,24 @@ public int variantInferenceBufferSize() { .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT) .parse(); } + + private String parquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.PARQUET_FORMAT_VERSION) + .defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT) + .parse(); + } + + private String deleteParquetFormatVersion() { + return confParser + .stringConf() + .option(SparkWriteOptions.PARQUET_FORMAT_VERSION) + .sessionConf(SparkSQLProperties.PARQUET_FORMAT_VERSION) + .tableProperty(TableProperties.DELETE_PARQUET_FORMAT_VERSION) + .defaultValue(parquetFormatVersion()) + .parse(); + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 621db891d46c..47837e867990 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -92,4 +92,6 @@ private SparkWriteOptions() {} // Controls the buffer size for variant schema inference during writes public static final String VARIANT_INFERENCE_BUFFER_SIZE = "variant-inference-buffer-size"; + + public static final String PARQUET_FORMAT_VERSION = "parquet-format-version"; } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 336067c31235..548069f29b02 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -29,11 +29,13 @@ import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; +import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION; import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS; import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; @@ -350,6 +352,10 @@ public void testSparkConfOverride() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -474,6 +480,10 @@ public void testDataPropsDefaultsAsDeleteProps() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -547,6 +557,10 @@ public void testDeleteFileWriteConf() { "false", DELETE_PARQUET_COMPRESSION, "zstd", + PARQUET_FORMAT_VERSION, + "v1", + DELETE_PARQUET_FORMAT_VERSION, + "v1", PARQUET_COMPRESSION, "zstd", PARQUET_COMPRESSION_LEVEL, @@ -782,4 +796,29 @@ public void testWritePropertiesIncludeVariantShredding() { assertThat(writeProperties).containsEntry(PARQUET_SHRED_VARIANTS, "true"); assertThat(writeProperties).containsEntry(TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "200"); } + + @TestTemplate + public void testWritePropertiesIncludeParquetFormatVersion() { + Table table = validationCatalog.loadTable(tableIdent); + table.updateProperties().set(TableProperties.PARQUET_FORMAT_VERSION, "v2").commit(); + + SparkWriteConf writeConf = new SparkWriteConf(spark, table); + Map writeProperties = writeConf.writeProperties(); + assertThat(writeProperties).containsEntry(PARQUET_FORMAT_VERSION, "v2"); + assertThat(writeProperties).containsEntry(TableProperties.DELETE_PARQUET_FORMAT_VERSION, "v2"); + } + + @TestTemplate + public void testParquetFormatVersionWriteOptionOverridesTableProperty() { + Table table = validationCatalog.loadTable(tableIdent); + table.updateProperties().set(TableProperties.PARQUET_FORMAT_VERSION, "v1").commit(); + + SparkWriteConf writeConf = + new SparkWriteConf( + spark, + table, + new CaseInsensitiveStringMap( + ImmutableMap.of(SparkWriteOptions.PARQUET_FORMAT_VERSION, "v2"))); + assertThat(writeConf.writeProperties()).containsEntry(PARQUET_FORMAT_VERSION, "v2"); + } }