diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java index fbd21f737450..fec413ca079a 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java @@ -41,6 +41,7 @@ import org.apache.spark.sql.types.IntegerType$; import org.apache.spark.sql.types.LongType$; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType$; import org.apache.spark.sql.types.StringType$; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -238,5 +239,6 @@ public Type primitive(Type.PrimitiveType primitive) { .put(TypeID.STRING, ImmutableSet.of(StringType$.class)) .put(TypeID.FIXED, ImmutableSet.of(BinaryType$.class)) .put(TypeID.BINARY, ImmutableSet.of(BinaryType$.class)) + .put(TypeID.UNKNOWN, ImmutableSet.of(NullType$.class)) .buildOrThrow(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java index 8beaefc5cc8f..d4e440129af1 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java @@ -35,6 +35,7 @@ import org.apache.spark.sql.types.IntegerType; import org.apache.spark.sql.types.LongType; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType; import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructField; @@ -153,8 +154,12 @@ public Type atomic(DataType atomic) { } else if (atomic instanceof DecimalType) { return Types.DecimalType.of( ((DecimalType) atomic).precision(), ((DecimalType) atomic).scale()); + } else if (atomic instanceof BinaryType) { return Types.BinaryType.get(); + + } else if (atomic instanceof NullType) { + return Types.UnknownType.get(); } throw new UnsupportedOperationException("Not a supported type: " + atomic.catalogString()); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index dfb9b30be603..d33632bbbd54 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -38,6 +38,7 @@ import org.apache.spark.sql.types.MapType$; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.NullType$; import org.apache.spark.sql.types.StringType$; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType$; @@ -124,9 +125,11 @@ public DataType primitive(Type.PrimitiveType primitive) { case DECIMAL: Types.DecimalType decimal = (Types.DecimalType) primitive; return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale()); + case UNKNOWN: + return NullType$.MODULE$; default: throw new UnsupportedOperationException( - "Cannot convert unknown type to Spark: " + primitive); + "Cannot convert unsupported type to Spark: " + primitive); } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java index d74a76f94e87..2a2eef198b76 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java @@ -31,6 +31,7 @@ import org.apache.parquet.schema.Type.Repetition; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -181,21 +182,27 @@ private static T visitField( private static List visitFields( StructType struct, GroupType group, ParquetWithSparkSchemaVisitor visitor) { - StructField[] sFields = struct.fields(); - Preconditions.checkArgument( - sFields.length == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.length; i += 1) { - Type field = group.getFields().get(i); - StructField sField = sFields[i]; - Preconditions.checkArgument( - field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())), - "Structs do not match: field %s != %s", - field.getName(), - sField.name()); - results.add(visitField(sField, field, visitor)); + + int fieldIndex = 0; + for (StructField sField : struct.fields()) { + if (sField.dataType() != DataTypes.NullType) { + Type field = group.getFields().get(fieldIndex); + Preconditions.checkArgument( + field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())), + "Structs do not match: field %s != %s", + field.getName(), + sField.name()); + results.add(visitField(sField, field, visitor)); + + fieldIndex += 1; + } } + // All the group fields should have been visited + Preconditions.checkArgument( + fieldIndex == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); + return results; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 6b799e677bf4..6fc8849c82b2 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.iceberg.FieldMetrics; @@ -77,7 +79,7 @@ public OrcValueWriter record( TypeDescription record, List names, List> fields) { - return new InternalRowWriter(fields, record.getChildren()); + return new InternalRowWriter(fields, iStruct, record.getChildren()); } @Override @@ -133,12 +135,16 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio private static class InternalRowWriter extends GenericOrcWriters.StructWriter { private final List> fieldGetters; - InternalRowWriter(List> writers, List orcTypes) { - super(writers); + InternalRowWriter( + List> writers, Types.StructType iStruct, List orcTypes) { + super(iStruct, writers); this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); - for (TypeDescription orcType : orcTypes) { - fieldGetters.add(createFieldGetter(orcType)); + Map idToType = + orcTypes.stream().collect(Collectors.toMap(ORCSchemaUtil::fieldId, s -> s)); + + for (Types.NestedField iField : iStruct.fields()) { + fieldGetters.add(createFieldGetter(idToType.get(iField.fieldId()))); } } @@ -149,6 +155,11 @@ protected Object get(InternalRow struct, int index) { } static FieldGetter createFieldGetter(TypeDescription fieldType) { + // In the case of an UnknownType + if (fieldType == null) { + return (row, ordinal) -> null; + } + final FieldGetter fieldGetter; switch (fieldType.getCategory()) { case BOOLEAN: diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index f4ae6114c8ab..a1ed6c66f337 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -26,6 +26,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.UUID; +import java.util.stream.IntStream; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; @@ -55,6 +56,7 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType; import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -94,15 +96,18 @@ public ParquetValueWriter message( public ParquetValueWriter struct( StructType sStruct, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); - StructField[] sparkFields = sStruct.fields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); - List sparkTypes = Lists.newArrayList(); for (int i = 0; i < fields.size(); i += 1) { writers.add(newOption(struct.getType(i), fieldWriters.get(i))); - sparkTypes.add(sparkFields[i].dataType()); } - return new InternalRowWriter(writers, sparkTypes); + StructField[] sFields = sStruct.fields(); + DataType[] types = new DataType[sFields.length]; + for (int i = 0; i < sFields.length; i += 1) { + types[i] = sFields[i].dataType(); + } + + return new InternalRowWriter(writers, types); } @Override @@ -566,14 +571,33 @@ public Map.Entry next() { private static class InternalRowWriter extends ParquetValueWriters.StructWriter { private final DataType[] types; - private InternalRowWriter(List> writers, List types) { - super(writers); - this.types = types.toArray(new DataType[0]); + private InternalRowWriter(List> writers, DataType[] types) { + super(writerToFieldIndex(types, writers.size()), writers); + this.types = types; } @Override protected Object get(InternalRow struct, int index) { return struct.get(index, types[index]); } + + /** Returns a mapping from writer index to field index, skipping Unknown columns. */ + private static int[] writerToFieldIndex(DataType[] types, int numWriters) { + if (null == types) { + return IntStream.rangeClosed(0, numWriters).toArray(); + } + + // value writer index to record field index + int[] indexes = new int[numWriters]; + int writerIndex = 0; + for (int pos = 0; pos < types.length; pos += 1) { + if (!(types[pos] instanceof NullType)) { + indexes[writerIndex] = pos; + writerIndex += 1; + } + } + + return indexes; + } } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 8dceb075e604..4f324239881e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -465,6 +465,8 @@ public ColumnVector convert( DeletedColumnVector deletedVector = new DeletedColumnVector(field.type()); deletedVector.setValue(new boolean[batchSize]); fieldVectors.add(deletedVector); + } else if (field.type().equals(Types.UnknownType.get())) { + fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, null)); } else { fieldVectors.add( fieldConverters diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index 9b5b207a5b6b..0846cf6f1161 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -24,9 +24,11 @@ import java.util.List; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.MetadataAttribute; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; @@ -79,4 +81,18 @@ public void testSchemaConversionWithMetaDataColumnSchema() { } } } + + @Test + public void testUnknownTypeToSpark() { + Schema schema = new Schema(optional(1, "col", Types.UnknownType.get())); + StructType sparkType = SparkSchemaUtil.convert(schema); + assertThat(sparkType.fields()[0].dataType()).isEqualTo(DataTypes.NullType); + } + + @Test + public void testNullTypeToIceberg() { + StructType sparkType = new StructType().add("col", DataTypes.NullType, true); + Type icebergType = SparkSchemaUtil.convert(sparkType).findField("col").type(); + assertThat(icebergType).isEqualTo(Types.UnknownType.get()); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java index 0db6a65fd394..45053c1a4f1f 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -108,8 +109,8 @@ protected boolean supportsRowLineage() { required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding - required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision - ); + required(117, "dec_38_10", Types.DecimalType.of(38, 10)), // Spark's maximum precision + optional(118, "unk", Types.UnknownType.get())); @TempDir protected Path temp; @@ -120,10 +121,13 @@ public void testSimpleStruct() throws IOException { @Test public void testStructWithRequiredFields() throws IOException { + List supportedPrimitives = + SUPPORTED_PRIMITIVES.fields().stream() + .filter(f -> f.type().typeId() != Type.TypeID.UNKNOWN) + .collect(Collectors.toList()); writeAndValidate( TypeUtil.assignIncreasingFreshIds( - new Schema( - Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired)))); + new Schema(Lists.transform(supportedPrimitives, Types.NestedField::asRequired)))); } @Test @@ -603,4 +607,48 @@ public void testRowLineage() throws Exception { record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), record.copy(Map.of("id", 5L, "data", "e")))); } + + @Test + public void testUnknownNestedLevel() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(1, "id", LongType.get()), + optional( + 2, + "nested", + Types.StructType.of( + required(20, "int", Types.IntegerType.get()), + optional(21, "unk", Types.UnknownType.get())))); + + writeAndValidate(schema); + } + + @Test + public void testUnknownListType() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional(1, "data", ListType.ofOptional(2, Types.UnknownType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testUnknownMapType() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional( + 1, + "data", + MapType.ofOptional(2, 3, Types.StringType.get(), Types.UnknownType.get()))); + + writeAndValidate(schema); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java index 546a44fc77bb..a1f71848b14e 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.spark.data.TestHelpers.assertEquals; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -106,4 +107,20 @@ private void writeAndValidateRecords(Schema schema, Iterable expect private Iterator batchesToRows(Iterator batches) { return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator)); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index 9ae8b8cbe530..993dc868bba8 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -249,4 +249,20 @@ public void testMissingRequiredWithoutDefault() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Missing required field: missing_str"); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java index 8e1f860085c6..3c88db139e47 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -152,4 +153,20 @@ private static void assertEqualsUnsafe( .isFalse(); assertThat(actualIter.hasNext()).as("Actual iterator should not have any extra rows").isFalse(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java index c368c4a815fe..aa8966877021 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java @@ -41,6 +41,7 @@ import org.apache.iceberg.spark.data.AvroDataTestBase; import org.apache.iceberg.spark.data.RandomData; import org.apache.iceberg.spark.data.TestHelpers; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -95,14 +96,16 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw HadoopTables tables = new HadoopTables(CONF); // If V3 spec features are used, set the format version to 3 - Map tableProperties = + boolean requiresV3 = writeSchema.columns().stream() .anyMatch(f -> f.initialDefaultLiteral() != null || f.writeDefaultLiteral() != null) - ? ImmutableMap.of(TableProperties.FORMAT_VERSION, "3") - : ImmutableMap.of(); + || TypeUtil.find(writeSchema, t -> t.typeId() == Type.TypeID.UNKNOWN) != null; + Map tableProperties = + requiresV3 ? ImmutableMap.of(TableProperties.FORMAT_VERSION, "3") : ImmutableMap.of(); Table table = tables.create( writeSchema, PartitionSpec.unpartitioned(), tableProperties, location.toString()); + configureTable(table); // Important: use the table's schema for the rest of the test // When tables are created, the column ids are reassigned. diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java index 35be6423ee23..892e260f66f0 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java @@ -18,9 +18,13 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.spark.SparkException; +import org.junit.jupiter.api.Test; public class TestORCDataFrameWrite extends DataFrameWriteTestBase { @Override @@ -30,4 +34,24 @@ protected void configureTable(Table table) { .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.toString()) .commit(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java index 90a9ac48a486..c24d92ef30af 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java @@ -18,9 +18,13 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.spark.SparkException; +import org.junit.jupiter.api.Test; public class TestParquetDataFrameWrite extends DataFrameWriteTestBase { @Override @@ -30,4 +34,24 @@ protected void configureTable(Table table) { .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.toString()) .commit(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java index 6b9ec85b7f0b..6056f1a7929d 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.iceberg.Files.localOutput; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; @@ -37,6 +38,7 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; public class TestParquetScan extends ScanTestBase { protected boolean vectorized() { @@ -84,4 +86,20 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw super.writeAndValidate(writeSchema, expectedSchema); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java index fbd21f737450..fec413ca079a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java @@ -41,6 +41,7 @@ import org.apache.spark.sql.types.IntegerType$; import org.apache.spark.sql.types.LongType$; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType$; import org.apache.spark.sql.types.StringType$; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -238,5 +239,6 @@ public Type primitive(Type.PrimitiveType primitive) { .put(TypeID.STRING, ImmutableSet.of(StringType$.class)) .put(TypeID.FIXED, ImmutableSet.of(BinaryType$.class)) .put(TypeID.BINARY, ImmutableSet.of(BinaryType$.class)) + .put(TypeID.UNKNOWN, ImmutableSet.of(NullType$.class)) .buildOrThrow(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java index 8beaefc5cc8f..d4e440129af1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTypeToType.java @@ -35,6 +35,7 @@ import org.apache.spark.sql.types.IntegerType; import org.apache.spark.sql.types.LongType; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType; import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructField; @@ -153,8 +154,12 @@ public Type atomic(DataType atomic) { } else if (atomic instanceof DecimalType) { return Types.DecimalType.of( ((DecimalType) atomic).precision(), ((DecimalType) atomic).scale()); + } else if (atomic instanceof BinaryType) { return Types.BinaryType.get(); + + } else if (atomic instanceof NullType) { + return Types.UnknownType.get(); } throw new UnsupportedOperationException("Not a supported type: " + atomic.catalogString()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index dfb9b30be603..d33632bbbd54 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -38,6 +38,7 @@ import org.apache.spark.sql.types.MapType$; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.NullType$; import org.apache.spark.sql.types.StringType$; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType$; @@ -124,9 +125,11 @@ public DataType primitive(Type.PrimitiveType primitive) { case DECIMAL: Types.DecimalType decimal = (Types.DecimalType) primitive; return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale()); + case UNKNOWN: + return NullType$.MODULE$; default: throw new UnsupportedOperationException( - "Cannot convert unknown type to Spark: " + primitive); + "Cannot convert unsupported type to Spark: " + primitive); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java index 9480385d5452..e11a85d538a6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/ParquetWithSparkSchemaVisitor.java @@ -31,6 +31,7 @@ import org.apache.parquet.schema.Type.Repetition; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; @@ -173,21 +174,27 @@ private static T visitField( private static List visitFields( StructType struct, GroupType group, ParquetWithSparkSchemaVisitor visitor) { - StructField[] sFields = struct.fields(); - Preconditions.checkArgument( - sFields.length == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); List results = Lists.newArrayListWithExpectedSize(group.getFieldCount()); - for (int i = 0; i < sFields.length; i += 1) { - Type field = group.getFields().get(i); - StructField sField = sFields[i]; - Preconditions.checkArgument( - field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())), - "Structs do not match: field %s != %s", - field.getName(), - sField.name()); - results.add(visitField(sField, field, visitor)); + + int fieldIndex = 0; + for (StructField sField : struct.fields()) { + if (sField.dataType() != DataTypes.NullType) { + Type field = group.getFields().get(fieldIndex); + Preconditions.checkArgument( + field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())), + "Structs do not match: field %s != %s", + field.getName(), + sField.name()); + results.add(visitField(sField, field, visitor)); + + fieldIndex += 1; + } } + // All the group fields should have been visited + Preconditions.checkArgument( + fieldIndex == group.getFieldCount(), "Structs do not match: %s and %s", struct, group); + return results; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 6b799e677bf4..6fc8849c82b2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.iceberg.FieldMetrics; @@ -77,7 +79,7 @@ public OrcValueWriter record( TypeDescription record, List names, List> fields) { - return new InternalRowWriter(fields, record.getChildren()); + return new InternalRowWriter(fields, iStruct, record.getChildren()); } @Override @@ -133,12 +135,16 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio private static class InternalRowWriter extends GenericOrcWriters.StructWriter { private final List> fieldGetters; - InternalRowWriter(List> writers, List orcTypes) { - super(writers); + InternalRowWriter( + List> writers, Types.StructType iStruct, List orcTypes) { + super(iStruct, writers); this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size()); - for (TypeDescription orcType : orcTypes) { - fieldGetters.add(createFieldGetter(orcType)); + Map idToType = + orcTypes.stream().collect(Collectors.toMap(ORCSchemaUtil::fieldId, s -> s)); + + for (Types.NestedField iField : iStruct.fields()) { + fieldGetters.add(createFieldGetter(idToType.get(iField.fieldId()))); } } @@ -149,6 +155,11 @@ protected Object get(InternalRow struct, int index) { } static FieldGetter createFieldGetter(TypeDescription fieldType) { + // In the case of an UnknownType + if (fieldType == null) { + return (row, ordinal) -> null; + } + final FieldGetter fieldGetter; switch (fieldType.getCategory()) { case BOOLEAN: diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java index 58be7f610c81..a1ed6c66f337 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java @@ -26,6 +26,7 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.UUID; +import java.util.stream.IntStream; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; import org.apache.iceberg.parquet.ParquetValueWriter; @@ -55,6 +56,7 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NullType; import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -94,14 +96,18 @@ public ParquetValueWriter message( public ParquetValueWriter struct( StructType sStruct, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); - StructField[] sparkFields = sStruct.fields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); - List sparkTypes = Lists.newArrayList(); for (int i = 0; i < fields.size(); i += 1) { writers.add(newOption(struct.getType(i), fieldWriters.get(i))); - sparkTypes.add(sparkFields[i].dataType()); } - return new InternalRowWriter(writers, sparkTypes); + + StructField[] sFields = sStruct.fields(); + DataType[] types = new DataType[sFields.length]; + for (int i = 0; i < sFields.length; i += 1) { + types[i] = sFields[i].dataType(); + } + + return new InternalRowWriter(writers, types); } @Override @@ -565,14 +571,33 @@ public Map.Entry next() { private static class InternalRowWriter extends ParquetValueWriters.StructWriter { private final DataType[] types; - private InternalRowWriter(List> writers, List types) { - super(writers); - this.types = types.toArray(new DataType[0]); + private InternalRowWriter(List> writers, DataType[] types) { + super(writerToFieldIndex(types, writers.size()), writers); + this.types = types; } @Override protected Object get(InternalRow struct, int index) { return struct.get(index, types[index]); } + + /** Returns a mapping from writer index to field index, skipping Unknown columns. */ + private static int[] writerToFieldIndex(DataType[] types, int numWriters) { + if (null == types) { + return IntStream.rangeClosed(0, numWriters).toArray(); + } + + // value writer index to record field index + int[] indexes = new int[numWriters]; + int writerIndex = 0; + for (int pos = 0; pos < types.length; pos += 1) { + if (!(types[pos] instanceof NullType)) { + indexes[writerIndex] = pos; + writerIndex += 1; + } + } + + return indexes; + } } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index 4045847d5a4a..b8f436cf2d86 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -24,10 +24,12 @@ import java.util.List; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.MetadataAttribute; import org.apache.spark.sql.catalyst.types.DataTypeUtils; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; @@ -80,4 +82,18 @@ public void testSchemaConversionWithMetaDataColumnSchema() { } } } + + @Test + public void testUnknownTypeToSpark() { + Schema schema = new Schema(optional(1, "col", Types.UnknownType.get())); + StructType sparkType = SparkSchemaUtil.convert(schema); + assertThat(sparkType.fields()[0].dataType()).isEqualTo(DataTypes.NullType); + } + + @Test + public void testNullTypeToIceberg() { + StructType sparkType = new StructType().add("col", DataTypes.NullType, true); + Type icebergType = SparkSchemaUtil.convert(sparkType).findField("col").type(); + assertThat(icebergType).isEqualTo(Types.UnknownType.get()); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java index 0db6a65fd394..45053c1a4f1f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTestBase.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -108,8 +109,8 @@ protected boolean supportsRowLineage() { required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding - required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision - ); + required(117, "dec_38_10", Types.DecimalType.of(38, 10)), // Spark's maximum precision + optional(118, "unk", Types.UnknownType.get())); @TempDir protected Path temp; @@ -120,10 +121,13 @@ public void testSimpleStruct() throws IOException { @Test public void testStructWithRequiredFields() throws IOException { + List supportedPrimitives = + SUPPORTED_PRIMITIVES.fields().stream() + .filter(f -> f.type().typeId() != Type.TypeID.UNKNOWN) + .collect(Collectors.toList()); writeAndValidate( TypeUtil.assignIncreasingFreshIds( - new Schema( - Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired)))); + new Schema(Lists.transform(supportedPrimitives, Types.NestedField::asRequired)))); } @Test @@ -603,4 +607,48 @@ public void testRowLineage() throws Exception { record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), record.copy(Map.of("id", 5L, "data", "e")))); } + + @Test + public void testUnknownNestedLevel() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(1, "id", LongType.get()), + optional( + 2, + "nested", + Types.StructType.of( + required(20, "int", Types.IntegerType.get()), + optional(21, "unk", Types.UnknownType.get())))); + + writeAndValidate(schema); + } + + @Test + public void testUnknownListType() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional(1, "data", ListType.ofOptional(2, Types.UnknownType.get()))); + + writeAndValidate(schema); + } + + @Test + public void testUnknownMapType() throws IOException { + assumeThat(supportsNestedTypes()).isTrue(); + + Schema schema = + new Schema( + required(0, "id", LongType.get()), + optional( + 1, + "data", + MapType.ofOptional(2, 3, Types.StringType.get(), Types.UnknownType.get()))); + + writeAndValidate(schema); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java index 3b68a830b088..3fcfe6845c99 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.spark.data.TestHelpers.assertEquals; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -107,4 +108,20 @@ private void writeAndValidateRecords(Schema schema, Iterable expect private Iterator batchesToRows(Iterator batches) { return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator)); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index 328dcaa0014c..bc4b77059d43 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -248,4 +248,20 @@ public void testMissingRequiredWithoutDefault() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Missing required field: missing_str"); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java index bf738be59cb8..634327a81d86 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -150,4 +151,20 @@ private static void assertEqualsUnsafe( assertThat(expectedIter).as("Expected iterator should not have any extra rows.").isExhausted(); assertThat(actualIter).as("Actual iterator should not have any extra rows.").isExhausted(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java index 39ea25ae6f54..aa8966877021 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java @@ -41,6 +41,7 @@ import org.apache.iceberg.spark.data.AvroDataTestBase; import org.apache.iceberg.spark.data.RandomData; import org.apache.iceberg.spark.data.TestHelpers; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -95,11 +96,12 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw HadoopTables tables = new HadoopTables(CONF); // If V3 spec features are used, set the format version to 3 - Map tableProperties = + boolean requiresV3 = writeSchema.columns().stream() .anyMatch(f -> f.initialDefaultLiteral() != null || f.writeDefaultLiteral() != null) - ? ImmutableMap.of(TableProperties.FORMAT_VERSION, "3") - : ImmutableMap.of(); + || TypeUtil.find(writeSchema, t -> t.typeId() == Type.TypeID.UNKNOWN) != null; + Map tableProperties = + requiresV3 ? ImmutableMap.of(TableProperties.FORMAT_VERSION, "3") : ImmutableMap.of(); Table table = tables.create( writeSchema, PartitionSpec.unpartitioned(), tableProperties, location.toString()); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java index 35be6423ee23..892e260f66f0 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java @@ -18,9 +18,13 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.spark.SparkException; +import org.junit.jupiter.api.Test; public class TestORCDataFrameWrite extends DataFrameWriteTestBase { @Override @@ -30,4 +34,24 @@ protected void configureTable(Table table) { .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.toString()) .commit(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create ListType with unknown element type"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot create MapType with unknown value type"); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java index 90a9ac48a486..c24d92ef30af 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java @@ -18,9 +18,13 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.spark.SparkException; +import org.junit.jupiter.api.Test; public class TestParquetDataFrameWrite extends DataFrameWriteTestBase { @Override @@ -30,4 +34,24 @@ protected void configureTable(Table table) { .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.toString()) .commit(); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(SparkException.class) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java index c0dee43d6de1..8b567bcaf11e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.iceberg.Files.localOutput; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; @@ -37,6 +38,7 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; public class TestParquetScan extends ScanTestBase { protected boolean vectorized() { @@ -83,4 +85,20 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw super.writeAndValidate(writeSchema, expectedSchema); } + + @Test + @Override + public void testUnknownListType() { + assertThatThrownBy(super::testUnknownListType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert element Parquet: unknown"); + } + + @Test + @Override + public void testUnknownMapType() { + assertThatThrownBy(super::testUnknownMapType) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot convert value Parquet: unknown"); + } }