Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ private TableProperties() {}
"write.delete.parquet.page-size-bytes";
public static final int PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1 MB

public static final String PARQUET_FORMAT_VERSION = "write.parquet.format-version";
public static final String DELETE_PARQUET_FORMAT_VERSION = "write.delete.parquet.format-version";
public static final String PARQUET_FORMAT_VERSION_DEFAULT = "v1";

public static final String PARQUET_PAGE_VERSION = "write.parquet.page-version";
public static final String DELETE_PARQUET_PAGE_VERSION = "write.delete.parquet.page-version";
public static final String PARQUET_PAGE_VERSION_DEFAULT = "v1";
Expand Down
5 changes: 4 additions & 1 deletion docs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
| write.parquet.row-group-size-track-uncompressed | false | Track raw data size to enforce the row group size target accurately with compressing codecs |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) |
| write.parquet.format-version | v1 | Parquet writer format version: v1 (PARQUET_1_0) or v2 (PARQUET_2_0) |
| write.delete.parquet.format-version | data file format version | Parquet writer format version for delete files: v1 or v2 |
| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) |
| write.delete.parquet.page-version | data file page version | Parquet data page version for delete files: v1 (DataPage V1) or v2 (DataPage V2) |
| write.parquet.page-row-limit | 20000 | Parquet page row limit |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
Expand Down
1 change: 1 addition & 0 deletions docs/docs/flink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink for this table |
| parquet-format-version | Table write.parquet.format-version | Overrides this table's Parquet writer format version for this write (v1 or v2) |
| shred-variants | Table write.parquet.shred-variants | Overrides this table's shred variants for this write |
| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size for this write |

Expand Down
2 changes: 2 additions & 0 deletions docs/docs/spark-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ val spark = SparkSession.builder()
| spark.sql.iceberg.compression-codec | Table default | Write compression codec (e.g., `zstd`, `snappy`) |
| spark.sql.iceberg.compression-level | Table default | Compression level for Parquet/Avro |
| spark.sql.iceberg.compression-strategy | Table default | Compression strategy for ORC |
| spark.sql.iceberg.parquet-format-version | Table default | Parquet writer format version for writes (v1 or v2) |
| spark.sql.iceberg.data-planning-mode | AUTO | Scan planning mode for data files (`AUTO`, `LOCAL`, `DISTRIBUTED`) |
| spark.sql.iceberg.delete-planning-mode | AUTO | Scan planning mode for delete files (`AUTO`, `LOCAL`, `DISTRIBUTED`) |
| spark.sql.iceberg.advisory-partition-size | Table default | Advisory size (bytes) used for writing to the Table when Spark's Adaptive Query Execution is enabled. Used to size output files |
Expand Down Expand Up @@ -266,6 +267,7 @@ df.writeTo("catalog.db.table")
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
| distribution-mode | See [Spark Writes](spark-writes.md#writing-distribution-modes) for defaults | Override this table's distribution mode for this write |
| delete-granularity | file | Override this table's delete granularity for this write |
| parquet-format-version | Table write.parquet.format-version | Overrides this table's Parquet writer format version for this write (v1 or v2) |
| shred-variants | false | Overrides this table's write.parquet.shred-variants for this write |
| variant-inference-buffer-size | 100 | Overrides this table's write.parquet.variant-inference-buffer-size for this write |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,13 @@ public Duration tableRefreshInterval() {
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
.parseOptional();
}

public String parquetFormatVersion() {
return confParser
.stringConf()
.option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key())
.tableProperty(TableProperties.PARQUET_FORMAT_VERSION)
.defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT)
.parse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ private FlinkWriteOptions() {}
// specify the uidSuffix to be used for the underlying IcebergSink
public static final ConfigOption<String> UID_SUFFIX =
ConfigOptions.key("uid-suffix").stringType().defaultValue("");

public static final ConfigOption<String> PARQUET_FORMAT_VERSION =
ConfigOptions.key("parquet-format-version").stringType().noDefaultValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,6 +129,8 @@ public static Map<String, String> writeProperties(
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion());

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,13 @@ public Duration tableRefreshInterval() {
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
.parseOptional();
}

public String parquetFormatVersion() {
return confParser
.stringConf()
.option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key())
.tableProperty(TableProperties.PARQUET_FORMAT_VERSION)
.defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT)
.parse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ private FlinkWriteOptions() {}
// specify the uidSuffix to be used for the underlying IcebergSink
public static final ConfigOption<String> UID_SUFFIX =
ConfigOptions.key("uid-suffix").stringType().defaultValue("");

public static final ConfigOption<String> PARQUET_FORMAT_VERSION =
ConfigOptions.key("parquet-format-version").stringType().noDefaultValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,6 +129,8 @@ public static Map<String, String> writeProperties(
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion());

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,13 @@ public int parquetVariantInferenceBufferSize() {
.defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT)
.parse();
}

