From b9a370e439210d9992556e6c9acfad20a85f3ea4 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Thu, 21 May 2026 11:27:25 +0100 Subject: [PATCH 1/4] [Iceberg] Fix manifest bounds being padded with trailing 0x00 bytes --- .../sdk/io/iceberg/SerializableDataFile.java | 14 +++- .../io/iceberg/SerializableDataFileTest.java | 66 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index 1f717b82c21e..9e75be0a1987 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -205,11 +205,23 @@ DataFile createDataFile(Map partitionSpecs) { } Map output = new HashMap<>(input.size()); for (Map.Entry e : input.entrySet()) { - output.put(e.getKey(), e.getValue().array()); + output.put(e.getKey(), toByteArray(e.getValue())); } return output; } + // Copy only [position, limit). ByteBuffer.array() returns the full backing + // array, which is sometimes larger than the buffer's content (e.g. trailing + // 0x00 bytes). Leaking those into manifest bounds shifts the lower bound + // above the real min and breaks equality predicate pushdown in some query + // engines. + private static byte[] toByteArray(ByteBuffer buf) { + ByteBuffer view = buf.duplicate(); + byte[] bytes = new byte[view.remaining()]; + view.get(bytes); + return bytes; + } + private static @Nullable Map toByteBufferMap( @Nullable Map input) { if (input == null) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java index 983f021fd7ce..964ad5194130 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java @@ -17,13 +17,25 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; import org.junit.Test; /** @@ -73,4 +85,58 @@ public void testFieldsInEqualsMethodInSyncWithGetterFields() { + "to this test class's FIELDS_SET."); } } + + /** + * Bounds with {@code capacity > limit} must be copied by {@code [position, limit)}, not by {@link + * ByteBuffer#array()}. Otherwise trailing 0x00 bytes leak into the manifest bounds and break + * equality predicate pushdown in some query engines. + */ + @Test + public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() { + // Reproduce the shape iceberg-parquet produces in the wild: a ByteBuffer + // whose backing array is larger than [position, limit), with trailing + // 0x00 bytes. iceberg-parquet hits this because the JDK UTF-8 encoder + // over-allocates; here we build it explicitly so the test doesn't depend + // on encoder internals. + int columnId = 3; + byte[] expectedLower = "lower_bound_str".getBytes(StandardCharsets.UTF_8); + byte[] expectedUpper = "upper_bound_str".getBytes(StandardCharsets.UTF_8); + + ByteBuffer lower = ByteBuffer.allocate(expectedLower.length + 1); + lower.put(expectedLower); + lower.flip(); + ByteBuffer upper = ByteBuffer.allocate(expectedUpper.length + 1); + upper.put(expectedUpper); + upper.flip(); + + Map lowerBounds = new HashMap<>(); + lowerBounds.put(columnId, lower); + Map upperBounds = new HashMap<>(); + upperBounds.put(columnId, upper); + + Metrics metrics = new Metrics(1L, null, null, null, null, lowerBounds, upperBounds); + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withFormat(FileFormat.PARQUET) + .withPath("gs://test-bucket/data/test-file.parquet") + .withFileSizeInBytes(1024L) + .withMetrics(metrics) + .build(); + + SerializableDataFile serialized = SerializableDataFile.from(dataFile, ""); + + byte[] serializedLower = serialized.getLowerBounds().get(columnId); + byte[] serializedUpper = serialized.getUpperBounds().get(columnId); + assertEquals( + "lower bound length must match content, not backing array", + expectedLower.length, + serializedLower.length); + assertEquals( + "upper bound length must match content, not backing array", + expectedUpper.length, + serializedUpper.length); + assertArrayEquals(expectedLower, serializedLower); + assertArrayEquals(expectedUpper, serializedUpper); + } } From 370dae744b8077257e1a6ee6d31d9fdb400d63c9 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Thu, 21 May 2026 11:29:44 +0100 Subject: [PATCH 2/4] update changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 52475a99d8e1..fa4fdb096601 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -98,6 +98,7 @@ * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)). +* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)). ## Security Fixes From 0b677ad30556a1449b1b907db6cc8d560dc9ab5f Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Thu, 21 May 2026 19:09:32 +0100 Subject: [PATCH 3/4] test: encode bound ByteBuffers via Conversions.toByteBuffer --- .../io/iceberg/SerializableDataFileTest.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java index 964ad5194130..d4e7793718d8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java @@ -36,6 +36,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; import org.junit.Test; /** @@ -93,21 +95,18 @@ public void testFieldsInEqualsMethodInSyncWithGetterFields() { */ @Test public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() { - // Reproduce the shape iceberg-parquet produces in the wild: a ByteBuffer - // whose backing array is larger than [position, limit), with trailing - // 0x00 bytes. iceberg-parquet hits this because the JDK UTF-8 encoder - // over-allocates; here we build it explicitly so the test doesn't depend - // on encoder internals. + // Encode bounds the same way iceberg-parquet does in the wild — via + // Conversions.toByteBuffer(STRING, value). For UTF-8 strings of 10+ + // characters the underlying JDK CharsetEncoder over-allocates by ~10% + // and flips, producing a ByteBuffer with capacity > limit. int columnId = 3; - byte[] expectedLower = "lower_bound_str".getBytes(StandardCharsets.UTF_8); - byte[] expectedUpper = "upper_bound_str".getBytes(StandardCharsets.UTF_8); + String lowerValue = "lower_bound_str"; + String upperValue = "upper_bound_str"; + byte[] expectedLower = lowerValue.getBytes(StandardCharsets.UTF_8); + byte[] expectedUpper = upperValue.getBytes(StandardCharsets.UTF_8); - ByteBuffer lower = ByteBuffer.allocate(expectedLower.length + 1); - lower.put(expectedLower); - lower.flip(); - ByteBuffer upper = ByteBuffer.allocate(expectedUpper.length + 1); - upper.put(expectedUpper); - upper.flip(); + ByteBuffer lower = Conversions.toByteBuffer(Types.StringType.get(), lowerValue); + ByteBuffer upper = Conversions.toByteBuffer(Types.StringType.get(), upperValue); Map lowerBounds = new HashMap<>(); lowerBounds.put(columnId, lower); From bda9e02be2962a5acf3666c451736bc9b493ba35 Mon Sep 17 00:00:00 2001 From: Deji Ibrahim Date: Fri, 22 May 2026 11:35:08 +0100 Subject: [PATCH 4/4] trigger build