Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@

* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
* Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)).

## Security Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,23 @@ DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
}
Map<Integer, byte[]> output = new HashMap<>(input.size());
for (Map.Entry<Integer, ByteBuffer> e : input.entrySet()) {
output.put(e.getKey(), e.getValue().array());
output.put(e.getKey(), toByteArray(e.getValue()));
}
return output;
}

// Returns a fresh array holding exactly the readable window [position, limit)
// of the given buffer. ByteBuffer.array() is deliberately avoided: it exposes
// the whole backing array, which can be longer than the buffer's content
// (e.g. trailing 0x00 bytes). If that padding reached the manifest bounds, the
// lower bound would sort above the real minimum and equality predicate
// pushdown would break in some query engines.
private static byte[] toByteArray(ByteBuffer buf) {
  byte[] copy = new byte[buf.remaining()];
  // Read through a duplicate so the caller's position and limit stay untouched.
  buf.duplicate().get(copy);
  return copy;
}

private static @Nullable Map<Integer, ByteBuffer> toByteBufferMap(
@Nullable Map<Integer, byte[]> input) {
if (input == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,27 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Test;

/**
Expand Down Expand Up @@ -73,4 +87,55 @@ public void testFieldsInEqualsMethodInSyncWithGetterFields() {
+ "to this test class's FIELDS_SET.");
}
}

/**
 * Regression test: bounds with {@code capacity > limit} must be copied by {@code [position,
 * limit)}, not by {@link ByteBuffer#array()}. Otherwise trailing 0x00 bytes leak into the
 * manifest bounds and break equality predicate pushdown in some query engines.
 *
 * <p>The test first verifies its own premise (the encoded buffers really do have a backing array
 * longer than their content) so that it cannot pass vacuously if the encoding path ever starts
 * returning exact-sized buffers.
 */
@Test
public void testBoundByteBufferIsCopiedByLimitNotBackingArrayLength() {
  // Encode bounds the same way iceberg-parquet does in the wild — via
  // Conversions.toByteBuffer(STRING, value). For UTF-8 strings of 10+
  // characters the underlying JDK CharsetEncoder over-allocates by ~10%
  // and flips, producing a ByteBuffer with capacity > limit.
  int columnId = 3;
  String lowerValue = "lower_bound_str";
  String upperValue = "upper_bound_str";
  byte[] expectedLower = lowerValue.getBytes(StandardCharsets.UTF_8);
  byte[] expectedUpper = upperValue.getBytes(StandardCharsets.UTF_8);

  ByteBuffer lower = Conversions.toByteBuffer(Types.StringType.get(), lowerValue);
  ByteBuffer upper = Conversions.toByteBuffer(Types.StringType.get(), upperValue);

  // Guard the premise: without slack in the backing array the old array()-based
  // copy would also produce the expected bytes and this test would prove nothing.
  org.junit.Assert.assertTrue(
      "test premise: lower bound buffer must have a backing array longer than its content",
      lower.hasArray() && lower.array().length > lower.remaining());
  org.junit.Assert.assertTrue(
      "test premise: upper bound buffer must have a backing array longer than its content",
      upper.hasArray() && upper.array().length > upper.remaining());

  Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
  lowerBounds.put(columnId, lower);
  Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
  upperBounds.put(columnId, upper);

  // Only the record count and the two bound maps are populated; other metrics are left null.
  Metrics metrics = new Metrics(1L, null, null, null, null, lowerBounds, upperBounds);

  DataFile dataFile =
      DataFiles.builder(PartitionSpec.unpartitioned())
          .withFormat(FileFormat.PARQUET)
          .withPath("gs://test-bucket/data/test-file.parquet")
          .withFileSizeInBytes(1024L)
          .withMetrics(metrics)
          .build();

  SerializableDataFile serialized = SerializableDataFile.from(dataFile, "");

  byte[] serializedLower = serialized.getLowerBounds().get(columnId);
  byte[] serializedUpper = serialized.getUpperBounds().get(columnId);
  // Length is checked first so a padding regression fails with a targeted message.
  assertEquals(
      "lower bound length must match content, not backing array",
      expectedLower.length,
      serializedLower.length);
  assertEquals(
      "upper bound length must match content, not backing array",
      expectedUpper.length,
      serializedUpper.length);
  assertArrayEquals(expectedLower, serializedLower);
  assertArrayEquals(expectedUpper, serializedUpper);
}
}
Loading