diff --git a/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java b/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java new file mode 100644 index 000000000000..418a032f5e75 --- /dev/null +++ b/core/src/jmh/java/org/apache/iceberg/mumbling/PFOREncodingBenchmark.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import me.lemire.integercompression.FastPFOR128; +import me.lemire.integercompression.IntWrapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * A benchmark that evaluates the performance of {@link PFOREncoding} and compares it with + * JavaFastPFOR. + * + *

Two data shapes are exercised: + * + *

+ * + *

To run this benchmark: + * ./gradlew :iceberg-core:jmh + * -PjmhIncludeRegex=PFOREncodingBenchmark + * -PjmhOutputPath=benchmark/pfor-encoding-benchmark.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.Throughput) +@Timeout(time = 5, timeUnit = TimeUnit.MINUTES) +public class PFOREncodingBenchmark { + + // Iceberg PFOR input arrays + private int[] descriptorValues; + private int[] uniformValues; + + // Pre-encoded buffers for decode benchmarks + private ByteBuffer descriptorEncoded; + private ByteBuffer uniformEncoded; + + // Reusable encode buffer (avoids allocation in the encode hot path) + private ByteBuffer encodeBuffer; + + // Reusable decode output (avoids allocation in the decode hot path) + private int[] decodeOutput; + + // JavaFastPFOR compressed arrays (pre-encoded for decode benchmarks) + private int[] descriptorFastPFOREncoded; + private int[] uniformFastPFOREncoded; + + // Reusable output buffers for JavaFastPFOR (avoids allocation in hot path) + private int[] fastPFOROutputBuffer; + + @Setup + public void setupBenchmark() { + Random random = new Random(1938745); + + // 256-value descriptor-like data: mostly [0,31] with ~5% [0,255] outliers + descriptorValues = PFORRandomData.exceptions(random, 256, 0.05f); + + // 256-value uniform byte data + uniformValues = PFORRandomData.uniform(random, 256, 255); + + // Reusable encode buffer: worst-case for a single 256-value chunk + encodeBuffer = ByteBuffer.allocate(3 + 256); + + // Reusable decode output: one entry per value in a chunk + decodeOutput = new int[256]; + + // Pre-encode for decode benchmarks + descriptorEncoded = PFOREncoding.encode(descriptorValues, descriptorValues.length); + uniformEncoded = PFOREncoding.encode(uniformValues, uniformValues.length); + + // Pre-encode with JavaFastPFOR for decode benchmarks + FastPFOR128 codec = new FastPFOR128(); + descriptorFastPFOREncoded = fastPFOREncode(codec, descriptorValues); + uniformFastPFOREncoded = fastPFOREncode(codec, uniformValues); + + // Output buffer large enough for any 256-value decoded result + fastPFOROutputBuffer = new int[256 + 1024]; + } + + // --------------------------------------------------------------------------- + // Iceberg PFOR — descriptor data shape + // --------------------------------------------------------------------------- + + @Benchmark + @Threads(1) + public void encodeDescriptorIceberg(Blackhole blackhole) { + blackhole.consume( + PFOREncoding.encode( + descriptorValues, 0, encodeBuffer, encodeBuffer.position(), descriptorValues.length)); + } + + @Benchmark + @Threads(1) + public void decodeDescriptorIceberg(Blackhole blackhole) { + PFOREncoding.decode( + descriptorEncoded, descriptorEncoded.position(), decodeOutput, 0, descriptorValues.length); + blackhole.consume(decodeOutput); + } + + // --------------------------------------------------------------------------- + // Iceberg PFOR — uniform byte data shape + // --------------------------------------------------------------------------- + + @Benchmark + @Threads(1) + public void encodeUniformIceberg(Blackhole blackhole) { + blackhole.consume( + PFOREncoding.encode( + uniformValues, 0, encodeBuffer, encodeBuffer.position(), uniformValues.length)); + } + + @Benchmark + @Threads(1) + public void decodeUniformIceberg(Blackhole blackhole) { + PFOREncoding.decode( + uniformEncoded, uniformEncoded.position(), decodeOutput, 0, uniformValues.length); + blackhole.consume(decodeOutput); + } + + // --------------------------------------------------------------------------- + // JavaFastPFOR — descriptor data shape + // --------------------------------------------------------------------------- + + @Benchmark + @Threads(1) + public void encodeDescriptorFastPFOR(Blackhole blackhole) { + FastPFOR128 codec = new FastPFOR128(); + blackhole.consume(fastPFOREncode(codec, descriptorValues)); + } + + @Benchmark + @Threads(1) + public void decodeDescriptorFastPFOR(Blackhole blackhole) { + FastPFOR128 codec = new FastPFOR128(); + blackhole.consume(fastPFORDecode(codec, descriptorFastPFOREncoded, fastPFOROutputBuffer)); + } + + // --------------------------------------------------------------------------- + // JavaFastPFOR — uniform byte data shape + // --------------------------------------------------------------------------- + + @Benchmark + @Threads(1) + public void encodeUniformFastPFOR(Blackhole blackhole) { + FastPFOR128 codec = new FastPFOR128(); + blackhole.consume(fastPFOREncode(codec, uniformValues)); + } + + @Benchmark + @Threads(1) + public void decodeUniformFastPFOR(Blackhole blackhole) { + FastPFOR128 codec = new FastPFOR128(); + blackhole.consume(fastPFORDecode(codec, uniformFastPFOREncoded, fastPFOROutputBuffer)); + } + + // --------------------------------------------------------------------------- + // JavaFastPFOR helpers + // --------------------------------------------------------------------------- + + private static int[] fastPFOREncode(FastPFOR128 codec, int[] values) { + int[] output = new int[values.length + 1024]; + IntWrapper inPos = new IntWrapper(0); + IntWrapper outPos = new IntWrapper(0); + codec.compress(values, inPos, values.length, output, outPos); + int[] result = new int[outPos.get()]; + System.arraycopy(output, 0, result, 0, outPos.get()); + return result; + } + + private static int[] fastPFORDecode(FastPFOR128 codec, int[] encoded, int[] output) { + IntWrapper inPos = new IntWrapper(0); + IntWrapper outPos = new IntWrapper(0); + codec.uncompress(encoded, inPos, encoded.length, output, outPos); + return output; + } +} diff --git a/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java b/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java new file mode 100644 index 000000000000..080d9556504b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/mumbling/BitPacking.java @@ -0,0 +1,768 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; + +/** + * Bit packing and unpacking for values of 1–7 bits. + * + *

The least-significant bits of each value are packed with the first value occupying the most + * significant bits of the output. Output is padded to the nearest byte with 0s. For example, + * packing values [0b11, 0b10, 0b01] with width 2 produces 0b11100100. + */ +class BitPacking { + + private BitPacking() {} + + /** + * Packs {@code width} least-significant bits of {@code count} values into a data buffer. + * + *

Output is padded to the nearest byte with 0s. + * + *

Values are written to the buffer's underlying storage, but the buffer's position and limit + * are not modified. + * + * @param width number of bits of each value to pack + * @param values array containing source values to pack + * @param valueOffset starting index of values to pack + * @param data an output {@link ByteBuffer} + * @param dataOffset starting index for output in the data buffer + * @param count the number of values to pack + * @return the number of bytes written to the data buffer + */ + static int packBits( + int width, int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + return switch (width) { + case 0 -> 0; + case 1 -> packBits1(values, valueOffset, data, dataOffset, count); + case 2 -> packBits2(values, valueOffset, data, dataOffset, count); + case 3 -> packBits3(values, valueOffset, data, dataOffset, count); + case 4 -> packBits4(values, valueOffset, data, dataOffset, count); + case 5 -> packBits5(values, valueOffset, data, dataOffset, count); + case 6 -> packBits6(values, valueOffset, data, dataOffset, count); + case 7 -> packBits7(values, valueOffset, data, dataOffset, count); + case 8 -> copyAsBytes(values, valueOffset, data, dataOffset, count); + default -> throw new IllegalArgumentException("Invalid bit width: " + width); + }; + } + + /** + * Unpacks {@code count} values from a data buffer containing {@code width} bits of each value. + * + *

Unused bits in the last input byte are ignored. + * + *

The input buffer's position and limit are not modified. + * + * @param width number of bits of each value to unpack + * @param data an input {@link ByteBuffer} + * @param dataOffset starting index for input in the data buffer + * @param output array for unpacked output values + * @param outputOffset starting index to store values in the output array + * @param count the number of values to unpack + * @return the number of bytes read from the data buffer + */ + static int unpackBits( + int width, ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + return switch (width) { + case 0 -> 0; + case 1 -> unpackBits1(data, dataOffset, output, outputOffset, count); + case 2 -> unpackBits2(data, dataOffset, output, outputOffset, count); + case 3 -> unpackBits3(data, dataOffset, output, outputOffset, count); + case 4 -> unpackBits4(data, dataOffset, output, outputOffset, count); + case 5 -> unpackBits5(data, dataOffset, output, outputOffset, count); + case 6 -> unpackBits6(data, dataOffset, output, outputOffset, count); + case 7 -> unpackBits7(data, dataOffset, output, outputOffset, count); + case 8 -> copyAsBytes(data, dataOffset, output, outputOffset, count); + default -> throw new IllegalArgumentException("Invalid bit width: " + width); + }; + } + + /** + * Copy byte values from src into a buffer. + * + *

Values must be bytes stored in an integer array. The 3 most significant bytes of the values + * are ignored. + * + * @param source array of source values to copy + * @param sourceOffset starting offset of values to copy + * @param out output buffer values will be copied to + * @param outOffset starting offset in the output buffer + * @param count number of values (bytes) to copy + * @return the number of bytes written to the buffer + */ + private static int copyAsBytes( + int[] source, int sourceOffset, ByteBuffer out, int outOffset, int count) { + for (int i = 0; i < count; i += 1) { + out.put(outOffset + i, (byte) source[sourceOffset + i]); + } + + return count; + } + + /** + * Copy byte values from a buffer into an int[]. + * + * @param data buffer of source values to copyß + * @param dataOffset starting offset in the input buffer + * @param out output array values will be copied to + * @param outOffset starting offset in the output buffer + * @param count number of values (bytes) to copy + * @return the number of bytes read from the buffer + */ + private static int copyAsBytes( + ByteBuffer data, int dataOffset, int[] out, int outOffset, int count) { + for (int i = 0; i < count; i += 1) { + out[outOffset + i] = (data.get(dataOffset + i) & 0xFF); + } + + return count; + } + + // --------------------------------------------------------------------------- + // Specialized pack: width=1..7 + // Each method packs 8 values into exactly width bytes (full groups), plus a + // partial group of remaining < 8 values in ceil(remaining*width/8) bytes. + // --------------------------------------------------------------------------- + + /** 1-bit values: 8 values into 1 byte. */ + private static int packBits1( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + group; + int word = + packWord1( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + fullGroups; + int word = + packWord1( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + data.put(outputOffset, (byte) word); + } + + return byteWidth(count); + } + + private static int packWord1(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b1) << 7) + | ((b & 0b1) << 6) + | ((c & 0b1) << 5) + | ((d & 0b1) << 4) + | ((e & 0b1) << 3) + | ((f & 0b1) << 2) + | ((g & 0b1) << 1) + | (h & 0b1); + } + + /** 2-bit values: 8 values into 2 bytes. */ + private static int packBits2( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 2 * group; + int word = + packWord2( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 8)); + data.put(outputOffset + 1, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 2 * fullGroups; + int word = + packWord2( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + data.put(outputOffset, (byte) (word >>> 8)); + if (remaining > 4) { + data.put(outputOffset + 1, (byte) word); + } + } + + return byteWidth(2 * count); + } + + private static int packWord2(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b11) << 14) + | ((b & 0b11) << 12) + | ((c & 0b11) << 10) + | ((d & 0b11) << 8) + | ((e & 0b11) << 6) + | ((f & 0b11) << 4) + | ((g & 0b11) << 2) + | (h & 0b11); + } + + /** 3-bit values: 8 values into 3 bytes. */ + private static int packBits3( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 3 * group; + int word = + packWord3( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 16)); + data.put(outputOffset + 1, (byte) (word >>> 8)); + data.put(outputOffset + 2, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 3 * fullGroups; + int word = + packWord3( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(3 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (16 - 8 * k))); + } + } + + return byteWidth(3 * count); + } + + private static int packWord3(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b111) << 21) + | ((b & 0b111) << 18) + | ((c & 0b111) << 15) + | ((d & 0b111) << 12) + | ((e & 0b111) << 9) + | ((f & 0b111) << 6) + | ((g & 0b111) << 3) + | (h & 0b111); + } + + /** 4-bit values: 8 values into 4 bytes. */ + private static int packBits4( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 4 * group; + int word = + packWord4( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 24)); + data.put(outputOffset + 1, (byte) (word >>> 16)); + data.put(outputOffset + 2, (byte) (word >>> 8)); + data.put(outputOffset + 3, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 4 * fullGroups; + int word = + packWord4( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(4 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (24 - 8 * k))); + } + } + + return byteWidth(4 * count); + } + + private static int packWord4(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((a & 0b1111) << 28) + | ((b & 0b1111) << 24) + | ((c & 0b1111) << 20) + | ((d & 0b1111) << 16) + | ((e & 0b1111) << 12) + | ((f & 0b1111) << 8) + | ((g & 0b1111) << 4) + | (h & 0b1111); + } + + /** 5-bit values: 8 values into 5 bytes. */ + private static int packBits5( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 5 * group; + long word = + packWord5( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 32)); + data.put(outputOffset + 1, (byte) (word >>> 24)); + data.put(outputOffset + 2, (byte) (word >>> 16)); + data.put(outputOffset + 3, (byte) (word >>> 8)); + data.put(outputOffset + 4, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 5 * fullGroups; + long word = + packWord5( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(5 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (32 - 8 * k))); + } + } + + return byteWidth(5 * count); + } + + private static long packWord5(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b11111) << 35) + | ((long) (b & 0b11111) << 30) + | ((long) (c & 0b11111) << 25) + | ((long) (d & 0b11111) << 20) + | ((long) (e & 0b11111) << 15) + | ((long) (f & 0b11111) << 10) + | ((long) (g & 0b11111) << 5) + | (long) (h & 0b11111); + } + + /** 6-bit values: 8 values into 6 bytes. */ + private static int packBits6( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 6 * group; + long word = + packWord6( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 40)); + data.put(outputOffset + 1, (byte) (word >>> 32)); + data.put(outputOffset + 2, (byte) (word >>> 24)); + data.put(outputOffset + 3, (byte) (word >>> 16)); + data.put(outputOffset + 4, (byte) (word >>> 8)); + data.put(outputOffset + 5, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 6 * fullGroups; + long word = + packWord6( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(6 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (40 - 8 * k))); + } + } + + return byteWidth(6 * count); + } + + private static long packWord6(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b111111) << 42) + | ((long) (b & 0b111111) << 36) + | ((long) (c & 0b111111) << 30) + | ((long) (d & 0b111111) << 24) + | ((long) (e & 0b111111) << 18) + | ((long) (f & 0b111111) << 12) + | ((long) (g & 0b111111) << 6) + | (long) (h & 0b111111); + } + + /** 7-bit values: 8 values into 7 bytes. */ + private static int packBits7( + int[] values, int valueOffset, ByteBuffer data, int dataOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = valueOffset + 8 * group; + int outputOffset = dataOffset + 7 * group; + long word = + packWord7( + values[groupOffset], + values[groupOffset + 1], + values[groupOffset + 2], + values[groupOffset + 3], + values[groupOffset + 4], + values[groupOffset + 5], + values[groupOffset + 6], + values[groupOffset + 7]); + data.put(outputOffset, (byte) (word >>> 48)); + data.put(outputOffset + 1, (byte) (word >>> 40)); + data.put(outputOffset + 2, (byte) (word >>> 32)); + data.put(outputOffset + 3, (byte) (word >>> 24)); + data.put(outputOffset + 4, (byte) (word >>> 16)); + data.put(outputOffset + 5, (byte) (word >>> 8)); + data.put(outputOffset + 6, (byte) word); + } + + int remaining = count % 8; + if (remaining > 0) { + int groupOffset = valueOffset + 8 * fullGroups; + int outputOffset = dataOffset + 7 * fullGroups; + long word = + packWord7( + values[groupOffset], + remaining > 1 ? values[groupOffset + 1] : 0, + remaining > 2 ? values[groupOffset + 2] : 0, + remaining > 3 ? values[groupOffset + 3] : 0, + remaining > 4 ? values[groupOffset + 4] : 0, + remaining > 5 ? values[groupOffset + 5] : 0, + remaining > 6 ? values[groupOffset + 6] : 0, + 0); + int byteCount = byteWidth(7 * remaining); + for (int k = 0; k < byteCount; k += 1) { + data.put(outputOffset + k, (byte) (word >>> (48 - 8 * k))); + } + } + + return byteWidth(7 * count); + } + + private static long packWord7(int a, int b, int c, int d, int e, int f, int g, int h) { + return ((long) (a & 0b1111111) << 49) + | ((long) (b & 0b1111111) << 42) + | ((long) (c & 0b1111111) << 35) + | ((long) (d & 0b1111111) << 28) + | ((long) (e & 0b1111111) << 21) + | ((long) (f & 0b1111111) << 14) + | ((long) (g & 0b1111111) << 7) + | (long) (h & 0b1111111); + } + + // --------------------------------------------------------------------------- + // Specialized unpack: width=1..7 + // --------------------------------------------------------------------------- + + /** 1-bit values: 1 byte into 8 values. */ + private static int unpackBits1( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + group, 1); + output[groupOffset] = (word >>> 7) & 0b1; + output[groupOffset + 1] = (word >>> 6) & 0b1; + output[groupOffset + 2] = (word >>> 5) & 0b1; + output[groupOffset + 3] = (word >>> 4) & 0b1; + output[groupOffset + 4] = (word >>> 3) & 0b1; + output[groupOffset + 5] = (word >>> 2) & 0b1; + output[groupOffset + 6] = (word >>> 1) & 0b1; + output[groupOffset + 7] = word & 0b1; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + fullGroups, byteWidth(remaining)); + unpackRemainder(1, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(count); + } + + /** 2-bit values: 2 bytes into 8 values. */ + private static int unpackBits2( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + 2 * group, 2); + output[groupOffset] = (word >>> 14) & 0b11; + output[groupOffset + 1] = (word >>> 12) & 0b11; + output[groupOffset + 2] = (word >>> 10) & 0b11; + output[groupOffset + 3] = (word >>> 8) & 0b11; + output[groupOffset + 4] = (word >>> 6) & 0b11; + output[groupOffset + 5] = (word >>> 4) & 0b11; + output[groupOffset + 6] = (word >>> 2) & 0b11; + output[groupOffset + 7] = word & 0b11; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 2 * fullGroups, byteWidth(2 * remaining)); + unpackRemainder(2, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(2 * count); + } + + /** 3-bit values: 3 bytes into 8 values. */ + private static int unpackBits3( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + int word = (int) readWord(data, dataOffset + 3 * group, 3); + output[groupOffset] = (word >>> 21) & 0b111; + output[groupOffset + 1] = (word >>> 18) & 0b111; + output[groupOffset + 2] = (word >>> 15) & 0b111; + output[groupOffset + 3] = (word >>> 12) & 0b111; + output[groupOffset + 4] = (word >>> 9) & 0b111; + output[groupOffset + 5] = (word >>> 6) & 0b111; + output[groupOffset + 6] = (word >>> 3) & 0b111; + output[groupOffset + 7] = word & 0b111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 3 * fullGroups, byteWidth(3 * remaining)); + unpackRemainder(3, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(3 * count); + } + + /** 4-bit values: 4 bytes into 8 values. */ + private static int unpackBits4( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 4 * group, 4); + output[groupOffset] = (int) (word >>> 28) & 0b1111; + output[groupOffset + 1] = (int) (word >>> 24) & 0b1111; + output[groupOffset + 2] = (int) (word >>> 20) & 0b1111; + output[groupOffset + 3] = (int) (word >>> 16) & 0b1111; + output[groupOffset + 4] = (int) (word >>> 12) & 0b1111; + output[groupOffset + 5] = (int) (word >>> 8) & 0b1111; + output[groupOffset + 6] = (int) (word >>> 4) & 0b1111; + output[groupOffset + 7] = (int) word & 0b1111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 4 * fullGroups, byteWidth(4 * remaining)); + unpackRemainder(4, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(4 * count); + } + + /** 5-bit values: 5 bytes into 8 values. */ + private static int unpackBits5( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 5 * group, 5); + output[groupOffset] = (int) (word >>> 35) & 0b11111; + output[groupOffset + 1] = (int) (word >>> 30) & 0b11111; + output[groupOffset + 2] = (int) (word >>> 25) & 0b11111; + output[groupOffset + 3] = (int) (word >>> 20) & 0b11111; + output[groupOffset + 4] = (int) (word >>> 15) & 0b11111; + output[groupOffset + 5] = (int) (word >>> 10) & 0b11111; + output[groupOffset + 6] = (int) (word >>> 5) & 0b11111; + output[groupOffset + 7] = (int) word & 0b11111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 5 * fullGroups, byteWidth(5 * remaining)); + unpackRemainder(5, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(5 * count); + } + + /** 6-bit values: 6 bytes into 8 values. */ + private static int unpackBits6( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 6 * group, 6); + output[groupOffset] = (int) (word >>> 42) & 0b111111; + output[groupOffset + 1] = (int) (word >>> 36) & 0b111111; + output[groupOffset + 2] = (int) (word >>> 30) & 0b111111; + output[groupOffset + 3] = (int) (word >>> 24) & 0b111111; + output[groupOffset + 4] = (int) (word >>> 18) & 0b111111; + output[groupOffset + 5] = (int) (word >>> 12) & 0b111111; + output[groupOffset + 6] = (int) (word >>> 6) & 0b111111; + output[groupOffset + 7] = (int) word & 0b111111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 6 * fullGroups, byteWidth(6 * remaining)); + unpackRemainder(6, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(6 * count); + } + + /** 7-bit values: 7 bytes into 8 values. */ + private static int unpackBits7( + ByteBuffer data, int dataOffset, int[] output, int outputOffset, int count) { + int fullGroups = count / 8; + for (int group = 0; group < fullGroups; group += 1) { + int groupOffset = outputOffset + 8 * group; + long word = readWord(data, dataOffset + 7 * group, 7); + output[groupOffset] = (int) (word >>> 49) & 0b1111111; + output[groupOffset + 1] = (int) (word >>> 42) & 0b1111111; + output[groupOffset + 2] = (int) (word >>> 35) & 0b1111111; + output[groupOffset + 3] = (int) (word >>> 28) & 0b1111111; + output[groupOffset + 4] = (int) (word >>> 21) & 0b1111111; + output[groupOffset + 5] = (int) (word >>> 14) & 0b1111111; + output[groupOffset + 6] = (int) (word >>> 7) & 0b1111111; + output[groupOffset + 7] = (int) word & 0b1111111; + } + + int remaining = count % 8; + if (remaining > 0) { + long word = readWord(data, dataOffset + 7 * fullGroups, byteWidth(7 * remaining)); + unpackRemainder(7, word, output, outputOffset + 8 * fullGroups, remaining); + } + + return byteWidth(7 * count); + } + + /** + * Unpack {@code count < 8} values of {@code width} bits from {@code word}. + * + * @param width number of bits stored for each value + * @param word a long containing the bytes of the remaining values + * @param output array for unpacked output values + * @param outputOffset starting index to store values in the output array + * @param count number of values to unpack from the word + */ + private static void unpackRemainder( + int width, long word, int[] output, int outputOffset, int count) { + int mask = (1 << width) - 1; + // value bits are stored in the last bytes of the word + int valueBytes = byteWidth(count * width); + // the first value is width bits starting with the most-significant bits of the value bytes + int shift = 8 * valueBytes - width; + for (int i = 0; i < count; i += 1) { + output[outputOffset + i] = (int) ((word >>> shift) & mask); + shift -= width; + } + } + + /** + * Read {@code count} bytes of data into the last {@code count} bytes of {@code word}. + * + *

The first input byte occupies the most significant bits of the last {@code count} bytes and + * the last input byte occupies the least significant bits of the word. The remaining most + * significant bytes of the word are 0. + */ + private static long readWord(ByteBuffer data, int offset, int count) { + long word = 0; + for (int k = 0; k < count; k += 1) { + word = (word << 8) | (data.get(offset + k) & 0xFF); + } + + return word; + } + + /** Returns the number of whole bytes needed to hold {@code bits} bits. */ + private static int byteWidth(int bits) { + return (bits + 7) / 8; + } +} diff --git a/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java b/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java new file mode 100644 index 000000000000..74c5400acecd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/mumbling/MumblingBitmap.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import org.apache.curator.shaded.com.google.common.base.Preconditions; + +/** + * Read-only view of a Mumbling compressed bitmap stored in a {@link ByteBuffer}. + * + *

The bitmap is lazy: no decoding is done at construction time. On the first call to {@link + * #isSet}, the PFOR-encoded descriptor array is decoded and used to build an offsets array that + * maps each container index to its absolute byte position in the buffer. This offsets array is the + * only derived state kept by this class. + * + *

Format (all integers unsigned, little-endian): + * + *

+ */ +class MumblingBitmap { + private static final int VERSION = 1; + private static final int HEADER_SIZE = 6; + private static final int DENSE_CONTAINER_BIT = 0b0010_0000; + + private final ByteBuffer data; + private final int cardinality; + private final int containerCount; + private int[] descriptors = null; + private int[] offsets = null; + + MumblingBitmap(ByteBuffer data) { + int version = data.get(data.position()) & 0xFF; + if (version != VERSION) { + throw new UnsupportedOperationException("Unsupported Mumbling bitmap version: " + version); + } + + this.data = data; + this.cardinality = + (data.get(data.position() + 1) & 0xFF) + | ((data.get(data.position() + 2) & 0xFF) << 8) + | ((data.get(data.position() + 3) & 0xFF) << 16); + this.containerCount = + (data.get(data.position() + 4) & 0xFF) | ((data.get(data.position() + 5) & 0xFF) << 8); + } + + /** Returns the number of bits set in the bitmap. */ + public int cardinality() { + return cardinality; + } + + /** + * Returns {@code true} if the bit at {@code pos} is set in the bitmap. + * + *

Positions beyond the range of any container are always unset. + */ + public boolean isSet(int pos) { + Preconditions.checkArgument(pos >= 0, "Invalid bit position: %s < 0", pos); + int containerIndex = pos >>> 8; + int posInContainer = pos & 0xFF; + + if (containerIndex >= containerCount) { + return false; + } + + int containerStart = offset(containerIndex); + int descriptor = descriptor(containerIndex); + + if (isDense(descriptor)) { + // Dense: 32-byte bitset, MSB of byte 0 is position 0 + int byteIndex = posInContainer >>> 3; + int bitShift = 7 - (posInContainer & 0b111); + return ((data.get(containerStart + byteIndex) >>> bitShift) & 0b1) == 0b1; + + } else { + // Sparse: sorted list of set positions; scan until found or exceeded + for (int i = 0; i < descriptor; i += 1) { + int stored = data.get(containerStart + i) & 0xFF; + if (stored == posInContainer) { + return true; + } + + if (stored > posInContainer) { + return false; + } + } + + return false; + } + } + + private int descriptor(int containerIndex) { + if (null == descriptors) { + decodeDescriptors(); + } + + return descriptors[containerIndex]; + } + + private int offset(int containerIndex) { + if (null == offsets) { + decodeDescriptors(); + } + + return offsets[containerIndex]; + } + + /** + * Decode the descriptor array and produce an array of absolute container offsets in the buffer. + */ + private void decodeDescriptors() { + this.descriptors = new int[containerCount]; + int bytesRead = + PFOREncoding.decode(data, data.position() + HEADER_SIZE, descriptors, 0, containerCount); + + this.offsets = new int[containerCount + 1]; + int firstContainerOffset = data.position() + HEADER_SIZE + bytesRead; + descriptorsToOffsets(firstContainerOffset, descriptors, offsets); + } + + private static boolean isDense(int descriptor) { + return (descriptor & DENSE_CONTAINER_BIT) == DENSE_CONTAINER_BIT; + } + + /** + * Convert an array of lengths into an array of offsets starting at 0. + * + *

For example, descriptorsToOffsets([1, 1, 2]) produces [0, 1, 2, 4]. + * + * @param baseOffset initial offset of the first container + * @param descriptors an array of descriptor bytes + * @param offsets output array of offsets + */ + private static void descriptorsToOffsets(int baseOffset, int[] descriptors, int[] offsets) { + Preconditions.checkArgument( + offsets.length > descriptors.length, + "Cannot decode %s lengths into %s offsets (not enough space)", + descriptors.length, + offsets.length); + + offsets[0] = baseOffset; + for (int i = 0; i < descriptors.length; i += 1) { + if (isDense(descriptors[i])) { + offsets[i + 1] = offsets[i] + 32; + } else { + offsets[i + 1] = offsets[i] + descriptors[i]; + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java b/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java new file mode 100644 index 000000000000..fad8915811a1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/mumbling/PFOREncoding.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.nio.ByteBuffer; +import org.apache.curator.shaded.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Pair; + +/** + * Patched Frame of Reference (PFOR) encoding for arrays of unsigned byte values. + * + *

Implements the encoding described in Appendix A of the Mumbling bitmap specification. The + * input array is split into 256-value chunks (the last chunk may be shorter). Each chunk is + * independently encoded using 4 configuration values: + * + *

+ * + *

Each chunk is stored as: + * + *

+ */ +class PFOREncoding { + private static final int CHUNK_SIZE = 256; + + private PFOREncoding() {} + + /** + * Encodes {@code count} values from an array of unsigned byte values. + * + * @param values unsigned byte values to encode + * @param count number of values to encode + * @return a {@link ByteBuffer} of the encoded values with position and limit set for reading + */ + static ByteBuffer encode(int[] values, int count) { + ByteBuffer out = ByteBuffer.allocate(estimateEncodedSize(count)); + int bytesWritten = encode(values, 0, out, out.position(), count); + return out.slice(0, bytesWritten); + } + + /** + * Encode {@code count} unsigned byte values from {@code values} into a buffer. + * + *

The buffer's position and limit are not modified. + * + * @param values unsigned byte values to encode + * @param valueOffset starting offset of values to encode + * @param out buffer to write encoded values to + * @param outOffset starting offset in the output bufferß + * @param count number of values to encode + * @return the number of bytes written to the buffer + */ + static int encode(int[] values, int valueOffset, ByteBuffer out, int outOffset, int count) { + // check the buffer's position and limit are compatible with outOffset and count + Preconditions.checkArgument( + outOffset >= out.position(), + "Cannot encode starting at %s to buffer with position %s", + outOffset, + out.position()); + Preconditions.checkArgument( + estimateEncodedSize(count) <= out.limit() - outOffset, + "Cannot encode %s values to buffer with %s remaining space", + count, + out.remaining()); + + int bytesWritten = 0; + int currentOffset = valueOffset; + + while (currentOffset < count) { + int chunkLength = Math.min(CHUNK_SIZE, count - currentOffset); + bytesWritten += + encodeChunk(values, currentOffset, out, outOffset + bytesWritten, chunkLength); + currentOffset += chunkLength; + } + + return bytesWritten; + } + + /** + * Decode to produce unsigned byte values. + * + *

Decodes starting at {@code encoded.position()} and does not modify the input buffer. + * + * @param encoded PFOR-encoded ByteBuffer produced by {@link #encode} + * @param count total number of values to decode + * @return decoded unsigned byte values + */ + static int[] decode(ByteBuffer encoded, int count) { + int[] out = new int[count]; + decode(encoded, encoded.position(), out, 0, count); + return out; + } + + /** + * Decode {@code count} unsigned bytes from a buffer into {@code out}. + * + *

This does not modify the input buffer. + * + * @param encoded a buffer containing encoded data + * @param offset starting offset of encoded values + * @param out an output value array + * @param outOffset starting offset in the output array + * @param count number of values to decode + * @return the number of bytes read from the encoded buffer + */ + static int decode(ByteBuffer encoded, int offset, int[] out, int outOffset, int count) { + Preconditions.checkArgument( + offset >= encoded.position(), + "Cannot decode starting at %s from buffer with position %s", + offset, + encoded.position()); + + int bytesRead = 0; + int valuesRead = 0; + + while (valuesRead < count) { + int chunkSize = Math.min(CHUNK_SIZE, count - valuesRead); + bytesRead += decodeChunk(encoded, offset + bytesRead, out, outOffset + valuesRead, chunkSize); + valuesRead += chunkSize; + } + + return bytesRead; + } + + /** + * Encode one chunk into {@code out} starting at absolute position {@code outPos}. + * + * @param values array containing source values to encode + * @param valueOffset starting index of values to encode + * @param out an output {@link ByteBuffer} + * @param outOffset starting index for output in the out buffer + * @param count number of values to encode + * @return the number of bytes written to the output buffer + */ + private static int encodeChunk( + int[] values, int valueOffset, ByteBuffer out, int outOffset, int count) { + Preconditions.checkArgument(count >= 0, "Invalid value count to encode: %s", count); + Preconditions.checkArgument( + valueOffset + count <= values.length, + "Cannot encode %s values starting at %s from int[%s]: not enough values", + count, + valueOffset, + values.length); + + // find base=min(values) for normalization + int base = min(values, valueOffset, count); + + // normalize by subtracting base + int[] normalized = new int[count]; + int setBits = 0; + int normalizedSetBits = 0; + for (int i = 0; i < count; i += 1) { + setBits |= values[valueOffset + i]; + normalized[i] = values[valueOffset + i] - base; + normalizedSetBits |= normalized[i]; + } + + Preconditions.checkArgument( + width(setBits) <= 8, + "Cannot encode values wider than 8 bits: %s bits needed", + width(setBits)); + + // Choose b1 to minimize total encoded data size (excluding 3-byte header) + int maxWidth = width(normalizedSetBits); + Pair widthAndExcCount = chooseBitWidth(normalized, count, maxWidth); + int b1 = widthAndExcCount.first(); + int b2 = maxWidth - b1; + int excCount = widthAndExcCount.second(); + + // check that there is enough space in the buffer for the encoded data + int requiredSize = encodedSize(count, b1, b2, excCount); + Preconditions.checkArgument( + outOffset + requiredSize <= out.remaining(), + "Cannot decode %s values from buffer with %s remaining bytes", + requiredSize, + out.remaining()); + + // Special case: b1=8 means store original values as raw bytes with b2, e, and m set to 0. + if (b1 == 8) { + writeHeader(out, outOffset, b1, 0 /* b2 */, 0 /* excCount */, 0 /* m */); + return 3 + BitPacking.packBits(8, values, valueOffset, out, outOffset + 3, count); + } + + int bytesWritten = writeHeader(out, outOffset, b1, b2, excCount, base); + + // Primary array: low b1 bits of every value + bytesWritten += BitPacking.packBits(b1, normalized, 0, out, outOffset + bytesWritten, count); + + // b2 is the bit width of exception values: (maxWidth - b1) bits of each exception + if (excCount > 0) { + int[] excOffsets = new int[excCount]; + int[] excValues = new int[excCount]; + + // Collect exceptions (values that do not fit in b1 bits) + int excIndex = 0; + int threshold = 1 << b1; + for (int i = 0; i < count; i += 1) { + if (normalized[i] >= threshold) { + excOffsets[excIndex] = i; + excValues[excIndex] = normalized[i] >>> b1; + excIndex += 1; + } + } + + // Exception offsets (one byte per exception) + bytesWritten += + BitPacking.packBits(8, excOffsets, 0, out, outOffset + bytesWritten, excCount); + + // Exception values: remaining high b2 bits of each exception + bytesWritten += + BitPacking.packBits(b2, excValues, 0, out, outOffset + bytesWritten, excCount); + } + + return bytesWritten; + } + + /** + * Decode one chunk of encoded data, writing decoded values into an output array. + * + * @param data buffer containing source data to decode + * @param dataOffset starting index in the buffer to decode + * @param out an output {@link ByteBuffer} + * @param outOffset starting index for output in the out buffer + * @param count number of values to decode + * @return the number of bytes read from {@code data} + */ + private static int decodeChunk( + ByteBuffer data, int dataOffset, int[] out, int outOffset, int count) { + Preconditions.checkArgument(count >= 0, "Invalid value count to decode: %s", count); + Preconditions.checkArgument( + outOffset + count <= out.length, + "Cannot decode %s values starting at %s into int[%s]: not enough space", + count, + out.length, + outOffset); + + int b1 = data.get(dataOffset) & 0x0F; + int b2 = (data.get(dataOffset) >>> 4) & 0x0F; + int excCount = data.get(dataOffset + 1) & 0xFF; + int base = data.get(dataOffset + 2) & 0xFF; + int bytesRead = 3; + + // after reading the header, check that the full chunk is present + int expectedSize = encodedSize(count, b1, b2, excCount); + Preconditions.checkArgument( + dataOffset + expectedSize <= data.limit(), + "Cannot decode %s values from buffer with %s remaining bytes", + expectedSize, + data.limit() - dataOffset); + + // Read primary array: low b1 bits of each value + bytesRead += BitPacking.unpackBits(b1, data, dataOffset + bytesRead, out, outOffset, count); + + // Read exceptions and update output values + if (excCount > 0) { + int[] excOffsets = new int[excCount]; + int[] excValues = new int[excCount]; + int excListOffset = dataOffset + bytesRead; + int excDataOffset = dataOffset + bytesRead + excCount; + + // Read exception indexes + bytesRead += BitPacking.unpackBits(8, data, excListOffset, excOffsets, 0, excCount); + + // Read exception values and patch the primary values + bytesRead += BitPacking.unpackBits(b2, data, excDataOffset, excValues, 0, excCount); + + // Update output values + for (int i = 0; i < excCount; i += 1) { + out[outOffset + excOffsets[i]] |= excValues[i] << b1; + } + } + + // Add back the chunk minimum + for (int i = 0; i < count; i += 1) { + out[outOffset + i] += base; + } + + return bytesRead; + } + + private static int writeHeader( + ByteBuffer out, int outOffset, int b1, int b2, int excCount, int base) { + // Header: b1 in low nibble, b2 in high nibble, then e, then m + out.put(outOffset, (byte) ((b2 << 4) | (b1 & 0b1111))); + out.put(outOffset + 1, (byte) excCount); + out.put(outOffset + 2, (byte) base); + + return 3; + } + + /** + * Choose the primary bit width {@code b1} that minimizes total encoded chunk size. + * + *

This produces the width that results in the smallest total size and the number of exceptions + * for that width. + * + *

Larger width is preferred on ties to reduce the number of exceptions. + * + * @param normalized value array to encode, after normalization + * @param length number of values in the array to encode + * @param maxWidth the largest bit width of normalized values + * @return a {@link Pair} of the chosen width and number of exceptions for that width + */ + private static Pair chooseBitWidth(int[] normalized, int length, int maxWidth) { + int bestWidth = 0; + int bestSize = Integer.MAX_VALUE; + int bestExcCount = 0; + + for (int candidateWidth = 0; candidateWidth <= maxWidth; candidateWidth += 1) { + int excCount = 0; + if (candidateWidth < 8) { + int threshold = 1 << candidateWidth; + for (int i = 0; i < length; i += 1) { + if (normalized[i] >= threshold) { + excCount += 1; + } + } + } + + int b2 = maxWidth - candidateWidth; + int size = byteWidth(length * candidateWidth) + excCount + byteWidth(excCount * b2); + + if (size <= bestSize) { + bestSize = size; + bestWidth = candidateWidth; + bestExcCount = excCount; + } + } + + return Pair.of(bestWidth, bestExcCount); + } + + /** + * Return the lowest byte value from the array slice [start, start + length). + * + *

If length is < 1, the result will be larger than Byte.MAX_VALUE. + * + * @param values array of values + * @param start starting index + * @param length number of values to check + * @return the min of the values in the array slice + */ + private static int min(int[] values, int start, int length) { + // Use min > Byte.MAX_VALUE to signal no min (length < 1) + int min = 256; + for (int i = start; i < start + length; i += 1) { + if (values[i] < min) { + min = values[i]; + } + } + + return min; + } + + /** Returns the number of bytes required in the worst case to encode {@code valueCount} values. */ + static int estimateEncodedSize(int valueCount) { + // Worst-case per chunk is b1=8: 3-byte header + 1 byte per value. Any other b1 chosen by the + // encoder costs <= n bytes of data (otherwise b1=8 would have been selected instead). + int numChunks = ceilDiv(valueCount, CHUNK_SIZE); + return 3 * numChunks + valueCount; + } + + /** Returns the number of bytes required to encode a chunk of values. */ + private static int encodedSize(int count, int b1, int b2, int excCount) { + return 3 + byteWidth(b1 * count) + excCount + byteWidth(b2 * excCount); + } + + /** Returns the number of bits required to represent {@code v} (0 for v=0). */ + static int width(int value) { + return 32 - Integer.numberOfLeadingZeros(value); + } + + private static int byteWidth(int bits) { + return ceilDiv(bits, 8); + } + + private static int ceilDiv(int a, int b) { + return (a + b - 1) / b; + } +} diff --git a/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java b/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java new file mode 100644 index 000000000000..1ba375adb8bf --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/mumbling/PFORRandomData.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import java.util.Random; + +/** Data generation for {@link PFOREncoding} tests and benchmarks. */ +class PFORRandomData { + + private PFORRandomData() {} + + /** Generates {@code count} values between 0 and {@code maxValue}. */ + static int[] uniform(Random random, int count, int maxValue) { + int[] values = new int[count]; + for (int i = 0; i < count; i++) { + values[i] = random.nextInt(maxValue + 1); + } + + return values; + } + + /** Generates {@code count} values (0-3) with about {@code excPercent}% exceptions (0-255). */ + static int[] exceptions(Random random, int count, float excPercent) { + int[] values = new int[count]; + for (int i = 0; i < count; i++) { + values[i] = random.nextFloat() < excPercent ? random.nextInt(256) : random.nextInt(4); + } + + return values; + } +} diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java b/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java new file mode 100644 index 000000000000..31ac59551a87 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/mumbling/TestMumblingBitmap.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.junit.jupiter.api.Test; + +class TestMumblingBitmap { + + @Test + void testEmptyBitmap() { + MumblingBitmap bitmap = bitmap(); + assertThat(bitmap.cardinality()).isEqualTo(0); + + // all positions beyond the bitmap range are false + assertThat(bitmap.isSet(0)).isFalse(); + assertThat(bitmap.isSet(255)).isFalse(); + assertThat(bitmap.isSet(256)).isFalse(); + } + + @Test + void testInvalidPosition() { + MumblingBitmap bitmap = bitmap(); + assertThat(bitmap.cardinality()).isEqualTo(0); + assertThat(bitmap.isSet(0)).isFalse(); + assertThatThrownBy(() -> bitmap.isSet(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid bit position: -1 < 0"); + } + + @Test + void testEmptySparseContainer() { + MumblingBitmap bitmap = bitmap(sparse()); + assertThat(bitmap.cardinality()).isEqualTo(0); + assertThat(bitmap.isSet(0)).isFalse(); + assertThat(bitmap.isSet(100)).isFalse(); + assertThat(bitmap.isSet(255)).isFalse(); + } + + @Test + void testSparseContainerSetPositions() { + MumblingBitmap bitmap = bitmap(sparse(0, 5, 100, 255)); + assertThat(bitmap.cardinality()).isEqualTo(4); + + assertThat(bitmap.isSet(0)).isTrue(); + assertThat(bitmap.isSet(5)).isTrue(); + assertThat(bitmap.isSet(100)).isTrue(); + assertThat(bitmap.isSet(255)).isTrue(); + + assertThat(bitmap.isSet(1)).isFalse(); + assertThat(bitmap.isSet(4)).isFalse(); + assertThat(bitmap.isSet(6)).isFalse(); + assertThat(bitmap.isSet(99)).isFalse(); + assertThat(bitmap.isSet(101)).isFalse(); + assertThat(bitmap.isSet(254)).isFalse(); + assertThat(bitmap.isSet(256)).isFalse(); + } + + @Test + void testFullSparseContainer() { + int[] positions = new int[31]; + for (int i = 0; i < 31; i += 1) { + positions[i] = i * 8; // 0, 8, 16, ..., 240 + } + + MumblingBitmap bitmap = bitmap(sparse(positions)); + assertThat(bitmap.cardinality()).isEqualTo(31); + + for (int p : positions) { + assertThat(bitmap.isSet(p)).isTrue(); + } + + assertThat(bitmap.isSet(1)).isFalse(); + assertThat(bitmap.isSet(7)).isFalse(); + assertThat(bitmap.isSet(255)).isFalse(); + } + + @Test + void testFullDenseContainer() { + byte[] container = new byte[32]; + Arrays.fill(container, (byte) 0xFF); + + MumblingBitmap bitmap = bitmap(dense(container)); + assertThat(bitmap.cardinality()).isEqualTo(256); + + for (int i = 0; i < 256; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + + assertThat(bitmap.isSet(256)).isFalse(); + } + + // Example 1: positions 0-31: `FF FF FF FF 00 ... 00` + @Test + void testDenseSpecExample1() { + byte[] container = new byte[32]; + container[0] = (byte) 0xFF; + container[1] = (byte) 0xFF; + container[2] = (byte) 0xFF; + container[3] = (byte) 0xFF; + MumblingBitmap bitmap = bitmap(dense(container)); + assertThat(bitmap.cardinality()).isEqualTo(32); + + for (int i = 0; i <= 31; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + + assertThat(bitmap.isSet(32)).isFalse(); + assertThat(bitmap.isSet(255)).isFalse(); + } + + // Example 2: positions 0-32: `FF FF FF FF 80 00 ... 00` + @Test + void testDenseSpecExample2() { + byte[] container = new byte[32]; + container[0] = (byte) 0xFF; + container[1] = (byte) 0xFF; + container[2] = (byte) 0xFF; + container[3] = (byte) 0xFF; + container[4] = (byte) 0x80; + + MumblingBitmap bitmap = bitmap(dense(container)); + assertThat(bitmap.cardinality()).isEqualTo(33); + + for (int i = 0; i <= 32; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + + assertThat(bitmap.isSet(33)).isFalse(); + assertThat(bitmap.isSet(255)).isFalse(); + } + + // Example 3: positions 0-15 and 240-255: `FF FF 00 ... 00 FF FF` + @Test + void testDenseSpecExample3() { + byte[] container = new byte[32]; + container[0] = (byte) 0xFF; + container[1] = (byte) 0xFF; + container[30] = (byte) 0xFF; + container[31] = (byte) 0xFF; + + MumblingBitmap bitmap = bitmap(dense(container)); + assertThat(bitmap.cardinality()).isEqualTo(32); + + for (int i = 0; i <= 15; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + for (int i = 240; i <= 255; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + assertThat(bitmap.isSet(16)).isFalse(); + assertThat(bitmap.isSet(239)).isFalse(); + assertThat(bitmap.isSet(256)).isFalse(); + } + + // Example 4: even positions 0, 2, 4, ...: `AA AA ... AA AA` + @Test + void testDenseSpecExample4() { + byte[] container = new byte[32]; + Arrays.fill(container, (byte) 0xAA); + + MumblingBitmap bitmap = bitmap(dense(container)); + assertThat(bitmap.cardinality()).isEqualTo(128); + + for (int i = 0; i < 256; i += 1) { + assertThat(bitmap.isSet(i)).isEqualTo(i % 2 == 0); + } + + assertThat(bitmap.isSet(256)).isFalse(); + } + + @Test + void testMultipleContainers() { + MumblingBitmap bitmap = bitmap(sparse(5), sparse(), sparse(10)); + assertThat(bitmap.cardinality()).isEqualTo(2); + + assertThat(bitmap.isSet(5)).isTrue(); // container 0, pos 5 + assertThat(bitmap.isSet(256)).isFalse(); // container 1 + assertThat(bitmap.isSet(522)).isTrue(); // container 2, pos 10 + + assertThat(bitmap.isSet(512)).isFalse(); + assertThat(bitmap.isSet(4)).isFalse(); + assertThat(bitmap.isSet(265)).isFalse(); + assertThat(bitmap.isSet(267)).isFalse(); + } + + @Test + void testMixedSparseAndDense() { + byte[] denseContainer = new byte[32]; + denseContainer[0] = (byte) 0xFF; + denseContainer[1] = (byte) 0xFF; + denseContainer[2] = (byte) 0xFF; + denseContainer[3] = (byte) 0xFF; + + MumblingBitmap bitmap = bitmap(dense(denseContainer), sparse(1)); + assertThat(bitmap.cardinality()).isEqualTo(33); + + for (int i = 0; i < 32; i += 1) { + assertThat(bitmap.isSet(i)).isTrue(); + } + assertThat(bitmap.isSet(32)).isFalse(); + + assertThat(bitmap.isSet(256)).isFalse(); + assertThat(bitmap.isSet(257)).isTrue(); // container 1, pos 1 + assertThat(bitmap.isSet(258)).isFalse(); + } + + @Test + void testBufferWithOffset() { + // Prepend 4 bytes of garbage before the actual bitmap data + ByteBuffer buffer = build(sparse(42)); + byte[] rawBytes = new byte[buffer.remaining()]; + buffer.get(rawBytes); + + ByteBuffer padded = ByteBuffer.allocate(4 + rawBytes.length); + padded.position(4); + padded.put(rawBytes); + padded.position(4); // position the buffer at the start of bitmap data + + MumblingBitmap bitmap = new MumblingBitmap(padded); + assertThat(bitmap.cardinality()).isEqualTo(1); + + assertThat(bitmap.isSet(41)).isFalse(); + assertThat(bitmap.isSet(42)).isTrue(); + assertThat(bitmap.isSet(43)).isFalse(); + } + + private static Container sparse(int... positions) { + byte[] bytes = new byte[positions.length]; + for (int i = 0; i < positions.length; i += 1) { + if (i > 0) { + Preconditions.checkArgument( + positions[i] < 256, "Invalid position in container: %s", positions[i]); + Preconditions.checkArgument( + positions[i] > positions[i - 1], + "Invalid sparse container: pos %s=%s >= pos %s=%s", + i - 1, + positions[i - 1], + i, + positions[i]); + } + + bytes[i] = (byte) positions[i]; + } + + return new Container(bytes); + } + + /** Descriptor + bytes for a dense container. */ + private static Container dense(byte[] container) { + Preconditions.checkArgument(container.length == 32, "Dense container must be 32 bytes"); + return new Container(container); + } + + private static class Container { + private final byte[] bytes; + private final int descriptor; + private final int cardinality; + + Container(byte[] bytes) { + this.bytes = bytes; + this.descriptor = bytes.length; + this.cardinality = cardinality(bytes); + } + + private static int cardinality(byte[] bytes) { + if (bytes.length < 32) { + return bytes.length; + } else if (bytes.length == 32) { + int setBits = 0; + for (byte b : bytes) { + setBits += Integer.bitCount(b & 0xFF); + } + + Preconditions.checkArgument( + setBits > 31, "Invalid dense container: %s values should be sparse", setBits); + + return setBits; + } else { + throw new IllegalArgumentException("Invalid container: longer than 32 bytes"); + } + } + } + + private static MumblingBitmap bitmap(Container... containers) { + return new MumblingBitmap(build(containers)); + } + + private static ByteBuffer build(Container... containers) { + Preconditions.checkArgument( + containers.length <= 8192, "Invalid container count (max 8192): %s", containers.length); + + int[] descriptors = new int[containers.length]; + int cardinality = 0; + int sizeEstimate = 6; + for (int i = 0; i < containers.length; i += 1) { + descriptors[i] = containers[i].descriptor; + cardinality += containers[i].cardinality; + sizeEstimate += containers[i].bytes.length; + } + + Preconditions.checkArgument( + cardinality <= 2_097_152, "Invalid cardinality (max 2,097,152): %s", cardinality); + + sizeEstimate += PFOREncoding.estimateEncodedSize(containers.length); + ByteBuffer buf = ByteBuffer.allocate(sizeEstimate); + + // header: version (1 byte), cardinality (3 bytes LE), container count (2 bytes LE) + buf.put(0, (byte) 1); + buf.put(1, (byte) (cardinality & 0xFF)); + buf.put(2, (byte) ((cardinality >>> 8) & 0xFF)); + buf.put(3, (byte) ((cardinality >>> 16) & 0xFF)); + buf.put(4, (byte) (containers.length & 0xFF)); + buf.put(5, (byte) ((containers.length >>> 8) & 0xFF)); + + // write encoded descriptors + int descriptorArraySize = + PFOREncoding.encode(descriptors, 0, buf, buf.position() + 6, descriptors.length); + + // copy container bytes into the array + int containerOffset = 6 + descriptorArraySize; + for (Container spec : containers) { + buf.put(containerOffset, spec.bytes); + containerOffset += spec.bytes.length; + } + + // the offset after the last container is the length + buf.limit(containerOffset); + + return buf; + } +} diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java new file mode 100644 index 000000000000..20193fbd23fd --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncoding.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.iceberg.util.ByteBuffers; +import org.junit.jupiter.api.Test; + +public class TestPFOREncoding { + + // Example 1: 256 values, b1=0, m=0, all=0: `00 00 00` + @Test + public void testExample1AllZeros() { + int[] values = new int[256]; + Arrays.fill(values, 0); + + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(bytes(0x00, 0x00, 0x00)); + + int[] decoded = PFOREncoding.decode(encoded, 256); + assertThat(decoded).isEqualTo(values); + } + + // Example 2: 51 values, b1=0, m=5, all=5: `00 00 05` + @Test + public void testExample2AllFives() { + int[] values = new int[51]; + Arrays.fill(values, 5); + + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(bytes(0x00, 0x00, 0x05)); + + int[] decoded = PFOREncoding.decode(encoded, 51); + assertThat(decoded).isEqualTo(values); + } + + // Example 3: only exception values, b1=0, b2=8: `80 02 00 04 07 FF FE` + @Test + public void testExample3SparseExceptions() { + int[] values = {0, 0, 0, 0, 0xFF, 0, 0, 0xFE}; + + byte[] expected = + bytes(0x80, 0x02, 0x00, /* offsets */ 0x04, 0x07, /* exceptions */ 0xFF, 0xFE); + + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected); + + int[] decoded = PFOREncoding.decode(encoded, values.length); + assertThat(decoded).isEqualTo(values); + } + + // Example 4: [6, 7, 8], no exceptions, b1=2, m=6: `02 00 06 18` + @Test + public void testExample4TwoBitsNoExceptions() { + int[] values = {6, 7, 8}; + + byte[] expected = bytes(0x02, 0x00, 0x06, 0x18); + + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected); + + int[] decoded = PFOREncoding.decode(encoded, values.length); + assertThat(decoded).isEqualTo(values); + } + + // Example 5: [6, 34, 8, 7], b1=2, b2=3, m=6, 1 exception: `32 01 06 09 01 E0` + @Test + public void testExample5() { + int[] values = {6, 34, 8, 7}; + + // impl prefers larger widths when the storage is the same size + byte[] expected = bytes(0x05, 0x00, 0x06, 0x07, 0x04, 0x10); + byte[] fromSpec = bytes(0x32, 0x01, 0x06, 0x09, 0x01, 0xE0); + + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + assertThat(ByteBuffers.toByteArray(encoded)).isEqualTo(expected); + + int[] decoded = PFOREncoding.decode(encoded, values.length); + assertThat(decoded).isEqualTo(values); + + int[] decodedFromSpec = PFOREncoding.decode(ByteBuffer.wrap(fromSpec), values.length); + assertThat(decodedFromSpec).isEqualTo(values); + } + + private static byte[] bytes(int... values) { + byte[] result = new byte[values.length]; + for (int i = 0; i < values.length; i++) { + result[i] = (byte) values[i]; + } + return result; + } +} diff --git a/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java new file mode 100644 index 000000000000..5f36ab5ce99e --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/mumbling/TestPFOREncodingRandom.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.mumbling; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** Randomized round-trip tests for {@link PFOREncoding}. */ +public class TestPFOREncodingRandom { + private static final Random RANDOM = new Random(3546521684L); + + static Stream uniformRandomCases() { + return Stream.of( + // 1-bit range (b1=1): values in {0, 1} + Arguments.of("1-bit range, count=1", 1, 1), + Arguments.of("1-bit range, count=7", 7, 1), + Arguments.of("1-bit range, count=8", 8, 1), + Arguments.of("1-bit range, count=9", 9, 1), + Arguments.of("1-bit range, count=256", 256, 1), + + // 2-bit range (b1=2): values in [0, 3] + Arguments.of("2-bit range, count=7", 7, 3), + Arguments.of("2-bit range, count=8", 8, 3), + Arguments.of("2-bit range, count=9", 9, 3), + Arguments.of("2-bit range, count=100", 100, 3), + Arguments.of("2-bit range, count=256", 256, 3), + + // 3-bit range (b1=3): values in [0, 7] + Arguments.of("3-bit range, count=7", 7, 7), + Arguments.of("3-bit range, count=24", 24, 7), + Arguments.of("3-bit range, count=256", 256, 7), + + // 6-bit range (b1=6): values in [0, 32] for Mumbling descriptor bytes + Arguments.of("5-bit range, count=7", 7, 32), + Arguments.of("5-bit range, count=8", 8, 32), + Arguments.of("5-bit range, count=63", 63, 32), + Arguments.of("5-bit range, count=64", 64, 32), + Arguments.of("5-bit range, count=256", 256, 32), + + // Full byte range (b1=8 or b1 chosen by cost): values in [0, 255] + Arguments.of("full range, count=1", 1, 255), + Arguments.of("full range, count=7", 7, 255), + Arguments.of("full range, count=8", 8, 255), + Arguments.of("full range, count=9", 9, 255), + Arguments.of("full range, count=15", 15, 255), + Arguments.of("full range, count=16", 16, 255), + Arguments.of("full range, count=100", 100, 255), + Arguments.of("full range, count=255", 255, 255), + Arguments.of("full range, count=256", 256, 255), + Arguments.of("full range, count=257", 257, 255), + Arguments.of("full range, count=512", 512, 255), + Arguments.of("full range, count=513", 513, 255)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("uniformRandomCases") + public void testUniformRandom(String name, int count, int maxValue) { + assertRoundTrip(PFORRandomData.uniform(RANDOM, count, maxValue)); + } + + static Stream exceptionCases() { + return Stream.of( + Arguments.of("Exception 10%, count=7", 7, 0.10f), + Arguments.of("Exception 10%, count=8", 8, 0.10f), + Arguments.of("Exception 10%, count=9", 9, 0.10f), + Arguments.of("Exception 10%, count=100", 100, 0.10f), + Arguments.of("Exception 10%, count=256", 256, 0.10f), + Arguments.of("Exception 25%, count=256", 256, 0.25f), + Arguments.of("Exception 50%, count=256", 256, 0.50f), + Arguments.of("Exception 10%, count=512", 512, 0.10f)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("exceptionCases") + public void testRandomWithExceptions(String name, int count, float excPercent) { + assertRoundTrip(PFORRandomData.exceptions(RANDOM, count, excPercent)); + } + + static Stream offsetRangeCases() { + return Stream.of( + Arguments.of("offset min=100 rangeSize=3, count=8", 8, 100, 3), + Arguments.of("offset min=100 rangeSize=3, count=256", 256, 100, 3), + Arguments.of("offset min=50 rangeSize=31, count=64", 64, 50, 31), + Arguments.of("offset min=200 rangeSize=55, count=100", 100, 200, 55), + Arguments.of("offset min=128 rangeSize=127, count=256", 256, 128, 127)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("offsetRangeCases") + public void testOffsetRandom(String name, int count, int minValue, int rangeSize) { + // generate values between 0 and rangeSize, then add the min value offset + int[] values = PFORRandomData.uniform(RANDOM, count, rangeSize); + for (int i = 0; i < values.length; i += 1) { + values[i] += minValue; + } + + assertRoundTrip(values); + } + + private static void assertRoundTrip(int[] values) { + ByteBuffer encoded = PFOREncoding.encode(values, values.length); + int[] decoded = PFOREncoding.decode(encoded, values.length); + assertThat(decoded).isEqualTo(values); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a590aaf71405..b4fd82ef48ac 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -59,6 +59,7 @@ httpcomponents-httpclient5 = "5.6.1" hive2 = { strictly = "2.3.10"} # see rich version usage explanation above immutables-value = "2.12.2" jackson-annotations = "2.21" +javafastpfor = "0.2.1" jackson-bom = "2.21.4" jackson214 = { strictly = "2.14.2"} jackson215 = { strictly = "2.15.2"} # see rich version usage explanation above @@ -154,6 +155,7 @@ hive2-service = { module = "org.apache.hive:hive-service", version.ref = "hive2" httpcomponents-httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5", version.ref = "httpcomponents-httpclient5" } immutables-value = { module = "org.immutables:value", version.ref = "immutables-value" } jackson-bom = { module = "com.fasterxml.jackson:jackson-bom", version.ref = "jackson-bom" } +javafastpfor = { module = "me.lemire.integercompression:JavaFastPFOR", version.ref = "javafastpfor" } jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-bom" } jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-bom" } jackson-annotations = { module = "com.fasterxml.jackson.core:jackson-annotations", version.ref = "jackson-annotations" } diff --git a/jmh.gradle b/jmh.gradle index a4d794f1e41a..f7c12a2d562a 100644 --- a/jmh.gradle +++ b/jmh.gradle @@ -85,4 +85,10 @@ configure(jmhProjects) { } tasks.jmh.finalizedBy tasks.jmhReport + + if (project.path == ':iceberg-core') { + dependencies { + jmhImplementation(libs.javafastpfor) + } + } }