From 3feeb52a3568ffe65cb7e3aacf8607584f18066b Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Sun, 15 Feb 2026 17:26:33 +0100
Subject: [PATCH 1/4] Core, Data, Flink: Moving Flink to use the new
 FormatModel API

---
 .../iceberg/formats/FormatModelRegistry.java  |   3 +-
 .../data/RegistryBasedFileWriterFactory.java  | 182 ++++++++++++++++++
 .../iceberg/flink/data/FlinkFormatModels.java |  58 ++++++
 .../flink/data/FlinkSchemaVisitor.java        |  39 ++--
 .../flink/sink/FlinkFileWriterFactory.java    |  98 ++--------
 .../source/RowDataFileScanTaskReader.java     | 112 ++---------
 .../flink/sink/TestCompressionSettings.java   |  10 +-
 .../flink/sink/TestIcebergStreamWriter.java   |  93 ++++++++-
 .../flink/sink/dynamic/TestDynamicWriter.java |   6 +-
 9 files changed, 398 insertions(+), 203 deletions(-)
 create mode 100644 data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
 create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java

diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index e86dd9f97aa1..7e944510a85e 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -57,7 +57,8 @@ private FormatModelRegistry() {}
   private static final List<String> CLASSES_TO_REGISTER =
       ImmutableList.of(
           "org.apache.iceberg.data.GenericFormatModels",
-          "org.apache.iceberg.arrow.vectorized.ArrowFormatModels");
+          "org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
+          "org.apache.iceberg.flink.data.FlinkFormatModels");
 
   // Format models indexed by file format and object model class
   private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
diff --git a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
new file mode 100644
index 000000000000..3d0e2e8fb030
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * A base writer factory to be extended by query engine integrations.
+ *
+ * @param <T> row type
+ */
+public abstract class RegistryBasedFileWriterFactory<T, S>
+    implements FileWriterFactory<T>, Serializable {
+  private final Table table;
+  private final FileFormat dataFileFormat;
+  private final Class<T> inputType;
+  private final Schema dataSchema;
+  private final SortOrder dataSortOrder;
+  private final FileFormat deleteFileFormat;
+  private final int[] equalityFieldIds;
+  private final Schema equalityDeleteRowSchema;
+  private final SortOrder equalityDeleteSortOrder;
+  private final Map<String, String> writerProperties;
+  private final S inputSchema;
+  private final S equalityDeleteInputSchema;
+
+  protected RegistryBasedFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Class<T> inputType,
+      Schema dataSchema,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      SortOrder equalityDeleteSortOrder,
+      Map<String, String> writerProperties,
+      S inputSchema,
+      S equalityDeleteInputSchema) {
+    this.table = table;
+    this.dataFileFormat = dataFileFormat;
+    this.inputType = inputType;
+    this.dataSchema = dataSchema;
+    this.dataSortOrder = dataSortOrder;
+    this.deleteFileFormat = deleteFileFormat;
+    this.equalityFieldIds = equalityFieldIds;
+    this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+    this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+    this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of();
+    this.inputSchema = inputSchema;
+    this.equalityDeleteInputSchema = equalityDeleteInputSchema;
+  }
+
+  protected S inputSchema() {
+    return inputSchema;
+  }
+
+  protected S equalityDeleteInputSchema() {
+    return equalityDeleteInputSchema;
+  }
+
+  @Override
+  public DataWriter<T> newDataWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    Preconditions.checkNotNull(dataSchema, "Data schema must not be null");
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault();
+
+    try {
+      FileWriterBuilder<DataWriter<T>, S> builder =
+          FormatModelRegistry.dataWriteBuilder(dataFileFormat, inputType, file);
+      return builder
+          .schema(dataSchema)
+          .engineSchema(inputSchema())
+          .setAll(properties)
+          .setAll(writerProperties)
+          .metricsConfig(metricsConfig)
+          .spec(spec)
+          .partition(partition)
+          .keyMetadata(keyMetadata)
+          .sortOrder(dataSortOrder)
+          .overwrite()
+          .build();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new data writer", e);
+    }
+  }
+
+  @Override
+  public EqualityDeleteWriter<T> newEqualityDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    Preconditions.checkNotNull(equalityDeleteRowSchema, "Equality delete schema must not be null");
+
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.getDefault();
+
+    try {
+      FileWriterBuilder<EqualityDeleteWriter<T>, S> builder =
+          FormatModelRegistry.equalityDeleteWriteBuilder(deleteFileFormat, inputType, file);
+      return builder
+          .setAll(properties)
+          .setAll(writerProperties)
+          .metricsConfig(metricsConfig)
+          .schema(equalityDeleteRowSchema)
+          .engineSchema(equalityDeleteInputSchema())
+          .equalityFieldIds(equalityFieldIds)
+          .spec(spec)
+          .partition(partition)
+          .keyMetadata(keyMetadata)
+          .sortOrder(equalityDeleteSortOrder)
+          .overwrite()
+          .build();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new equality delete writer", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteWriter<T> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
+    Map<String, String> properties = table != null ? table.properties() : ImmutableMap.of();
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.forPositionDelete();
+
+    try {
+      FileWriterBuilder<PositionDeleteWriter<T>, ?> builder =
+          FormatModelRegistry.positionDeleteWriteBuilder(deleteFileFormat, file);
+      return builder
+          .setAll(properties)
+          .setAll(writerProperties)
+          .metricsConfig(metricsConfig)
+          .spec(spec)
+          .partition(partition)
+          .keyMetadata(keyMetadata)
+          .overwrite()
+          .build();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new position delete writer", e);
+    }
+  }
+}
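The factory above funnels data, equality-delete, and position-delete writes through FormatModelRegistry lookups keyed by file format and row class. A minimal sketch of the data-write lookup used directly, assuming only the builder API visible in this patch (outputFile, flinkRowType, and table are placeholder values, not part of the change):

    // Sketch only: resolve the writer function registered for RowData and
    // build a data writer, mirroring RegistryBasedFileWriterFactory#newDataWriter.
    EncryptedOutputFile outputFile = ...;  // placeholder: e.g. from an OutputFileFactory
    FileWriterBuilder<DataWriter<RowData>, RowType> builder =
        FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, RowData.class, outputFile);
    DataWriter<RowData> writer =
        builder
            .schema(table.schema())       // Iceberg schema of the rows
            .engineSchema(flinkRowType)   // Flink RowType matching that schema
            .setAll(table.properties())
            .metricsConfig(MetricsConfig.forTable(table))
            .spec(table.spec())
            .overwrite()
            .build();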
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
new file mode 100644
index 000000000000..0026c8a3021d
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class FlinkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private FlinkFormatModels() {}
+}
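FlinkFormatModels exposes only a static register() method; the CLASSES_TO_REGISTER change in the first hunk of this patch suggests FormatModelRegistry loads each listed class by name and triggers its registration on first use (the diff shows only the list, so the loading mechanism is an assumption). A sketch of the same pattern for a hypothetical engine, where MyRow, MyRowType, and the writer/reader factories are invented placeholders:

    // Hypothetical engine registration following the FlinkFormatModels pattern above.
    public class MyEngineFormatModels {
      private MyEngineFormatModels() {}

      public static void register() {
        FormatModelRegistry.register(
            ParquetFormatModel.create(
                MyRow.class,      // row class: the registry lookup key
                MyRowType.class,  // engine schema class fed to engineSchema(...)
                (icebergSchema, fileSchema, engineSchema) ->
                    MyParquetWriters.buildWriter(engineSchema, fileSchema),
                (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                    MyParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
      }
    }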
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
index 1440fde3248c..a76bac515b3d 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
@@ -19,6 +19,7 @@ package org.apache.iceberg.flink.data;
 
 import java.util.List;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -29,9 +30,10 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+@Internal
 abstract class FlinkSchemaVisitor<T> {
 
-  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
+  public static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
     return visit(flinkType, schema.asStruct(), visitor);
   }
 
@@ -94,24 +96,29 @@ private static <T> T visitRecord(
     List<LogicalType> fieldTypes = Lists.newArrayListWithExpectedSize(fieldSize);
     List<Types.NestedField> nestedFields = struct.fields();
 
-    for (int i = 0; i < fieldSize; i++) {
-      Types.NestedField iField = nestedFields.get(i);
-      int fieldIndex = rowType.getFieldIndex(iField.name());
-      Preconditions.checkArgument(
-          fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
-
-      LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
-
-      fieldTypes.add(fieldFlinkType);
-
-      visitor.beforeField(iField);
-      try {
-        if (iField.type() != Types.UnknownType.get()) {
-          results.add(visit(fieldFlinkType, iField.type(), visitor));
-        }
-      } finally {
-        visitor.afterField(iField);
-      }
-    }
+    visitor.beforeStruct(struct.asStructType());
+
+    try {
+      for (int i = 0; i < fieldSize; i++) {
+        Types.NestedField iField = nestedFields.get(i);
+        int fieldIndex = rowType.getFieldIndex(iField.name());
+        Preconditions.checkArgument(
+            fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
+
+        LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
+        fieldTypes.add(fieldFlinkType);
+
+        visitor.beforeField(iField);
+        try {
+          if (iField.type() != Types.UnknownType.get()) {
+            results.add(visit(fieldFlinkType, iField.type(), visitor));
+          }
+        } finally {
+          visitor.afterField(iField);
+        }
+      }
+    } finally {
+      visitor.afterStruct(struct.asStructType());
+    }
 
     return visitor.record(struct, results, fieldTypes);
@@ -137,6 +144,10 @@ public void beforeField(Types.NestedField field) {}
 
   public void afterField(Types.NestedField field) {}
 
+  public void beforeStruct(Types.StructType type) {}
+
+  public void afterStruct(Types.StructType type) {}
+
   public void beforeListElement(Types.NestedField elementField) {
     beforeField(elementField);
   }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index b3ada41737bc..d5247941d863 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -25,28 +25,19 @@
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
-import org.apache.iceberg.flink.data.FlinkOrcWriter;
-import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
-public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements Serializable {
-  private RowType dataFlinkType;
-  private RowType equalityDeleteFlinkType;
-
-  private FlinkFileWriterFactory(
+public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType>
+    implements Serializable {
+  FlinkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
       Schema dataSchema,
@@ -62,85 +53,30 @@ private FlinkFileWriterFactory(
     super(
         table,
         dataFileFormat,
+        RowData.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        writeProperties);
-
-    this.dataFlinkType = dataFlinkType;
-    this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-  }
-
-  static Builder builderFor(Table table) {
-    return new Builder(table);
-  }
-
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType()));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType()));
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
-  }
-
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  private RowType dataFlinkType() {
-    if (dataFlinkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema());
-    }
-
-    return dataFlinkType;
-  }
-
-  private RowType equalityDeleteFlinkType() {
-    if (equalityDeleteFlinkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteFlinkType;
-  }
+        writeProperties,
+        dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType,
+        equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema));
+  }
+
+  private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) {
+    if (rowType != null) {
+      return rowType;
+    } else if (rowSchema != null) {
+      return FlinkSchemaUtil.convert(rowSchema);
+    } else {
+      return null;
+    }
+  }
+
+  static Builder builderFor(Table table) {
+    return new Builder(table);
+  }
 
   public static class Builder {
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b8fb1ba32edf..ee4aaf4a3da1 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -24,10 +24,8 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
@@ -35,19 +33,14 @@
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkOrcReader;
-import org.apache.iceberg.flink.data.FlinkParquetReaders;
-import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 @Internal
@@ -73,8 +66,7 @@ public RowDataFileScanTaskReader(
     if (filters != null && !filters.isEmpty()) {
       Expression combinedExpression =
          filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
-      this.rowFilter =
-          new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+      this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive);
     } else {
       this.rowFilter = null;
     }
@@ -112,23 +104,23 @@ private CloseableIterable<RowData> newIterable(
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      ReadBuilder<RowData, ?> builder =
+          FormatModelRegistry.readBuilder(
+              task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      iter =
+          builder
+              .project(schema)
+              .idToConstant(idToConstant)
+              .split(task.start(), task.length())
+              .caseSensitive(caseSensitive)
+              .filter(task.residual())
+              .reuseContainers()
+              .build();
     }
 
     if (rowFilter != null) {
@@ -137,72 +129,6 @@
     return iter;
   }
 
-  private CloseableIterable<RowData> newAvroIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Avro.ReadBuilder builder =
-        Avro.read(inputFilesDecryptor.getInputFile(task))
-            .reuseContainers()
-            .project(schema)
-            .split(task.start(), task.length())
-            .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newParquetIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Parquet.ReadBuilder builder =
-        Parquet.read(inputFilesDecryptor.getInputFile(task))
-            .split(task.start(), task.length())
-            .project(schema)
-            .createReaderFunc(
-                fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive)
-            .reuseContainers();
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newOrcIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder =
-        ORC.read(inputFilesDecryptor.getInputFile(task))
-            .project(readSchemaWithoutConstantAndMetadataFields)
-            .split(task.start(), task.length())
-            .createReaderFunc(
-                readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
   private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowType requiredRowType;
     private final RowDataWrapper asStructLike;
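With this change the reader no longer switches on the file format; it asks the registry for a format-specific ReadBuilder. A compact sketch of that call sequence in isolation, assuming the ReadBuilder methods shown in the hunk above (task, inputFile, projectedSchema, constants, and process(...) are placeholders):

    // Sketch only: open a RowData iterable over a single scan task's split.
    ReadBuilder<RowData, ?> builder =
        FormatModelRegistry.readBuilder(task.file().format(), RowData.class, inputFile);
    try (CloseableIterable<RowData> rows =
        builder
            .project(projectedSchema)            // Iceberg read schema
            .idToConstant(constants)             // partition/metadata constants
            .split(task.start(), task.length())  // restrict to this split
            .filter(task.residual())             // push down the residual filter
            .reuseContainers()
            .build()) {
      rows.forEach(row -> process(row));         // process(...) is a placeholder
    }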
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
index da5b5f6c28f0..339cd0510efb 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -35,7 +35,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -238,21 +238,21 @@ private static Map<String, String> appenderProperties(
     testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
     testHarness.prepareSnapshotPreBarrier(1L);
-    DynFields.BoundField<IcebergStreamWriter> operatorField =
+    DynFields.BoundField<IcebergStreamWriter<RowData>> operatorField =
         DynFields.builder()
             .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
             .build(testHarness.getOperatorFactory());
-    DynFields.BoundField<TaskWriter> writerField =
+    DynFields.BoundField<TaskWriter<RowData>> writerField =
         DynFields.builder()
             .hiddenImpl(IcebergStreamWriter.class, "writer")
             .build(operatorField.get());
-    DynFields.BoundField<FileWriterFactory> writerFactoryField =
+    DynFields.BoundField<FileWriterFactory<RowData>> writerFactoryField =
         DynFields.builder()
             .hiddenImpl(BaseTaskWriter.class, "writerFactory")
             .build(writerField.get());
     DynFields.BoundField<Map<String, String>> propsField =
         DynFields.builder()
-            .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+            .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
             .build(writerFactoryField.get());
     return propsField.get();
   }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 7f4f7758e519..9f508bbe717d 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -34,6 +34,8 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -319,11 +321,32 @@ public void testTableWithTargetFileSize() throws Exception {
   public void testPromotedFlinkDataType() throws Exception {
     Schema iSchema =
         new Schema(
+            Types.NestedField.required(
+                4, "array", Types.ListType.ofOptional(5, Types.IntegerType.get())),
+            Types.NestedField.required(
+                6,
+                "map",
+                Types.MapType.ofOptional(7, 8, Types.IntegerType.get(), Types.IntegerType.get())),
+            Types.NestedField.required(
+                9,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.optional(10, "struct_1", Types.IntegerType.get()),
+                    Types.NestedField.optional(11, "struct_2", Types.IntegerType.get()))),
             Types.NestedField.required(1, "tinyint", Types.IntegerType.get()),
             Types.NestedField.required(2, "smallint", Types.IntegerType.get()),
             Types.NestedField.optional(3, "int", Types.IntegerType.get()));
     ResolvedSchema flinkSchema =
         ResolvedSchema.of(
+            Column.physical("array", DataTypes.ARRAY(DataTypes.TINYINT()).notNull()),
+            Column.physical(
+                "map", DataTypes.MAP(DataTypes.TINYINT(), DataTypes.TINYINT()).notNull()),
+            Column.physical(
+                "struct",
+                DataTypes.ROW(
+                        DataTypes.FIELD("struct_1", DataTypes.TINYINT()),
+                        DataTypes.FIELD("struct_2", DataTypes.TINYINT()))
+                    .notNull()),
             Column.physical("tinyint", DataTypes.TINYINT().notNull()),
             Column.physical("smallint", DataTypes.SMALLINT().notNull()),
             Column.physical("int", DataTypes.INT().nullable()));
@@ -347,16 +370,74 @@ public void testPromotedFlinkDataType() throws Exception {
 
     List<RowData> rows =
         Lists.newArrayList(
-            GenericRowData.of((byte) 0x01, (short) -32768, 101),
-            GenericRowData.of((byte) 0x02, (short) 0, 102),
-            GenericRowData.of((byte) 0x03, (short) 32767, 103));
+            GenericRowData.of(
+                new GenericArrayData(new byte[] {(byte) 0x04, (byte) 0x05}),
+                new GenericMapData(ImmutableMap.of((byte) 0x06, (byte) 0x07)),
+                GenericRowData.of((byte) 0x08, (byte) 0x09),
+                (byte) 0x01,
+                (short) -32768,
+                101),
+            GenericRowData.of(
+                new GenericArrayData(new byte[] {(byte) 0x0a, (byte) 0x0b}),
+                new GenericMapData(ImmutableMap.of((byte) 0x0c, (byte) 0x0d)),
+                GenericRowData.of((byte) 0x0e, (byte) 0x0f),
+                (byte) 0x02,
+                (short) 0,
+                102),
+            GenericRowData.of(
+                new GenericArrayData(new byte[] {(byte) 0x10, (byte) 0x11}),
+                new GenericMapData(ImmutableMap.of((byte) 0x12, (byte) 0x13)),
+                GenericRowData.of((byte) 0x14, (byte) 0x15),
+                (byte) 0x03,
+                (short) 32767,
+                103));
 
     Record record = GenericRecord.create(iSchema);
+    Record struct = GenericRecord.create(iSchema.findField("struct").type().asStructType());
     List<Record> expected =
         Lists.newArrayList(
-            record.copy(ImmutableMap.of("tinyint", 1, "smallint", -32768, "int", 101)),
-            record.copy(ImmutableMap.of("tinyint", 2, "smallint", 0, "int", 102)),
-            record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)));
+            record.copy(
+                ImmutableMap.of(
+                    "array",
+                    Lists.newArrayList(4, 5),
+                    "map",
+                    ImmutableMap.of(6, 7),
+                    "struct",
+                    struct.copy(ImmutableMap.of("struct_1", 8, "struct_2", 9)),
+                    "tinyint",
+                    1,
+                    "smallint",
+                    -32768,
+                    "int",
+                    101)),
+            record.copy(
+                ImmutableMap.of(
+                    "array",
+                    Lists.newArrayList(10, 11),
+                    "map",
+                    ImmutableMap.of(12, 13),
+                    "struct",
+                    struct.copy(ImmutableMap.of("struct_1", 14, "struct_2", 15)),
+                    "tinyint",
+                    2,
+                    "smallint",
+                    0,
+                    "int",
+                    102)),
+            record.copy(
+                ImmutableMap.of(
+                    "array",
+                    Lists.newArrayList(16, 17),
+                    "map",
+                    ImmutableMap.of(18, 19),
+                    "struct",
+                    struct.copy(ImmutableMap.of("struct_1", 20, "struct_2", 21)),
+                    "tinyint",
+                    3,
+                    "smallint",
+                    32767,
+                    "int",
+                    103)));
 
     try (OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult> testHarness =
         createIcebergStreamWriter(icebergTable, flinkSchema)) {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 8e346cd8a145..f604f639f217 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -34,7 +34,7 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -365,13 +365,13 @@ private Map<String, String> properties(DynamicWriter dynamicWriter) {
     DynFields.BoundField<Map<WriteTarget, TaskWriter<RowData>>> writerField =
         DynFields.builder().hiddenImpl(dynamicWriter.getClass(), "writers").build(dynamicWriter);
-    DynFields.BoundField<FileWriterFactory> writerFactoryField =
+    DynFields.BoundField<FileWriterFactory<RowData>> writerFactoryField =
         DynFields.builder()
             .hiddenImpl(BaseTaskWriter.class, "writerFactory")
             .build(writerField.get().values().iterator().next());
     DynFields.BoundField<Map<String, String>> propsField =
         DynFields.builder()
-            .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+            .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
             .build(writerFactoryField.get());
     return propsField.get();
   }
From f29fb580184a728e692e1cd9d446ef4e46a4fcaa Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Tue, 17 Feb 2026 00:32:57 +0100
Subject: [PATCH 2/4] Let's check if these changes are needed

---
 .../flink/data/FlinkSchemaVisitor.java        | 39 +++----
 .../flink/sink/TestIcebergStreamWriter.java   | 93 ++-----------
 2 files changed, 20 insertions(+), 112 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
index a76bac515b3d..1440fde3248c 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
@@ -19,7 +19,6 @@ package org.apache.iceberg.flink.data;
 
 import java.util.List;
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -30,10 +29,9 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-@Internal
 abstract class FlinkSchemaVisitor<T> {
 
-  public static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
+  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
     return visit(flinkType, schema.asStruct(), visitor);
   }
 
@@ -96,29 +94,24 @@ private static <T> T visitRecord(
     List<LogicalType> fieldTypes = Lists.newArrayListWithExpectedSize(fieldSize);
     List<Types.NestedField> nestedFields = struct.fields();
 
-    visitor.beforeStruct(struct.asStructType());
-
-    try {
-      for (int i = 0; i < fieldSize; i++) {
-        Types.NestedField iField = nestedFields.get(i);
-        int fieldIndex = rowType.getFieldIndex(iField.name());
-        Preconditions.checkArgument(
-            fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
-
-        LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
-        fieldTypes.add(fieldFlinkType);
-
-        visitor.beforeField(iField);
-        try {
-          if (iField.type() != Types.UnknownType.get()) {
-            results.add(visit(fieldFlinkType, iField.type(), visitor));
-          }
-        } finally {
-          visitor.afterField(iField);
-        }
-      }
-    } finally {
-      visitor.afterStruct(struct.asStructType());
-    }
+    for (int i = 0; i < fieldSize; i++) {
+      Types.NestedField iField = nestedFields.get(i);
+      int fieldIndex = rowType.getFieldIndex(iField.name());
+      Preconditions.checkArgument(
+          fieldIndex >= 0, "NestedField: %s is not found in flink RowType: %s", iField, rowType);
+
+      LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex);
+
+      fieldTypes.add(fieldFlinkType);
+
+      visitor.beforeField(iField);
+      try {
+        if (iField.type() != Types.UnknownType.get()) {
+          results.add(visit(fieldFlinkType, iField.type(), visitor));
+        }
+      } finally {
+        visitor.afterField(iField);
+      }
+    }
 
     return visitor.record(struct, results, fieldTypes);
@@ -144,10 +137,6 @@ public void beforeField(Types.NestedField field) {}
 
   public void afterField(Types.NestedField field) {}
 
-  public void beforeStruct(Types.StructType type) {}
-
-  public void afterStruct(Types.StructType type) {}
-
   public void beforeListElement(Types.NestedField elementField) {
     beforeField(elementField);
   }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 9f508bbe717d..7f4f7758e519 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -34,8 +34,6 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -321,32 +319,11 @@ public void testTableWithTargetFileSize() throws Exception {
   public void testPromotedFlinkDataType() throws Exception {
     Schema iSchema =
         new Schema(
-            Types.NestedField.required(
-                4, "array", Types.ListType.ofOptional(5, Types.IntegerType.get())),
-            Types.NestedField.required(
-                6,
-                "map",
-                Types.MapType.ofOptional(7, 8, Types.IntegerType.get(), Types.IntegerType.get())),
-            Types.NestedField.required(
-                9,
-                "struct",
-                Types.StructType.of(
-                    Types.NestedField.optional(10, "struct_1", Types.IntegerType.get()),
-                    Types.NestedField.optional(11, "struct_2", Types.IntegerType.get()))),
             Types.NestedField.required(1, "tinyint", Types.IntegerType.get()),
             Types.NestedField.required(2, "smallint", Types.IntegerType.get()),
             Types.NestedField.optional(3, "int", Types.IntegerType.get()));
     ResolvedSchema flinkSchema =
         ResolvedSchema.of(
-            Column.physical("array", DataTypes.ARRAY(DataTypes.TINYINT()).notNull()),
-            Column.physical(
-                "map", DataTypes.MAP(DataTypes.TINYINT(), DataTypes.TINYINT()).notNull()),
-            Column.physical(
-                "struct",
-                DataTypes.ROW(
-                        DataTypes.FIELD("struct_1", DataTypes.TINYINT()),
-                        DataTypes.FIELD("struct_2", DataTypes.TINYINT()))
-                    .notNull()),
             Column.physical("tinyint", DataTypes.TINYINT().notNull()),
             Column.physical("smallint", DataTypes.SMALLINT().notNull()),
             Column.physical("int", DataTypes.INT().nullable()));
@@ -370,74 +347,16 @@ public void testPromotedFlinkDataType() throws Exception {
 
     List<RowData> rows =
         Lists.newArrayList(
-            GenericRowData.of(
-                new GenericArrayData(new byte[] {(byte) 0x04, (byte) 0x05}),
-                new GenericMapData(ImmutableMap.of((byte) 0x06, (byte) 0x07)),
-                GenericRowData.of((byte) 0x08, (byte) 0x09),
-                (byte) 0x01,
-                (short) -32768,
-                101),
-            GenericRowData.of(
-                new GenericArrayData(new byte[] {(byte) 0x0a, (byte) 0x0b}),
-                new GenericMapData(ImmutableMap.of((byte) 0x0c, (byte) 0x0d)),
-                GenericRowData.of((byte) 0x0e, (byte) 0x0f),
-                (byte) 0x02,
-                (short) 0,
-                102),
-            GenericRowData.of(
-                new GenericArrayData(new byte[] {(byte) 0x10, (byte) 0x11}),
-                new GenericMapData(ImmutableMap.of((byte) 0x12, (byte) 0x13)),
-                GenericRowData.of((byte) 0x14, (byte) 0x15),
-                (byte) 0x03,
-                (short) 32767,
-                103));
+            GenericRowData.of((byte) 0x01, (short) -32768, 101),
+            GenericRowData.of((byte) 0x02, (short) 0, 102),
+            GenericRowData.of((byte) 0x03, (short) 32767, 103));
 
     Record record = GenericRecord.create(iSchema);
-    Record struct = GenericRecord.create(iSchema.findField("struct").type().asStructType());
     List<Record> expected =
         Lists.newArrayList(
-            record.copy(
-                ImmutableMap.of(
-                    "array",
-                    Lists.newArrayList(4, 5),
-                    "map",
-                    ImmutableMap.of(6, 7),
-                    "struct",
-                    struct.copy(ImmutableMap.of("struct_1", 8, "struct_2", 9)),
-                    "tinyint",
-                    1,
-                    "smallint",
-                    -32768,
-                    "int",
-                    101)),
-            record.copy(
-                ImmutableMap.of(
-                    "array",
-                    Lists.newArrayList(10, 11),
-                    "map",
-                    ImmutableMap.of(12, 13),
-                    "struct",
-                    struct.copy(ImmutableMap.of("struct_1", 14, "struct_2", 15)),
-                    "tinyint",
-                    2,
-                    "smallint",
-                    0,
-                    "int",
-                    102)),
-            record.copy(
-                ImmutableMap.of(
-                    "array",
-                    Lists.newArrayList(16, 17),
-                    "map",
-                    ImmutableMap.of(18, 19),
-                    "struct",
-                    struct.copy(ImmutableMap.of("struct_1", 20, "struct_2", 21)),
-                    "tinyint",
-                    3,
-                    "smallint",
-                    32767,
-                    "int",
-                    103)));
+            record.copy(ImmutableMap.of("tinyint", 1, "smallint", -32768, "int", 101)),
+            record.copy(ImmutableMap.of("tinyint", 2, "smallint", 0, "int", 102)),
+            record.copy(ImmutableMap.of("tinyint", 3, "smallint", 32767, "int", 103)));
 
     try (OneInputStreamOperatorTestHarness<RowData, FlinkWriteResult> testHarness =
         createIcebergStreamWriter(icebergTable, flinkSchema)) {

From 998f433ff8515b384351c9b9c13e94683b05da5c Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Tue, 17 Feb 2026 09:43:23 +0100
Subject: [PATCH 3/4] Eduard's comments

---
 .../iceberg/data/BaseFileWriterFactory.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 55f3b5701e0b..444c0d0226bd 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -40,7 +40,13 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
-/** A base writer factory to be extended by query engine integrations. */
+/**
+ * A base writer factory to be extended by query engine integrations.
+ *
+ * @deprecated since version 1.11.0 and will be removed in 1.12.0. Use {@link
+ *     RegistryBasedFileWriterFactory}
+ */
+@Deprecated
 public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T>, Serializable {
   private final Table table;
   private final FileFormat dataFileFormat;
@@ -75,13 +81,6 @@ protected BaseFileWriterFactory(
     this.positionDeleteRowSchema = null;
   }
 
-  /**
-   * @deprecated This constructor is deprecated as of version 1.11.0 and will be removed in 1.12.0.
-   *     Position deletes that include row data are no longer supported. Use {@link
-   *     #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, FileFormat, int[], Schema,
-   *     SortOrder, Map)} instead.
-   */
-  @Deprecated
   protected BaseFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
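With BaseFileWriterFactory deprecated above, an engine integration migrates by extending RegistryBasedFileWriterFactory and handing its row class plus pre-converted engine schemas to the super constructor, instead of overriding per-format configure methods. A skeletal sketch modeled on FlinkFileWriterFactory from patch 1, where MyRow, MyRowSchema, and toEngineSchema(...) are invented placeholders:

    public class MyFileWriterFactory extends RegistryBasedFileWriterFactory<MyRow, MyRowSchema> {
      MyFileWriterFactory(Table table, Schema dataSchema, Schema equalityDeleteRowSchema) {
        super(
            table,
            FileFormat.PARQUET,   // data file format
            MyRow.class,          // row class: selects the registered format model
            dataSchema,
            null,                 // no data sort order
            FileFormat.PARQUET,   // delete file format
            null,                 // no equality field ids
            equalityDeleteRowSchema,
            null,                 // no equality delete sort order
            table.properties(),   // extra writer properties
            toEngineSchema(dataSchema),
            equalityDeleteRowSchema == null ? null : toEngineSchema(equalityDeleteRowSchema));
      }
    }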
- */ - @Deprecated protected BaseFileWriterFactory( Table table, FileFormat dataFileFormat, From d0fc9ea45218acd78b009c0487285185c987212c Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 17 Feb 2026 08:17:51 +0100 Subject: [PATCH 4/4] Eduard's second batch of comments --- .../apache/iceberg/data/RegistryBasedFileWriterFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java index 3d0e2e8fb030..868b41f5840b 100644 --- a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java @@ -98,7 +98,7 @@ protected S equalityDeleteInputSchema() { @Override public DataWriter newDataWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - Preconditions.checkNotNull(dataSchema, "Data schema must not be null"); + Preconditions.checkArgument(dataSchema != null, "Invalid data schema: null"); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table != null ? table.properties() : ImmutableMap.of(); MetricsConfig metricsConfig = @@ -127,7 +127,7 @@ public DataWriter newDataWriter( @Override public EqualityDeleteWriter newEqualityDeleteWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - Preconditions.checkNotNull(equalityDeleteRowSchema, "Equality delete schema must not be null"); + Preconditions.checkArgument(equalityDeleteRowSchema != null, "Invalid delete schema: null"); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table != null ? table.properties() : ImmutableMap.of();