public String parquetFormatVersion() {
return confParser
.stringConf()
.option(FlinkWriteOptions.PARQUET_FORMAT_VERSION.key())
.tableProperty(TableProperties.PARQUET_FORMAT_VERSION)
.defaultValue(TableProperties.PARQUET_FORMAT_VERSION_DEFAULT)
.parse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,7 @@ private FlinkWriteOptions() {}

public static final ConfigOption<Integer> VARIANT_INFERENCE_BUFFER_SIZE =
ConfigOptions.key("variant-inference-buffer-size").intType().noDefaultValue();

public static final ConfigOption<String> PARQUET_FORMAT_VERSION =
ConfigOptions.key("parquet-format-version").stringType().noDefaultValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS;
import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE;

Expand Down Expand Up @@ -133,6 +134,7 @@ public static Map<String, String> writeProperties(
writeProperties.put(PARQUET_SHRED_VARIANTS, String.valueOf(conf.parquetShredVariants()));
writeProperties.put(
PARQUET_VARIANT_BUFFER_SIZE, String.valueOf(conf.parquetVariantInferenceBufferSize()));
writeProperties.put(PARQUET_FORMAT_VERSION, conf.parquetFormatVersion());

break;
case AVRO:
Expand Down
66 changes: 53 additions & 13 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_VERSION;
Expand All @@ -39,12 +40,13 @@
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.PARQUET_FORMAT_VERSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_VERSION;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_VERSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
Expand Down Expand Up @@ -274,10 +276,14 @@ public WriteBuilder writerVersion(WriterVersion version) {
version == WriterVersion.PARQUET_1_0 || version == WriterVersion.PARQUET_2_0,
"Unsupported writer version: %s",
version);
config.put(PARQUET_PAGE_VERSION, version.name());
config.put(PARQUET_FORMAT_VERSION, writerVersionToFormatVersion(version));
return this;
}

private static String writerVersionToFormatVersion(WriterVersion version) {
return version == WriterVersion.PARQUET_2_0 ? "v2" : "v1";
}

public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
this.fileEncryptionKey = encryptionKey;
return this;
Expand Down Expand Up @@ -572,9 +578,7 @@ static Context dataContext(Map<String, String> config) {
config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");

WriterVersion writerVersion =
toWriterVersion(
config.getOrDefault(PARQUET_PAGE_VERSION, PARQUET_PAGE_VERSION_DEFAULT));
WriterVersion writerVersion = resolveWriterVersion(config);

String codecAsString =
config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT);
Expand Down Expand Up @@ -671,11 +675,7 @@ static Context deleteContext(Map<String, String> config) {
config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize());
Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");

String deletePageVersion = config.get(DELETE_PARQUET_PAGE_VERSION);
WriterVersion writerVersion =
deletePageVersion != null
? toWriterVersion(deletePageVersion)
: dataContext.writerVersion();
WriterVersion writerVersion = resolveDeleteWriterVersion(config, dataContext);

String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
CompressionCodecName codec =
Expand Down Expand Up @@ -733,15 +733,55 @@ private static CompressionCodecName toCodec(String codecAsString) {
}
}

private static WriterVersion toWriterVersion(String pageVersion) {
private static WriterVersion resolveWriterVersion(Map<String, String> config) {
String formatVersion = config.get(PARQUET_FORMAT_VERSION);
if (formatVersion != null) {
return toWriterVersion(formatVersion, "format version");
}

String pageVersion = config.get(PARQUET_PAGE_VERSION);
if (pageVersion != null) {
return toWriterVersion(pageVersion, "page version");
}

return toWriterVersion(PARQUET_FORMAT_VERSION_DEFAULT, "format version");
}

private static WriterVersion resolveDeleteWriterVersion(
Map<String, String> config, Context dataContext) {
String deleteFormatVersion = config.get(DELETE_PARQUET_FORMAT_VERSION);
if (deleteFormatVersion != null) {
return toWriterVersion(deleteFormatVersion, "format version");
}

String deletePageVersion = config.get(DELETE_PARQUET_PAGE_VERSION);
if (deletePageVersion != null) {
return toWriterVersion(deletePageVersion, "page version");
}

return dataContext.writerVersion();
}

private static WriterVersion toWriterVersion(String version, String versionType) {
try {
return WriterVersion.fromString(pageVersion);
return WriterVersion.fromString(normalizeWriterVersion(version));
} catch (IllegalArgumentException e) {
String validValues = "v1 or v2";
throw new IllegalArgumentException(
"Unsupported Parquet page version: " + pageVersion + " (must be v1 or v2)");
"Unsupported Parquet "
+ versionType
+ ": "
+ version
+ " (must be "
+ validValues
+ ")");
}
}

private static String normalizeWriterVersion(String version) {
return version;
}

int rowGroupSize() {
return rowGroupSize;
}
Expand Down
Loading