diff --git a/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java b/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java new file mode 100644 index 000000000000..418a032f5e75 --- /dev/null +++ b/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import me.lemire.integercompression.FastPFOR128; +import me.lemire.integercompression.IntWrapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * A benchmark that evaluates the performance of {@link PFOREncoding} and compares it with + * JavaFastPFOR. + * + *
Two data shapes are exercised: + * + *
To run this benchmark:
+ * ./gradlew :iceberg-core:jmh
+ * -PjmhIncludeRegex=PFOREncodingBenchmark
+ * -PjmhOutputPath=benchmark/pfor-encoding-benchmark.txt
+ *
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.Throughput)
+@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
+public class PFOREncodingBenchmark {
+
+ // Iceberg PFOR input arrays
+ private int[] descriptorValues;
+ private int[] uniformValues;
+
+ // Pre-encoded buffers for decode benchmarks
+ private ByteBuffer descriptorEncoded;
+ private ByteBuffer uniformEncoded;
+
+ // Reusable encode buffer (avoids allocation in the encode hot path)
+ private ByteBuffer encodeBuffer;
+
+ // Reusable decode output (avoids allocation in the decode hot path)
+ private int[] decodeOutput;
+
+ // JavaFastPFOR compressed arrays (pre-encoded for decode benchmarks)
+ private int[] descriptorFastPFOREncoded;
+ private int[] uniformFastPFOREncoded;
+
+ // Reusable output buffers for JavaFastPFOR (avoids allocation in hot path)
+ private int[] fastPFOROutputBuffer;
+
+ @Setup
+ public void setupBenchmark() {
+ Random random = new Random(1938745);
+
+ // 256-value descriptor-like data: mostly [0,31] with ~5% [0,255] outliers
+ descriptorValues = PFORRandomData.exceptions(random, 256, 0.05f);
+
+ // 256-value uniform byte data
+ uniformValues = PFORRandomData.uniform(random, 256, 255);
+
+ // Reusable encode buffer: worst-case for a single 256-value chunk
+ encodeBuffer = ByteBuffer.allocate(3 + 256);
+
+ // Reusable decode output: one entry per value in a chunk
+ decodeOutput = new int[256];
+
+ // Pre-encode for decode benchmarks
+ descriptorEncoded = PFOREncoding.encode(descriptorValues, descriptorValues.length);
+ uniformEncoded = PFOREncoding.encode(uniformValues, uniformValues.length);
+
+ // Pre-encode with JavaFastPFOR for decode benchmarks
+ FastPFOR128 codec = new FastPFOR128();
+ descriptorFastPFOREncoded = fastPFOREncode(codec, descriptorValues);
+ uniformFastPFOREncoded = fastPFOREncode(codec, uniformValues);
+
+ // Output buffer large enough for any 256-value decoded result
+ fastPFOROutputBuffer = new int[256 + 1024];
+ }
+
+ // ---------------------------------------------------------------------------
+ // Iceberg PFOR — descriptor data shape
+ // ---------------------------------------------------------------------------
+
+ @Benchmark
+ @Threads(1)
+ public void encodeDescriptorIceberg(Blackhole blackhole) {
+ blackhole.consume(
+ PFOREncoding.encode(
+ descriptorValues, 0, encodeBuffer, encodeBuffer.position(), descriptorValues.length));
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void decodeDescriptorIceberg(Blackhole blackhole) {
+ PFOREncoding.decode(
+ descriptorEncoded, descriptorEncoded.position(), decodeOutput, 0, descriptorValues.length);
+ blackhole.consume(decodeOutput);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Iceberg PFOR — uniform byte data shape
+ // ---------------------------------------------------------------------------
+
+ @Benchmark
+ @Threads(1)
+ public void encodeUniformIceberg(Blackhole blackhole) {
+ blackhole.consume(
+ PFOREncoding.encode(
+ uniformValues, 0, encodeBuffer, encodeBuffer.position(), uniformValues.length));
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void decodeUniformIceberg(Blackhole blackhole) {
+ PFOREncoding.decode(
+ uniformEncoded, uniformEncoded.position(), decodeOutput, 0, uniformValues.length);
+ blackhole.consume(decodeOutput);
+ }
+
+ // ---------------------------------------------------------------------------
+ // JavaFastPFOR — descriptor data shape
+ // ---------------------------------------------------------------------------
+
+ @Benchmark
+ @Threads(1)
+ public void encodeDescriptorFastPFOR(Blackhole blackhole) {
+ FastPFOR128 codec = new FastPFOR128();
+ blackhole.consume(fastPFOREncode(codec, descriptorValues));
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void decodeDescriptorFastPFOR(Blackhole blackhole) {
+ FastPFOR128 codec = new FastPFOR128();
+ blackhole.consume(fastPFORDecode(codec, descriptorFastPFOREncoded, fastPFOROutputBuffer));
+ }
+
+ // ---------------------------------------------------------------------------
+ // JavaFastPFOR — uniform byte data shape
+ // ---------------------------------------------------------------------------
+
+ @Benchmark
+ @Threads(1)
+ public void encodeUniformFastPFOR(Blackhole blackhole) {
+ FastPFOR128 codec = new FastPFOR128();
+ blackhole.consume(fastPFOREncode(codec, uniformValues));
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void decodeUniformFastPFOR(Blackhole blackhole) {
+ FastPFOR128 codec = new FastPFOR128();
+ blackhole.consume(fastPFORDecode(codec, uniformFastPFOREncoded, fastPFOROutputBuffer));
+ }
+
+ // ---------------------------------------------------------------------------
+ // JavaFastPFOR helpers
+ // ---------------------------------------------------------------------------
+
+ private static int[] fastPFOREncode(FastPFOR128 codec, int[] values) {
+ int[] output = new int[values.length + 1024];
+ IntWrapper inPos = new IntWrapper(0);
+ IntWrapper outPos = new IntWrapper(0);
+ codec.compress(values, inPos, values.length, output, outPos);
+ int[] result = new int[outPos.get()];
+ System.arraycopy(output, 0, result, 0, outPos.get());
+ return result;
+ }
+
+ private static int[] fastPFORDecode(FastPFOR128 codec, int[] encoded, int[] output) {
+ IntWrapper inPos = new IntWrapper(0);
+ IntWrapper outPos = new IntWrapper(0);
+ codec.uncompress(encoded, inPos, encoded.length, output, outPos);
+ return output;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java b/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java
new file mode 100644
index 000000000000..080d9556504b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.mumbling;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Bit packing and unpacking for values of 1–7 bits.
+ *
+ *
The least-significant bits of each value are packed with the first value occupying the most + * significant bits of the output. Output is padded to the nearest byte with 0s. For example, + * packing values [0b11, 0b10, 0b01] with width 2 produces 0b11100100. + */ +class BitPacking { + + private BitPacking() {} + + /** + * Packs {@code width} least-significant bits of {@code count} values into a data buffer. + * + *
Output is padded to the nearest byte with 0s. + * + *
Values are written to the buffer's underlying storage, but the buffer's position and limit + * are not modified. + * + * @param width number of bits of each value to pack + * @param values array containing source values to pack + * @param valueOffset starting index of values to pack + * @param data an output {@link ByteBuffer} + * @param dataOffset starting index for output in the data buffer + * @param count the number of values to pack + * @return the number of bytes written to the data buffer + */ + static int packBits( + int width, int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + return switch (width) { + case 0 -> 0; + case 1 -> packBits1(values, valueOffset, data, dataOffset, count); + case 2 -> packBits2(values, valueOffset, data, dataOffset, count); + case 3 -> packBits3(values, valueOffset, data, dataOffset, count); + case 4 -> packBits4(values, valueOffset, data, dataOffset, count); + case 5 -> packBits5(values, valueOffset, data, dataOffset, count); + case 6 -> packBits6(values, valueOffset, data, dataOffset, count); + case 7 -> packBits7(values, valueOffset, data, dataOffset, count); + case 8 -> copyAsBytes(values, valueOffset, data, dataOffset, count); + default -> throw new IllegalArgumentException("Invalid bit width: " + width); + }; + } + + /** + * Unpacks {@code count} values from a data buffer containing {@code width} bits of each value. + * + *
Unused bits in the last input byte are ignored. + * + *
The input buffer's position and limit are not modified. + * + * @param width number of bits of each value to unpack + * @param data an input {@link ByteBuffer} + * @param dataOffset starting index for input in the data buffer + * @param output array for unpacked output values + * @param outputOffset starting index to store values in the output array + * @param count the number of values to unpack + * @return the number of bytes read from the data buffer + */ + static int unpackBits( + int width, ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + return switch (width) { + case 0 -> 0; + case 1 -> unpackBits1(data, dataOffset, output, outputOffset, count); + case 2 -> unpackBits2(data, dataOffset, output, outputOffset, count); + case 3 -> unpackBits3(data, dataOffset, output, outputOffset, count); + case 4 -> unpackBits4(data, dataOffset, output, outputOffset, count); + case 5 -> unpackBits5(data, dataOffset, output, outputOffset, count); + case 6 -> unpackBits6(data, dataOffset, output, outputOffset, count); + case 7 -> unpackBits7(data, dataOffset, output, outputOffset, count); + case 8 -> copyAsBytes(data, dataOffset, output, outputOffset, count); + default -> throw new IllegalArgumentException("Invalid bit width: " + width); + }; + } + + /** + * Copy byte values from src into a buffer. + * + *
Values must be bytes stored in an integer array. The 3 most significant bytes of the values + * are ignored. + * + * @param source array of source values to copy + * @param sourceOffset starting offset of values to copy + * @param out output buffer values will be copied to + * @param outOffset starting offset in the output buffer + * @param count number of values (bytes) to copy + * @return the number of bytes written to the buffer + */ + private static int copyAsBytes( + int[] source, int sourceOffset, ByteBuffer out, int outOffset, int count) { + for (int i = 0; i < count; i += 1) { + out.put(outOffset + i, (byte) source[sourceOffset + i]); + } + + return count; + } + + /** + * Copy byte values from a buffer into an int[]. + * + * @param data buffer of source values to copyß + * @param dataOffset starting offset in the input buffer + * @param out output array values will be copied to + * @param outOffset starting offset in the output buffer + * @param count number of values (bytes) to copy + * @return the number of bytes read from the buffer + */ + private static int copyAsBytes( + ByteBuffer data, int dataOffset, int[] out, int outOffset, int count) { + for (int i = 0; i < count; i += 1) { + out[outOffset + i] = (data.get(dataOffset + i) & 0xFF); + } + + return count; + } + + // --------------------------------------------------------------------------- + // Specialized pack: width=1..7 + // Each method packs 8 values into exactly width bytes (full groups), plus a + // partial group of remaining < 8 values in ceil(remaining*width/8) bytes. + // --------------------------------------------------------------------------- + + /** 1-bit values: 8 values into 1 byte. */ + private static int packBits1( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + group; + int word = + packWord1( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + fullGroups; + int word = + packWord1( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + data.put(outputOffset, (byte) word); + } + + return byteWidth(count); + } + + private static int packWord1(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b1) << 7) + | ((b & 0b1) << 6) + | ((c & 0b1) << 5) + | ((d & 0b1) << 4) + | ((e & 0b1) << 3) + | ((f & 0b1) << 2) + | ((g & 0b1) << 1) + | (h & 0b1); + } + + /** 2-bit values: 8 values into 2 bytes. */ + private static int packBits2( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 2 * group; + int word = + packWord2( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 8)); + data.put(outputOffset + 1, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 2 * fullGroups; + int word = + packWord2( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + data.put(outputOffset, (byte) (word >>> 8)); + if (remaining > 4) { + data.put(outputOffset + 1, (byte) word); + } + } + + return byteWidth(2 * count); + } + + private static int packWord2(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b11) << 14) + | ((b & 0b11) << 12) + | ((c & 0b11) << 10) + | ((d & 0b11) << 8) + | ((e & 0b11) << 6) + | ((f & 0b11) << 4) + | ((g & 0b11) << 2) + | (h & 0b11); + } + + /** 3-bit values: 8 values into 3 bytes. */ + private static int packBits3( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 3 * group; + int word = + packWord3( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 16)); + data.put(outputOffset + 1, (byte) (word >>> 8)); + data.put(outputOffset + 2, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 3 * fullGroups; + int word = + packWord3( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(3 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (16 - 8 * k))); + } + } + + return byteWidth(3 * count); + } + + private static int packWord3(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b111) << 21) + | ((b & 0b111) << 18) + | ((c & 0b111) << 15) + | ((d & 0b111) << 12) + | ((e & 0b111) << 9) + | ((f & 0b111) << 6) + | ((g & 0b111) << 3) + | (h & 0b111); + } + + /** 4-bit values: 8 values into 4 bytes. */ + private static int packBits4( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 4 * group; + int word = + packWord4( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 24)); + data.put(outputOffset + 1, (byte) (word >>> 16)); + data.put(outputOffset + 2, (byte) (word >>> 8)); + data.put(outputOffset + 3, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 4 * fullGroups; + int word = + packWord4( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(4 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (24 - 8 * k))); + } + } + + return byteWidth(4 * count); + } + + private static int packWord4(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b1111) << 28) + | ((b & 0b1111) << 24) + | ((c & 0b1111) << 20) + | ((d & 0b1111) << 16) + | ((e & 0b1111) << 12) + | ((f & 0b1111) << 8) + | ((g & 0b1111) << 4) + | (h & 0b1111); + } + + /** 5-bit values: 8 values into 5 bytes. */ + private static int packBits5( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 5 * group; + long word = + packWord5( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 32)); + data.put(outputOffset + 1, (byte) (word >>> 24)); + data.put(outputOffset + 2, (byte) (word >>> 16)); + data.put(outputOffset + 3, (byte) (word >>> 8)); + data.put(outputOffset + 4, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 5 * fullGroups; + long word = + packWord5( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(5 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (32 - 8 * k))); + } + } + + return byteWidth(5 * count); + } + + private static long packWord5(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b11111) << 35) + | ((long) (b & 0b11111) << 30) + | ((long) (c & 0b11111) << 25) + | ((long) (d & 0b11111) << 20) + | ((long) (e & 0b11111) << 15) + | ((long) (f & 0b11111) << 10) + | ((long) (g & 0b11111) << 5) + | (long) (h & 0b11111); + } + + /** 6-bit values: 8 values into 6 bytes. */ + private static int packBits6( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 6 * group; + long word = + packWord6( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 40)); + data.put(outputOffset + 1, (byte) (word >>> 32)); + data.put(outputOffset + 2, (byte) (word >>> 24)); + data.put(outputOffset + 3, (byte) (word >>> 16)); + data.put(outputOffset + 4, (byte) (word >>> 8)); + data.put(outputOffset + 5, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 6 * fullGroups; + long word = + packWord6( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(6 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (40 - 8 * k))); + } + } + + return byteWidth(6 * count); + } + + private static long packWord6(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b111111) << 42) + | ((long) (b & 0b111111) << 36) + | ((long) (c & 0b111111) << 30) + | ((long) (d & 0b111111) << 24) + | ((long) (e & 0b111111) << 18) + | ((long) (f & 0b111111) << 12) + | ((long) (g & 0b111111) << 6) + | (long) (h & 0b111111); + } + + /** 7-bit values: 8 values into 7 bytes. */ + private static int packBits7( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 7 * group; + long word = + packWord7( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 48)); + data.put(outputOffset + 1, (byte) (word >>> 40)); + data.put(outputOffset + 2, (byte) (word >>> 32)); + data.put(outputOffset + 3, (byte) (word >>> 24)); + data.put(outputOffset + 4, (byte) (word >>> 16)); + data.put(outputOffset + 5, (byte) (word >>> 8)); + data.put(outputOffset + 6, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 7 * fullGroups; + long word = + packWord7( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(7 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (48 - 8 * k))); + } + } + + return byteWidth(7 * count); + } + + private static long packWord7(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b1111111) << 49) + | ((long) (b & 0b1111111) << 42) + | ((long) (c & 0b1111111) << 35) + | ((long) (d & 0b1111111) << 28) + | ((long) (e & 0b1111111) << 21) + | ((long) (f & 0b1111111) << 14) + | ((long) (g & 0b1111111) << 7) + | (long) (h & 0b1111111); + } + + // --------------------------------------------------------------------------- + // Specialized unpack: width=1..7 + // --------------------------------------------------------------------------- + + /** 1-bit values: 1 byte into 8 values. */ + private static int unpackBits1( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + group, 1); + output[groupOffset] = (word >>> 7) & 0b1; + output[groupOffset + 1] = (word >>> 6) & 0b1; + output[groupOffset + 2] = (word >>> 5) & 0b1; + output[groupOffset + 3] = (word >>> 4) & 0b1; + output[groupOffset + 4] = (word >>> 3) & 0b1; + output[groupOffset + 5] = (word >>> 2) & 0b1; + output[groupOffset + 6] = (word >>> 1) & 0b1; + output[groupOffset + 7] = word & 0b1; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + fullGroups, byteWidth(remaining)); + unpackRemainder(1, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(count); + } + + /** 2-bit values: 2 bytes into 8 values. */ + private static int unpackBits2( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + 2 * group, 2); + output[groupOffset] = (word >>> 14) & 0b11; + output[groupOffset + 1] = (word >>> 12) & 0b11; + output[groupOffset + 2] = (word >>> 10) & 0b11; + output[groupOffset + 3] = (word >>> 8) & 0b11; + output[groupOffset + 4] = (word >>> 6) & 0b11; + output[groupOffset + 5] = (word >>> 4) & 0b11; + output[groupOffset + 6] = (word >>> 2) & 0b11; + output[groupOffset + 7] = word & 0b11; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 2 * fullGroups, byteWidth(2 * remaining)); + unpackRemainder(2, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(2 * count); + } + + /** 3-bit values: 3 bytes into 8 values. */ + private static int unpackBits3( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + 3 * group, 3); + output[groupOffset] = (word >>> 21) & 0b111; + output[groupOffset + 1] = (word >>> 18) & 0b111; + output[groupOffset + 2] = (word >>> 15) & 0b111; + output[groupOffset + 3] = (word >>> 12) & 0b111; + output[groupOffset + 4] = (word >>> 9) & 0b111; + output[groupOffset + 5] = (word >>> 6) & 0b111; + output[groupOffset + 6] = (word >>> 3) & 0b111; + output[groupOffset + 7] = word & 0b111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 3 * fullGroups, byteWidth(3 * remaining)); + unpackRemainder(3, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(3 * count); + } + + /** 4-bit values: 4 bytes into 8 values. */ + private static int unpackBits4( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 4 * group, 4); + output[groupOffset] = (int) (word >>> 28) & 0b1111; + output[groupOffset + 1] = (int) (word >>> 24) & 0b1111; + output[groupOffset + 2] = (int) (word >>> 20) & 0b1111; + output[groupOffset + 3] = (int) (word >>> 16) & 0b1111; + output[groupOffset + 4] = (int) (word >>> 12) & 0b1111; + output[groupOffset + 5] = (int) (word >>> 8) & 0b1111; + output[groupOffset + 6] = (int) (word >>> 4) & 0b1111; + output[groupOffset + 7] = (int) word & 0b1111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 4 * fullGroups, byteWidth(4 * remaining)); + unpackRemainder(4, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(4 * count); + } + + /** 5-bit values: 5 bytes into 8 values. */ + private static int unpackBits5( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 5 * group, 5); + output[groupOffset] = (int) (word >>> 35) & 0b11111; + output[groupOffset + 1] = (int) (word >>> 30) & 0b11111; + output[groupOffset + 2] = (int) (word >>> 25) & 0b11111; + output[groupOffset + 3] = (int) (word >>> 20) & 0b11111; + output[groupOffset + 4] = (int) (word >>> 15) & 0b11111; + output[groupOffset + 5] = (int) (word >>> 10) & 0b11111; + output[groupOffset + 6] = (int) (word >>> 5) & 0b11111; + output[groupOffset + 7] = (int) word & 0b11111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 5 * fullGroups, byteWidth(5 * remaining)); + unpackRemainder(5, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(5 * count); + } + + /** 6-bit values: 6 bytes into 8 values. */ + private static int unpackBits6( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 6 * group, 6); + output[groupOffset] = (int) (word >>> 42) & 0b111111; + output[groupOffset + 1] = (int) (word >>> 36) & 0b111111; + output[groupOffset + 2] = (int) (word >>> 30) & 0b111111; + output[groupOffset + 3] = (int) (word >>> 24) & 0b111111; + output[groupOffset + 4] = (int) (word >>> 18) & 0b111111; + output[groupOffset + 5] = (int) (word >>> 12) & 0b111111; + output[groupOffset + 6] = (int) (word >>> 6) & 0b111111; + output[groupOffset + 7] = (int) word & 0b111111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 6 * fullGroups, byteWidth(6 * remaining)); + unpackRemainder(6, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(6 * count); + } + + /** 7-bit values: 7 bytes into 8 values. */ + private static int unpackBits7( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 7 * group, 7); + output[groupOffset] = (int) (word >>> 49) & 0b1111111; + output[groupOffset + 1] = (int) (word >>> 42) & 0b1111111; + output[groupOffset + 2] = (int) (word >>> 35) & 0b1111111; + output[groupOffset + 3] = (int) (word >>> 28) & 0b1111111; + output[groupOffset + 4] = (int) (word >>> 21) & 0b1111111; + output[groupOffset + 5] = (int) (word >>> 14) & 0b1111111; + output[groupOffset + 6] = (int) (word >>> 7) & 0b1111111; + output[groupOffset + 7] = (int) word & 0b1111111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 7 * fullGroups, byteWidth(7 * remaining)); + unpackRemainder(7, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(7 * count); + } + + /** + * Unpack {@code count < 8} values of {@code width} bits from {@code word}. + * + * @param width number of bits stored for each value + * @param word a long containing the bytes of the remaining values + * @param output array for unpacked output values + * @param outputOffset starting index to store values in the output array + * @param count number of values to unpack from the word + */ + private static void unpackRemainder( + int width, long word, int[] output, int outputOffset, int count) { + int mask = (1 << width) - 1; + // value bits are stored in the last bytes of the word + int valueBytes = byteWidth(count * width); + // the first value is width bits starting with the most-significant bits of the value bytes + int shift = 8 * valueBytes - width; + for (int i = 0; i < count; i += 1) { + output[outputOffset + i] = (int) ((word >>> shift) & mask); + shift -= width; + } + } + + /** + * Read {@code count} bytes of data into the last {@code count} bytes of {@code word}. + * + *
The first input byte occupies the most significant bits of the last {@code count} bytes and + * the last input byte occupies the least significant bits of the word. The remaining most + * significant bytes of the word are 0. + */ + private static long readWord(ByteBuffer data, int offset, int count) { + long word = 0; + for (int k = 0; k < count; k += 1) { + word = (word << 8) | (data.get(offset + k) & 0xFF); + } + + return word; + } + + /** Returns the number of whole bytes needed to hold {@code bits} bits. */ + private static int byteWidth(int bits) { + return (bits + 7) / 8; + } +} diff --git a/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java b/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java new file mode 100644 index 000000000000..74c5400acecd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import org.apache.curator.shaded.com.google.common.base.Preconditions; + +/** + * Read-only view of a Mumbling compressed bitmap stored in a {@link ByteBuffer}. + * + *
The bitmap is lazy: no decoding is done at construction time. On the first call to {@link + * #isSet}, the PFOR-encoded descriptor array is decoded and used to build an offsets array that + * maps each container index to its absolute byte position in the buffer. This offsets array is the + * only derived state kept by this class. + * + *
Format (all integers unsigned, little-endian): + * + *
Positions beyond the range of any container are always unset. + */ + public boolean isSet(int pos) { + Preconditions.checkArgument(pos >= 0, "Invalid bit position: %s < 0", pos); + int containerIndex = pos >>> 8; + int posInContainer = pos & 0xFF; + + if (containerIndex >= containerCount) { + return false; + } + + int containerStart = offset(containerIndex); + int descriptor = descriptor(containerIndex); + + if (isDense(descriptor)) { + // Dense: 32-byte bitset, MSB of byte 0 is position 0 + int byteIndex = posInContainer >>> 3; + int bitShift = 7 - (posInContainer & 0b111); + return ((data.get(containerStart + byteIndex) >>> bitShift) & 0b1) == 0b1; + + } else { + // Sparse: sorted list of set positions; scan until found or exceeded + for (int i = 0; i < descriptor; i += 1) { + int stored = data.get(containerStart + i) & 0xFF; + if (stored == posInContainer) { + return true; + } + + if (stored > posInContainer) { + return false; + } + } + + return false; + } + } + + private int descriptor(int containerIndex) { + if (null == descriptors) { + decodeDescriptors(); + } + + return descriptors[containerIndex]; + } + + private int offset(int containerIndex) { + if (null == offsets) { + decodeDescriptors(); + } + + return offsets[containerIndex]; + } + + /** + * Decode the descriptor array and produce an array of absolute container offsets in the buffer. + */ + private void decodeDescriptors() { + this.descriptors = new int[containerCount]; + int bytesRead = + PFOREncoding.decode(data, data.position() + HEADER_SIZE, descriptors, 0, containerCount); + + this.offsets = new int[containerCount + 1]; + int firstContainerOffset = data.position() + HEADER_SIZE + bytesRead; + descriptorsToOffsets(firstContainerOffset, descriptors, offsets); + } + + private static boolean isDense(int descriptor) { + return (descriptor & DENSE_CONTAINER_BIT) == DENSE_CONTAINER_BIT; + } + + /** + * Convert an array of lengths into an array of offsets starting at 0. + * + *
For example, descriptorsToOffsets([1, 1, 2]) produces [0, 1, 2, 4]. + * + * @param baseOffset initial offset of the first container + * @param descriptors an array of descriptor bytes + * @param offsets output array of offsets + */ + private static void descriptorsToOffsets(int baseOffset, int[] descriptors, int[] offsets) { + Preconditions.checkArgument( + offsets.length > descriptors.length, + "Cannot decode %s lengths into %s offsets (not enough space)", + descriptors.length, + offsets.length); + + offsets[0] = baseOffset; + for (int i = 0; i < descriptors.length; i += 1) { + if (isDense(descriptors[i])) { + offsets[i + 1] = offsets[i] + 32; + } else { + offsets[i + 1] = offsets[i] + descriptors[i]; + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java b/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java new file mode 100644 index 000000000000..fad8915811a1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import org.apache.curator.shaded.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; + +/** + * Patched Frame of Reference (PFOR) encoding for arrays of unsigned byte values. + * + *
Implements the encoding described in Appendix A of the Mumbling bitmap specification. The + * input array is split into 256-value chunks (the last chunk may be shorter). Each chunk is + * independently encoded using 4 configuration values: + * + *
Each chunk is stored as: + * + *
The buffer's position and limit are not modified. + * + * @param values unsigned byte values to encode + * @param valueOffset starting offset of values to encode + * @param out buffer to write encoded values to + * @param outOffset starting offset in the output bufferß + * @param count number of values to encode + * @return the number of bytes written to the buffer + */ + static int encode(int[] values, int valueOffset, ByteBuffer out, int outOffset, int count) { + // check the buffer's position and limit are compatible with outOffset and count + Preconditions.checkArgument( + outOffset >= out.position(), + "Cannot encode starting at %s to buffer with position %s", + outOffset, + out.position()); + Preconditions.checkArgument( + estimateEncodedSize(count) <= out.limit() - outOffset, + "Cannot encode %s values to buffer with %s remaining space", + count, + out.remaining()); + + int bytesWritten = 0; + int currentOffset = valueOffset; + + while (currentOffset < count) { + int chunkLength = Math.min(CHUNK_SIZE, count - currentOffset); + bytesWritten += + encodeChunk(values, currentOffset, out, outOffset + bytesWritten, chunkLength); + currentOffset += chunkLength; + } + + return bytesWritten; + } + + /** + * Decode to produce unsigned byte values. + * + *
Decodes starting at {@code encoded.position()} and does not modify the input buffer. + * + * @param encoded PFOR-encoded ByteBuffer produced by {@link #encode} + * @param count total number of values to decode + * @return decoded unsigned byte values + */ + static int[] decode(ByteBuffer encoded, int count) { + int[] out = new int[count]; + decode(encoded, encoded.position(), out, 0, count); + return out; + } + + /** + * Decode {@code count} unsigned bytes from a buffer into {@code out}. + * + *
This does not modify the input buffer.
+ *
+ * @param encoded a buffer containing encoded data
+ * @param offset starting offset of encoded values
+ * @param out an output value array
+ * @param outOffset starting offset in the output array
+ * @param count number of values to decode
+ * @return the number of bytes read from the encoded buffer
+ */
+ static int decode(ByteBuffer encoded, int offset, int[] out, int outOffset, int count) {
+ Preconditions.checkArgument(
+ offset >= encoded.position(),
+ "Cannot decode starting at %s from buffer with position %s",
+ offset,
+ encoded.position());
+
+ int bytesRead = 0;
+ int valuesRead = 0;
+
+ while (valuesRead < count) {
+ int chunkSize = Math.min(CHUNK_SIZE, count - valuesRead);
+ bytesRead += decodeChunk(encoded, offset + bytesRead, out, outOffset + valuesRead, chunkSize);
+ valuesRead += chunkSize;
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Encode one chunk into {@code out} starting at absolute position {@code outPos}.
+ *
+ * @param values array containing source values to encode
+ * @param valueOffset starting index of values to encode
+ * @param out an output {@link ByteBuffer}
+ * @param outOffset starting index for output in the out buffer
+ * @param count number of values to encode
+ * @return the number of bytes written to the output buffer
+ */
+ private static int encodeChunk(
+ int[] values, int valueOffset, ByteBuffer out, int outOffset, int count) {
+ Preconditions.checkArgument(count >= 0, "Invalid value count to encode: %s", count);
+ Preconditions.checkArgument(
+ valueOffset + count <= values.length,
+ "Cannot encode %s values starting at %s from int[%s]: not enough values",
+ count,
+ valueOffset,
+ values.length);
+
+ // find base=min(values) for normalization
+ int base = min(values, valueOffset, count);
+
+ // normalize by subtracting base
+ int[] normalized = new int[count];
+ int setBits = 0;
+ int normalizedSetBits = 0;
+ for (int i = 0; i < count; i += 1) {
+ setBits |= values[valueOffset + i];
+ normalized[i] = values[valueOffset + i] - base;
+ normalizedSetBits |= normalized[i];
+ }
+
+ Preconditions.checkArgument(
+ width(setBits) <= 8,
+ "Cannot encode values wider than 8 bits: %s bits needed",
+ width(setBits));
+
+ // Choose b1 to minimize total encoded data size (excluding 3-byte header)
+ int maxWidth = width(normalizedSetBits);
+ Pair This produces the width that results in the smallest total size and the number of exceptions
+ * for that width.
+ *
+ * Larger width is preferred on ties to reduce the number of exceptions.
+ *
+ * @param normalized value array to encode, after normalization
+ * @param length number of values in the array to encode
+ * @param maxWidth the largest bit width of normalized values
+ * @return a {@link Pair} of the chosen width and number of exceptions for that width
+ */
+ private static Pair If length is < 1, the result will be larger than Byte.MAX_VALUE.
+ *
+ * @param values array of values
+ * @param start starting index
+ * @param length number of values to check
+ * @return the min of the values in the array slice
+ */
+ private static int min(int[] values, int start, int length) {
+ // Use min > Byte.MAX_VALUE to signal no min (length < 1)
+ int min = 256;
+ for (int i = start; i < start + length; i += 1) {
+ if (values[i] < min) {
+ min = values[i];
+ }
+ }
+
+ return min;
+ }
+
+ /** Returns the number of bytes required in the worst case to encode {@code valueCount} values. */
+ static int estimateEncodedSize(int valueCount) {
+ // Worst-case per chunk is b1=8: 3-byte header + 1 byte per value. Any other b1 chosen by the
+ // encoder costs <= n bytes of data (otherwise b1=8 would have been selected instead).
+ int numChunks = ceilDiv(valueCount, CHUNK_SIZE);
+ return 3 * numChunks + valueCount;
+ }
+
+ /** Returns the number of bytes required to encode a chunk of values. */
+ private static int encodedSize(int count, int b1, int b2, int excCount) {
+ return 3 + byteWidth(b1 * count) + excCount + byteWidth(b2 * excCount);
+ }
+
+ /** Returns the number of bits required to represent {@code v} (0 for v=0). */
+ static int width(int value) {
+ return 32 - Integer.numberOfLeadingZeros(value);
+ }
+
+ private static int byteWidth(int bits) {
+ return ceilDiv(bits, 8);
+ }
+
+ private static int ceilDiv(int a, int b) {
+ return (a + b - 1) / b;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java b/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java
new file mode 100644
index 000000000000..1ba375adb8bf
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.mumbling;
+
+import java.util.Random;
+
+/** Data generation for {@link PFOREncoding} tests and benchmarks. */
+class PFORRandomData {
+
+ private PFORRandomData() {}
+
+ /** Generates {@code count} values between 0 and {@code maxValue}. */
+ static int[] uniform(Random random, int count, int maxValue) {
+ int[] values = new int[count];
+ for (int i = 0; i < count; i++) {
+ values[i] = random.nextInt(maxValue + 1);
+ }
+
+ return values;
+ }
+
+ /** Generates {@code count} values (0-3) with about {@code excPercent}% exceptions (0-255). */
+ static int[] exceptions(Random random, int count, float excPercent) {
+ int[] values = new int[count];
+ for (int i = 0; i < count; i++) {
+ values[i] = random.nextFloat() < excPercent ? random.nextInt(256) : random.nextInt(4);
+ }
+
+ return values;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java b/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java
new file mode 100644
index 000000000000..31ac59551a87
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.mumbling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.jupiter.api.Test;
+
+class TestMumblingBitmap {
+
+ @Test
+ void testEmptyBitmap() {
+ MumblingBitmap bitmap = bitmap();
+ assertThat(bitmap.cardinality()).isEqualTo(0);
+
+ // all positions beyond the bitmap range are false
+ assertThat(bitmap.isSet(0)).isFalse();
+ assertThat(bitmap.isSet(255)).isFalse();
+ assertThat(bitmap.isSet(256)).isFalse();
+ }
+
+ @Test
+ void testInvalidPosition() {
+ MumblingBitmap bitmap = bitmap();
+ assertThat(bitmap.cardinality()).isEqualTo(0);
+ assertThat(bitmap.isSet(0)).isFalse();
+ assertThatThrownBy(() -> bitmap.isSet(-1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid bit position: -1 < 0");
+ }
+
+ @Test
+ void testEmptySparseContainer() {
+ MumblingBitmap bitmap = bitmap(sparse());
+ assertThat(bitmap.cardinality()).isEqualTo(0);
+ assertThat(bitmap.isSet(0)).isFalse();
+ assertThat(bitmap.isSet(100)).isFalse();
+ assertThat(bitmap.isSet(255)).isFalse();
+ }
+
+ @Test
+ void testSparseContainerSetPositions() {
+ MumblingBitmap bitmap = bitmap(sparse(0, 5, 100, 255));
+ assertThat(bitmap.cardinality()).isEqualTo(4);
+
+ assertThat(bitmap.isSet(0)).isTrue();
+ assertThat(bitmap.isSet(5)).isTrue();
+ assertThat(bitmap.isSet(100)).isTrue();
+ assertThat(bitmap.isSet(255)).isTrue();
+
+ assertThat(bitmap.isSet(1)).isFalse();
+ assertThat(bitmap.isSet(4)).isFalse();
+ assertThat(bitmap.isSet(6)).isFalse();
+ assertThat(bitmap.isSet(99)).isFalse();
+ assertThat(bitmap.isSet(101)).isFalse();
+ assertThat(bitmap.isSet(254)).isFalse();
+ assertThat(bitmap.isSet(256)).isFalse();
+ }
+
+ @Test
+ void testFullSparseContainer() {
+ int[] positions = new int[31];
+ for (int i = 0; i < 31; i += 1) {
+ positions[i] = i * 8; // 0, 8, 16, ..., 240
+ }
+
+ MumblingBitmap bitmap = bitmap(sparse(positions));
+ assertThat(bitmap.cardinality()).isEqualTo(31);
+
+ for (int p : positions) {
+ assertThat(bitmap.isSet(p)).isTrue();
+ }
+
+ assertThat(bitmap.isSet(1)).isFalse();
+ assertThat(bitmap.isSet(7)).isFalse();
+ assertThat(bitmap.isSet(255)).isFalse();
+ }
+
+ @Test
+ void testFullDenseContainer() {
+ byte[] container = new byte[32];
+ Arrays.fill(container, (byte) 0xFF);
+
+ MumblingBitmap bitmap = bitmap(dense(container));
+ assertThat(bitmap.cardinality()).isEqualTo(256);
+
+ for (int i = 0; i < 256; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+
+ assertThat(bitmap.isSet(256)).isFalse();
+ }
+
+ // Example 1: positions 0-31: `FF FF FF FF 00 ... 00`
+ @Test
+ void testDenseSpecExample1() {
+ byte[] container = new byte[32];
+ container[0] = (byte) 0xFF;
+ container[1] = (byte) 0xFF;
+ container[2] = (byte) 0xFF;
+ container[3] = (byte) 0xFF;
+ MumblingBitmap bitmap = bitmap(dense(container));
+ assertThat(bitmap.cardinality()).isEqualTo(32);
+
+ for (int i = 0; i <= 31; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+
+ assertThat(bitmap.isSet(32)).isFalse();
+ assertThat(bitmap.isSet(255)).isFalse();
+ }
+
+ // Example 2: positions 0-32: `FF FF FF FF 80 00 ... 00`
+ @Test
+ void testDenseSpecExample2() {
+ byte[] container = new byte[32];
+ container[0] = (byte) 0xFF;
+ container[1] = (byte) 0xFF;
+ container[2] = (byte) 0xFF;
+ container[3] = (byte) 0xFF;
+ container[4] = (byte) 0x80;
+
+ MumblingBitmap bitmap = bitmap(dense(container));
+ assertThat(bitmap.cardinality()).isEqualTo(33);
+
+ for (int i = 0; i <= 32; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+
+ assertThat(bitmap.isSet(33)).isFalse();
+ assertThat(bitmap.isSet(255)).isFalse();
+ }
+
+ // Example 3: positions 0-15 and 240-255: `FF FF 00 ... 00 FF FF`
+ @Test
+ void testDenseSpecExample3() {
+ byte[] container = new byte[32];
+ container[0] = (byte) 0xFF;
+ container[1] = (byte) 0xFF;
+ container[30] = (byte) 0xFF;
+ container[31] = (byte) 0xFF;
+
+ MumblingBitmap bitmap = bitmap(dense(container));
+ assertThat(bitmap.cardinality()).isEqualTo(32);
+
+ for (int i = 0; i <= 15; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+ for (int i = 240; i <= 255; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+ assertThat(bitmap.isSet(16)).isFalse();
+ assertThat(bitmap.isSet(239)).isFalse();
+ assertThat(bitmap.isSet(256)).isFalse();
+ }
+
+ // Example 4: even positions 0, 2, 4, ...: `AA AA ... AA AA`
+ @Test
+ void testDenseSpecExample4() {
+ byte[] container = new byte[32];
+ Arrays.fill(container, (byte) 0xAA);
+
+ MumblingBitmap bitmap = bitmap(dense(container));
+ assertThat(bitmap.cardinality()).isEqualTo(128);
+
+ for (int i = 0; i < 256; i += 1) {
+ assertThat(bitmap.isSet(i)).isEqualTo(i % 2 == 0);
+ }
+
+ assertThat(bitmap.isSet(256)).isFalse();
+ }
+
+ @Test
+ void testMultipleContainers() {
+ MumblingBitmap bitmap = bitmap(sparse(5), sparse(), sparse(10));
+ assertThat(bitmap.cardinality()).isEqualTo(2);
+
+ assertThat(bitmap.isSet(5)).isTrue(); // container 0, pos 5
+ assertThat(bitmap.isSet(256)).isFalse(); // container 1
+ assertThat(bitmap.isSet(522)).isTrue(); // container 2, pos 10
+
+ assertThat(bitmap.isSet(512)).isFalse();
+ assertThat(bitmap.isSet(4)).isFalse();
+ assertThat(bitmap.isSet(265)).isFalse();
+ assertThat(bitmap.isSet(267)).isFalse();
+ }
+
+ @Test
+ void testMixedSparseAndDense() {
+ byte[] denseContainer = new byte[32];
+ denseContainer[0] = (byte) 0xFF;
+ denseContainer[1] = (byte) 0xFF;
+ denseContainer[2] = (byte) 0xFF;
+ denseContainer[3] = (byte) 0xFF;
+
+ MumblingBitmap bitmap = bitmap(dense(denseContainer), sparse(1));
+ assertThat(bitmap.cardinality()).isEqualTo(33);
+
+ for (int i = 0; i < 32; i += 1) {
+ assertThat(bitmap.isSet(i)).isTrue();
+ }
+ assertThat(bitmap.isSet(32)).isFalse();
+
+ assertThat(bitmap.isSet(256)).isFalse();
+ assertThat(bitmap.isSet(257)).isTrue(); // container 1, pos 1
+ assertThat(bitmap.isSet(258)).isFalse();
+ }
+
+ @Test
+ void testBufferWithOffset() {
+ // Prepend 4 bytes of garbage before the actual bitmap data
+ ByteBuffer buffer = build(sparse(42));
+ byte[] rawBytes = new byte[buffer.remaining()];
+ buffer.get(rawBytes);
+
+ ByteBuffer padded = ByteBuffer.allocate(4 + rawBytes.length);
+ padded.position(4);
+ padded.put(rawBytes);
+ padded.position(4); // position the buffer at the start of bitmap data
+
+ MumblingBitmap bitmap = new MumblingBitmap(padded);
+ assertThat(bitmap.cardinality()).isEqualTo(1);
+
+ assertThat(bitmap.isSet(41)).isFalse();
+ assertThat(bitmap.isSet(42)).isTrue();
+ assertThat(bitmap.isSet(43)).isFalse();
+ }
+
+ private static Container sparse(int... positions) {
+ byte[] bytes = new byte[positions.length];
+ for (int i = 0; i < positions.length; i += 1) {
+ if (i > 0) {
+ Preconditions.checkArgument(
+ positions[i] < 256, "Invalid position in container: %s", positions[i]);
+ Preconditions.checkArgument(
+ positions[i] > positions[i - 1],
+ "Invalid sparse container: pos %s=%s >= pos %s=%s",
+ i - 1,
+ positions[i - 1],
+ i,
+ positions[i]);
+ }
+
+ bytes[i] = (byte) positions[i];
+ }
+
+ return new Container(bytes);
+ }
+
+ /** Descriptor + bytes for a dense container. */
+ private static Container dense(byte[] container) {
+ Preconditions.checkArgument(container.length == 32, "Dense container must be 32 bytes");
+ return new Container(container);
+ }
+
+ private static class Container {
+ private final byte[] bytes;
+ private final int descriptor;
+ private final int cardinality;
+
+ Container(byte[] bytes) {
+ this.bytes = bytes;
+ this.descriptor = bytes.length;
+ this.cardinality = cardinality(bytes);
+ }
+
+ private static int cardinality(byte[] bytes) {
+ if (bytes.length < 32) {
+ return bytes.length;
+ } else if (bytes.length == 32) {
+ int setBits = 0;
+ for (byte b : bytes) {
+ setBits += Integer.bitCount(b & 0xFF);
+ }
+
+ Preconditions.checkArgument(
+ setBits > 31, "Invalid dense container: %s values should be sparse", setBits);
+
+ return setBits;
+ } else {
+ throw new IllegalArgumentException("Invalid container: longer than 32 bytes");
+ }
+ }
+ }
+
+ private static MumblingBitmap bitmap(Container... containers) {
+ return new MumblingBitmap(build(containers));
+ }
+
+ private static ByteBuffer build(Container... containers) {
+ Preconditions.checkArgument(
+ containers.length <= 8192, "Invalid container count (max 8192): %s", containers.length);
+
+ int[] descriptors = new int[containers.length];
+ int cardinality = 0;
+ int sizeEstimate = 6;
+ for (int i = 0; i < containers.length; i += 1) {
+ descriptors[i] = containers[i].descriptor;
+ cardinality += containers[i].cardinality;
+ sizeEstimate += containers[i].bytes.length;
+ }
+
+ Preconditions.checkArgument(
+ cardinality <= 2_097_152, "Invalid cardinality (max 2,097,152): %s", cardinality);
+
+ sizeEstimate += PFOREncoding.estimateEncodedSize(containers.length);
+ ByteBuffer buf = ByteBuffer.allocate(sizeEstimate);
+
+ // header: version (1 byte), cardinality (3 bytes LE), container count (2 bytes LE)
+ buf.put(0, (byte) 1);
+ buf.put(1, (byte) (cardinality & 0xFF));
+ buf.put(2, (byte) ((cardinality >>> 8) & 0xFF));
+ buf.put(3, (byte) ((cardinality >>> 16) & 0xFF));
+ buf.put(4, (byte) (containers.length & 0xFF));
+ buf.put(5, (byte) ((containers.length >>> 8) & 0xFF));
+
+ // write encoded descriptors
+ int descriptorArraySize =
+ PFOREncoding.encode(descriptors, 0, buf, buf.position() + 6, descriptors.length);
+
+ // copy container bytes into the array
+ int containerOffset = 6 + descriptorArraySize;
+ for (Container spec : containers) {
+ buf.put(containerOffset, spec.bytes);
+ containerOffset += spec.bytes.length;
+ }
+
+ // the offset after the last container is the length
+ buf.limit(containerOffset);
+
+ return buf;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java
new file mode 100644
index 000000000000..20193fbd23fd
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.mumbling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.iceberg.util.ByteBuffers;
+import org.junit.jupiter.api.Test;
+
+public class TestPFOREncoding {
+
+ // Example 1: 256 values, b1=0, m=0, all=0: `00 00 00`
+ @Test
+ public void testExample1AllZeros() {
+ int[] values = new int[256];
+ Arrays.fill(values, 0);
+
+ ByteBuffer encoded = PFOREncoding.encode(values, values.length);
+ assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(bytes(0x00, 0x00, 0x00));
+
+ int[] decoded = PFOREncoding.decode(encoded, 256);
+ assertThat(decoded).isEqualTo(values);
+ }
+
+ // Example 2: 51 values, b1=0, m=5, all=5: `00 00 05`
+ @Test
+ public void testExample2AllFives() {
+ int[] values = new int[51];
+ Arrays.fill(values, 5);
+
+ ByteBuffer encoded = PFOREncoding.encode(values, values.length);
+ assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(bytes(0x00, 0x00, 0x05));
+
+ int[] decoded = PFOREncoding.decode(encoded, 51);
+ assertThat(decoded).isEqualTo(values);
+ }
+
+ // Example 3: only exception values, b1=0, b2=8: `80 02 00 04 07 FF FE`
+ @Test
+ public void testExample3SparseExceptions() {
+ int[] values = {0, 0, 0, 0, 0xFF, 0, 0, 0xFE};
+
+ byte[] expected =
+ bytes(0x80, 0x02, 0x00, /* offsets */ 0x04, 0x07, /* exceptions */ 0xFF, 0xFE);
+
+ ByteBuffer encoded = PFOREncoding.encode(values, values.length);
+ assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected);
+
+ int[] decoded = PFOREncoding.decode(encoded, values.length);
+ assertThat(decoded).isEqualTo(values);
+ }
+
+ // Example 4: [6, 7, 8], no exceptions, b1=2, m=6: `02 00 06 18`
+ @Test
+ public void testExample4TwoBitsNoExceptions() {
+ int[] values = {6, 7, 8};
+
+ byte[] expected = bytes(0x02, 0x00, 0x06, 0x18);
+
+ ByteBuffer encoded = PFOREncoding.encode(values, values.length);
+ assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected);
+
+ int[] decoded = PFOREncoding.decode(encoded, values.length);
+ assertThat(decoded).isEqualTo(values);
+ }
+
+ // Example 5: [6, 34, 8, 7], b1=2, b2=3, m=6, 1 exception: `32 01 06 09 01 E0`
+ @Test
+ public void testExample5() {
+ int[] values = {6, 34, 8, 7};
+
+ // impl prefers larger widths when the storage is the same size
+ byte[] expected = bytes(0x05, 0x00, 0x06, 0x07, 0x04, 0x10);
+ byte[] fromSpec = bytes(0x32, 0x01, 0x06, 0x09, 0x01, 0xE0);
+
+ ByteBuffer encoded = PFOREncoding.encode(values, values.length);
+ assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected);
+
+ int[] decoded = PFOREncoding.decode(encoded, values.length);
+ assertThat(decoded).isEqualTo(values);
+
+ int[] decodedFromSpec = PFOREncoding.decode(ByteBuffer.wrap(fromSpec), values.length);
+ assertThat(decodedFromSpec).isEqualTo(values);
+ }
+
+ private static byte[] bytes(int... values) {
+ byte[] result = new byte[values.length];
+ for (int i = 0; i < values.length; i++) {
+ result[i] = (byte) values[i];
+ }
+ return result;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java
new file mode 100644
index 000000000000..5f36ab5ce99e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.mumbling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** Randomized round-trip tests for {@link PFOREncoding}. */
+public class TestPFOREncodingRandom {
+ private static final Random RANDOM = new Random(3546521684L);
+
+ static Stream