diff --git a/CHANGES.md b/CHANGES.md index 52475a99d8e1..fa4fdb096601 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -98,6 +98,7 @@ * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)). +* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)). ## Security Fixes diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index 1f717b82c21e..9e75be0a1987 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -205,11 +205,23 @@ DataFile createDataFile(Map partitionSpecs) { } Map output = new HashMap<>(input.size()); for (Map.Entry e : input.entrySet()) { - output.put(e.getKey(), e.getValue().array()); + output.put(e.getKey(), toByteArray(e.getValue())); } return output; } + // Copy only [position, limit). ByteBuffer.array() returns the full backing + // array, which is sometimes larger than the buffer's content (e.g. trailing + // 0x00 bytes). Leaking those into manifest bounds shifts the lower bound + // above the real min and breaks equality predicate pushdown in some query + // engines. + private static byte[] toByteArray(ByteBuffer buf) { + ByteBuffer view = buf.duplicate(); + byte[] bytes = new byte[view.remaining()]; + view.get(bytes); + return bytes; + } + private static @Nullable Map toByteBufferMap( @Nullable Map input) { if (input == null) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java index 983f021fd7ce..d4e7793718d8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java @@ -17,13 +17,27 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; import org.junit.Test; /** @@ -73,4 +87,55 @@ public void testFieldsInEqualsMethodInSyncWithGetterFields() { + "to this test class's FIELDS_SET."); } } + + /** + * Bounds with {@code capacity > limit} must be copied by {@code [position, limit)}, not by {@link + * ByteBuffer#array()}. Otherwise trailing 0x00 bytes leak into the manifest bounds and break + * equality predicate pushdown in some query engines. + */ + @Test + public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() { + // Encode bounds the same way iceberg-parquet does in the wild — via + // Conversions.toByteBuffer(STRING, value). For UTF-8 strings of 10+ + // characters the underlying JDK CharsetEncoder over-allocates by ~10% + // and flips, producing a ByteBuffer with capacity > limit. + int columnId = 3; + String lowerValue = "lower_bound_str"; + String upperValue = "upper_bound_str"; + byte[] expectedLower = lowerValue.getBytes(StandardCharsets.UTF_8); + byte[] expectedUpper = upperValue.getBytes(StandardCharsets.UTF_8); + + ByteBuffer lower = Conversions.toByteBuffer(Types.StringType.get(), lowerValue); + ByteBuffer upper = Conversions.toByteBuffer(Types.StringType.get(), upperValue); + + Map lowerBounds = new HashMap<>(); + lowerBounds.put(columnId, lower); + Map upperBounds = new HashMap<>(); + upperBounds.put(columnId, upper); + + Metrics metrics = new Metrics(1L, null, null, null, null, lowerBounds, upperBounds); + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withFormat(FileFormat.PARQUET) + .withPath("gs://test-bucket/data/test-file.parquet") + .withFileSizeInBytes(1024L) + .withMetrics(metrics) + .build(); + + SerializableDataFile serialized = SerializableDataFile.from(dataFile, ""); + + byte[] serializedLower = serialized.getLowerBounds().get(columnId); + byte[] serializedUpper = serialized.getUpperBounds().get(columnId); + assertEquals( + "lower bound length must match content, not backing array", + expectedLower.length, + serializedLower.length); + assertEquals( + "upper bound length must match content, not backing array", + expectedUpper.length, + serializedUpper.length); + assertArrayEquals(expectedLower, serializedLower); + assertArrayEquals(expectedUpper, serializedUpper); + } }