Changes from all commits (19 commits)
9187924 Add unknown type support to Spark 3.4 and 3.5 (Apr 9, 2026)
d9d5a03 Spark: Add unknown type support to Spark 3.4 and 3.5 (Apr 9, 2026)
369deb1 Merge branch 'apache:main' into spark-3x-unknown-type-support (Kurtiscwright, Apr 20, 2026)
b340145 Add .factorypath to .gitignore (#16067) (wangyum, Apr 21, 2026)
08b0b43 Spark: Replace deprecated registerTempTable with createOrReplaceTempV… (drexler-sky, Apr 21, 2026)
dd703ab AWS: Add proxy system property and environment variable configuration… (lunar-shadow, Apr 21, 2026)
7014749 Kafka Connect: Do not fail if no partitions assigned (#15955) (kumarpritam863, Apr 21, 2026)
e1091dd Core: Use Stream overload for reading response in HTTPClient (#15648) (alpbeysir, Apr 21, 2026)
28b6df3 Spark: Fix RoaringBitmap version in runtime-deps.txt (#16076) (ebyhr, Apr 22, 2026)
181e7b1 Core: Use Idiomatic ThreadLocal cleanup in CommitMetadata (#15284) (#… (yadavay-amzn, Apr 22, 2026)
b6977d1 Spark: fix delete from branch for canDeleteWhere where it does not re… (yingjianwu98, Apr 22, 2026)
8ca9664 Kafka Connect: Support VARIANT when record convert (#15283) (seokyun-ha-toss, Apr 22, 2026)
77ab506 REST Spec: Clarify identifier uniqueness across tables and views (#15… (stevenzwu, Apr 23, 2026)
eaf39d7 Spark 3.4, 3.5, 4.0: Include snapshotId and branch in SparkTable equa… (bharos, Apr 23, 2026)
361ad5a Core, Spark: Verify that TRUNCATE removes orphaned DVs (#16078) (nastra, Apr 23, 2026)
435a02e Spark: Add unknown type support to Spark 3.4 and 3.5 (Apr 23, 2026)
9ae0370 Merge branch 'apache:main' into spark-3x-unknown-type-support (Kurtiscwright, Apr 23, 2026)
0778865 Merge branch 'apache:main' into spark-3x-unknown-type-support (Kurtiscwright, Apr 24, 2026)
873cfe7 Spark 3.4: Fix unknown type tests by calling configureTable in ScanTe… (Apr 24, 2026)
@@ -41,6 +41,7 @@
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -238,5 +239,6 @@ public Type primitive(Type.PrimitiveType primitive) {
.put(TypeID.STRING, ImmutableSet.of(StringType$.class))
.put(TypeID.FIXED, ImmutableSet.of(BinaryType$.class))
.put(TypeID.BINARY, ImmutableSet.of(BinaryType$.class))
.put(TypeID.UNKNOWN, ImmutableSet.of(NullType$.class))
.buildOrThrow();
}
@@ -35,6 +35,7 @@
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.NullType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
@@ -153,8 +154,12 @@ public Type atomic(DataType atomic) {
} else if (atomic instanceof DecimalType) {
return Types.DecimalType.of(
((DecimalType) atomic).precision(), ((DecimalType) atomic).scale());

} else if (atomic instanceof BinaryType) {
return Types.BinaryType.get();

} else if (atomic instanceof NullType) {
return Types.UnknownType.get();
}

throw new UnsupportedOperationException("Not a supported type: " + atomic.catalogString());
@@ -38,6 +38,7 @@
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
@@ -124,9 +125,11 @@ public DataType primitive(Type.PrimitiveType primitive) {
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
case UNKNOWN:
return NullType$.MODULE$;
default:
throw new UnsupportedOperationException(
"Cannot convert unknown type to Spark: " + primitive);
"Cannot convert unsupported type to Spark: " + primitive);
}
}
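
Taken together, the two converters make unknown and NullType round-trip cleanly between the type systems. A minimal standalone sketch (the class name and printlns are illustrative; the calls are the same ones the tests below exercise):

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UnknownTypeRoundTrip {
  public static void main(String[] args) {
    // Iceberg -> Spark: an unknown column converts to Spark's NullType
    Schema schema = new Schema(Types.NestedField.optional(1, "col", Types.UnknownType.get()));
    StructType sparkType = SparkSchemaUtil.convert(schema);
    System.out.println(sparkType.fields()[0].dataType()); // NullType

    // Spark -> Iceberg: NullType now maps back to unknown instead of throwing
    StructType nullStruct = new StructType().add("col", DataTypes.NullType, true);
    Schema converted = SparkSchemaUtil.convert(nullStruct);
    System.out.println(converted.findField("col").type()); // unknown
  }
}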

@@ -31,6 +31,7 @@
import org.apache.parquet.schema.Type.Repetition;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -181,21 +182,27 @@ private static <T> T visitField(

private static <T> List<T> visitFields(
StructType struct, GroupType group, ParquetWithSparkSchemaVisitor<T> visitor) {
StructField[] sFields = struct.fields();
Preconditions.checkArgument(
sFields.length == group.getFieldCount(), "Structs do not match: %s and %s", struct, group);
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
for (int i = 0; i < sFields.length; i += 1) {
Type field = group.getFields().get(i);
StructField sField = sFields[i];
Preconditions.checkArgument(
field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())),
"Structs do not match: field %s != %s",
field.getName(),
sField.name());
results.add(visitField(sField, field, visitor));

int fieldIndex = 0;
for (StructField sField : struct.fields()) {
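// a NullType (unknown) field is never written to Parquet, so it has no
// counterpart in the Parquet group and must be skipped when pairing fields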
if (sField.dataType() != DataTypes.NullType) {
Type field = group.getFields().get(fieldIndex);
Preconditions.checkArgument(
field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.name())),
"Structs do not match: field %s != %s",
field.getName(),
sField.name());
results.add(visitField(sField, field, visitor));

fieldIndex += 1;
}
}

// All the group fields should have been visited
Preconditions.checkArgument(
fieldIndex == group.getFieldCount(), "Structs do not match: %s and %s", struct, group);

return results;
}
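
The reshaped loop pairs each non-NullType Spark field with the next Parquet field and only advances the Parquet index on a match, so the trailing check still catches genuine mismatches. A toy standalone walk of the same idea (plain strings stand in for the Spark and Parquet fields; all names are illustrative):

import java.util.Arrays;
import java.util.List;

public class SkipNullFieldsDemo {
  public static void main(String[] args) {
    // Spark struct fields; null marks a NullType (unknown) column
    List<String> sparkFields = Arrays.asList("id", null, "data");
    // Parquet group fields: the unknown column was never written
    List<String> parquetFields = Arrays.asList("id", "data");

    int fieldIndex = 0;
    for (String sField : sparkFields) {
      if (sField != null) {
        // pair the Spark field with the next Parquet field
        System.out.println(sField + " -> " + parquetFields.get(fieldIndex));
        fieldIndex += 1;
      }
    }

    // every Parquet field must have been consumed, mirroring the final check
    if (fieldIndex != parquetFields.size()) {
      throw new IllegalArgumentException("Structs do not match");
    }
  }
}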

@@ -20,6 +20,8 @@

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.iceberg.FieldMetrics;
@@ -77,7 +79,7 @@ public OrcValueWriter<?> record(
TypeDescription record,
List<String> names,
List<OrcValueWriter<?>> fields) {
return new InternalRowWriter(fields, record.getChildren());
return new InternalRowWriter(fields, iStruct, record.getChildren());
}

@Override
@@ -133,12 +135,16 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
private static class InternalRowWriter extends GenericOrcWriters.StructWriter<InternalRow> {
private final List<FieldGetter<?>> fieldGetters;

InternalRowWriter(List<OrcValueWriter<?>> writers, List<TypeDescription> orcTypes) {
super(writers);
InternalRowWriter(
List<OrcValueWriter<?>> writers, Types.StructType iStruct, List<TypeDescription> orcTypes) {
super(iStruct, writers);
this.fieldGetters = Lists.newArrayListWithExpectedSize(orcTypes.size());

for (TypeDescription orcType : orcTypes) {
fieldGetters.add(createFieldGetter(orcType));
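// index ORC types by Iceberg field id; unknown columns are dropped from the
// ORC schema, so the lookup below yields null for them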
Map<Integer, TypeDescription> idToType =
orcTypes.stream().collect(Collectors.toMap(ORCSchemaUtil::fieldId, s -> s));

for (Types.NestedField iField : iStruct.fields()) {
fieldGetters.add(createFieldGetter(idToType.get(iField.fieldId())));
}
}

@@ -149,6 +155,11 @@ protected Object get(InternalRow struct, int index) {
}

static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
// a null ORC type means the Iceberg field is unknown; always produce null
if (fieldType == null) {
return (row, ordinal) -> null;
}

final FieldGetter<?> fieldGetter;
switch (fieldType.getCategory()) {
case BOOLEAN:
@@ -26,6 +26,7 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueWriter;
@@ -55,6 +56,7 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.NullType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -94,15 +96,18 @@ public ParquetValueWriter<?> message(
public ParquetValueWriter<?> struct(
StructType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
List<Type> fields = struct.getFields();
StructField[] sparkFields = sStruct.fields();
List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
List<DataType> sparkTypes = Lists.newArrayList();
for (int i = 0; i < fields.size(); i += 1) {
writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
sparkTypes.add(sparkFields[i].dataType());
}

return new InternalRowWriter(writers, sparkTypes);
StructField[] sFields = sStruct.fields();
DataType[] types = new DataType[sFields.length];
for (int i = 0; i < sFields.length; i += 1) {
types[i] = sFields[i].dataType();
}

return new InternalRowWriter(writers, types);
}

@Override
@@ -566,14 +571,33 @@ public Map.Entry<K, V> next() {
private static class InternalRowWriter extends ParquetValueWriters.StructWriter<InternalRow> {
private final DataType[] types;

private InternalRowWriter(List<ParquetValueWriter<?>> writers, List<DataType> types) {
super(writers);
this.types = types.toArray(new DataType[0]);
private InternalRowWriter(List<ParquetValueWriter<?>> writers, DataType[] types) {
super(writerToFieldIndex(types, writers.size()), writers);
this.types = types;
}

@Override
protected Object get(InternalRow struct, int index) {
return struct.get(index, types[index]);
}

/** Returns a mapping from writer index to field index, skipping Unknown columns. */
private static int[] writerToFieldIndex(DataType[] types, int numWriters) {
if (null == types) {
return IntStream.range(0, numWriters).toArray();
}

// value writer index to record field index
int[] indexes = new int[numWriters];
int writerIndex = 0;
for (int pos = 0; pos < types.length; pos += 1) {
if (!(types[pos] instanceof NullType)) {
indexes[writerIndex] = pos;
writerIndex += 1;
}
}

return indexes;
}
}
}
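
For a struct typed (bigint, void, string), writerToFieldIndex produces [0, 2]: writer 0 reads field 0 and writer 1 reads field 2, so the NullType column is never read. A standalone sketch of the same walk (class name is illustrative; the loop mirrors the method above):

import java.util.Arrays;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.NullType;

public class WriterIndexDemo {
  public static void main(String[] args) {
    DataType[] types = {DataTypes.LongType, DataTypes.NullType, DataTypes.StringType};

    // same walk as writerToFieldIndex: two writers, skipping the NullType column
    int[] indexes = new int[2];
    int writerIndex = 0;
    for (int pos = 0; pos < types.length; pos += 1) {
      if (!(types[pos] instanceof NullType)) {
        indexes[writerIndex] = pos;
        writerIndex += 1;
      }
    }

    System.out.println(Arrays.toString(indexes)); // [0, 2]
  }
}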
@@ -465,6 +465,8 @@ public ColumnVector convert(
DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
deletedVector.setValue(new boolean[batchSize]);
fieldVectors.add(deletedVector);
} else if (field.type().equals(Types.UnknownType.get())) {
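// an unknown column carries no data; back it with a constant all-null vector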
fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, null));
} else {
fieldVectors.add(
fieldConverters
@@ -24,9 +24,11 @@
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.MetadataAttribute;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

@@ -79,4 +81,18 @@ public void testSchemaConversionWithMetaDataColumnSchema() {
}
}
}

@Test
public void testUnknownTypeToSpark() {
Schema schema = new Schema(optional(1, "col", Types.UnknownType.get()));
StructType sparkType = SparkSchemaUtil.convert(schema);
assertThat(sparkType.fields()[0].dataType()).isEqualTo(DataTypes.NullType);
}

@Test
public void testNullTypeToIceberg() {
StructType sparkType = new StructType().add("col", DataTypes.NullType, true);
Type icebergType = SparkSchemaUtil.convert(sparkType).findField("col").type();
assertThat(icebergType).isEqualTo(Types.UnknownType.get());
}
}
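
These unit tests pin down the conversion; in practice the path is hit whenever a query projects a bare NULL, which Spark types as NullType and which previously failed with "Not a supported type". A hedged end-to-end sketch (the local session setup and class name are assumptions, not part of this change):

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NullLiteralDemo {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[1]").appName("null-literal-demo").getOrCreate();

    // the NULL literal gives the "note" column Spark's NullType
    Dataset<Row> df = spark.sql("SELECT 1 AS id, NULL AS note");

    // with this change the conversion succeeds, mapping note to Iceberg's unknown type
    Schema schema = SparkSchemaUtil.convert(df.schema());
    System.out.println(schema);

    spark.stop();
  }
}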
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
@@ -108,8 +109,8 @@ protected boolean supportsRowLineage() {
required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded
required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding
required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision
);
required(117, "dec_38_10", Types.DecimalType.of(38, 10)), // Spark's maximum precision
optional(118, "unk", Types.UnknownType.get()));

@TempDir protected Path temp;

@@ -120,10 +121,13 @@ public void testSimpleStruct() throws IOException {

@Test
public void testStructWithRequiredFields() throws IOException {
List<Types.NestedField> supportedPrimitives =
SUPPORTED_PRIMITIVES.fields().stream()
.filter(f -> f.type().typeId() != Type.TypeID.UNKNOWN)
.collect(Collectors.toList());
writeAndValidate(
TypeUtil.assignIncreasingFreshIds(
new Schema(
Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired))));
new Schema(Lists.transform(supportedPrimitives, Types.NestedField::asRequired))));
}

@Test
@@ -603,4 +607,48 @@ public void testRowLineage() throws Exception {
record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
record.copy(Map.of("id", 5L, "data", "e"))));
}

@Test
public void testUnknownNestedLevel() throws IOException {
assumeThat(supportsNestedTypes()).isTrue();

Schema schema =
new Schema(
required(1, "id", LongType.get()),
optional(
2,
"nested",
Types.StructType.of(
required(20, "int", Types.IntegerType.get()),
optional(21, "unk", Types.UnknownType.get()))));

writeAndValidate(schema);
}

@Test
public void testUnknownListType() throws IOException {
assumeThat(supportsNestedTypes()).isTrue();

Schema schema =
new Schema(
required(0, "id", LongType.get()),
optional(1, "data", ListType.ofOptional(2, Types.UnknownType.get())));

writeAndValidate(schema);
}

@Test
public void testUnknownMapType() throws IOException {
assumeThat(supportsNestedTypes()).isTrue();

Schema schema =
new Schema(
required(0, "id", LongType.get()),
optional(
1,
"data",
MapType.ofOptional(2, 3, Types.StringType.get(), Types.UnknownType.get())));

writeAndValidate(schema);
}
}
@@ -21,6 +21,7 @@
import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
@@ -106,4 +107,20 @@ private void writeAndValidateRecords(Schema schema, Iterable<InternalRow> expect
private Iterator<InternalRow> batchesToRows(Iterator<ColumnarBatch> batches) {
return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator));
}

@Test
@Override
public void testUnknownListType() {
assertThatThrownBy(super::testUnknownListType)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot create ListType with unknown element type");
}

@Test
@Override
public void testUnknownMapType() {
assertThatThrownBy(super::testUnknownMapType)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot create MapType with unknown value type");
}
}