The writer's data bytes (the RLE-encoded indices) and the dictionary + * page are returned separately so both pieces can be measured or fed to a + * decoder symmetrically. The dictionary page buffer is copied so it remains + * valid after the writer's allocator is released. + * + *
The writer is closed via {@code toDictPageAndClose()}; callers must not + * call {@link DictionaryValuesWriter#close()} again afterwards. + */ + static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException { + byte[] dictData = writer.getBytes().toByteArray(); + DictionaryPage rawPage = writer.toDictPageAndClose(); + DictionaryPage dictPage = rawPage == null ? null : rawPage.copy(); + return new EncodedDictionary(dictData, dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java new file mode 100644 index 0000000000..e6646458d8 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; +import org.apache.parquet.column.values.plain.BinaryPlainValuesReader; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding-level and decoding-level micro-benchmarks for BINARY values. 
+ * Compares PLAIN, DELTA_BYTE_ARRAY, DELTA_LENGTH_BYTE_ARRAY, and DICTIONARY encodings + * across different string lengths and cardinality patterns. + * + *
Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is + * reported per-value using {@link OperationsPerInvocation}. + * + *
The dictionary encode/decode benchmarks intentionally measure the full path: + * the encoder produces both the RLE-encoded indices and a {@link DictionaryPage}; + * the decoder consumes the indices through a {@link DictionaryValuesReader} backed + * by the same dictionary. If the dictionary exceeds {@link #MAX_DICT_BYTE_SIZE} + * (which can happen for high-cardinality, long-string parameter combinations) the + * writer falls back to plain encoding and dictionary decoding for that combination + * is skipped. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class BinaryEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024; + + @Param({"10", "100", "1000"}) + public int stringLength; + + /** LOW = 100 distinct values; HIGH = all unique. */ + @Param({"LOW", "HIGH"}) + public String cardinality; + + private Binary[] data; + private byte[] plainEncoded; + private byte[] deltaLengthEncoded; + private byte[] deltaStringsEncoded; + private byte[] dictEncoded; + private DictionaryPage dictPage; + private Dictionary binaryDictionary; + private boolean dictionaryAvailable; + + @Setup(Level.Trial) + public void setup() throws IOException { + int distinct = "LOW".equals(cardinality) ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, distinct, TestDataFactory.DEFAULT_SEED); + + // Pre-encode data for decode benchmarks + plainEncoded = encodeBinaryWith(newPlainWriter()); + deltaLengthEncoded = encodeBinaryWith(newDeltaLengthWriter()); + deltaStringsEncoded = encodeBinaryWith(newDeltaStringsWriter()); + + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter dictWriter = newDictWriter(); + for (Binary v : data) { + dictWriter.writeBytes(v); + } + BenchmarkEncodingUtils.EncodedDictionary encoded = BenchmarkEncodingUtils.drainDictionary(dictWriter); + dictEncoded = encoded.dictData; + dictPage = encoded.dictPage; + dictionaryAvailable = !encoded.fellBackToPlain(); + if (dictionaryAvailable) { + binaryDictionary = new PlainValuesDictionary.PlainBinaryDictionary(dictPage); + } + } + + private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + private BenchmarkEncodingUtils.EncodedDictionary encodeDictionaryWith( + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter writer) throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + return BenchmarkEncodingUtils.drainDictionary(writer); + } + + // ---- Writer factories ---- + + private static PlainValuesWriter newPlainWriter() { + return new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaLengthByteArrayValuesWriter newDeltaLengthWriter() { + return new DeltaLengthByteArrayValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaByteArrayWriter newDeltaStringsWriter() { + return new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter newDictWriter() 
{ + return new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + } + + // ---- Encode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodePlain() throws IOException { + return encodeBinaryWith(newPlainWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDeltaLengthByteArray() throws IOException { + return encodeBinaryWith(newDeltaLengthWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDeltaByteArray() throws IOException { + return encodeBinaryWith(newDeltaStringsWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void encodeDictionary(Blackhole bh) throws IOException { + BenchmarkEncodingUtils.EncodedDictionary encoded = encodeDictionaryWith(newDictWriter()); + bh.consume(encoded.dictData); + bh.consume(encoded.dictPage); + } + + // ---- Decode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodePlain(Blackhole bh) throws IOException { + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDeltaLengthByteArray(Blackhole bh) throws IOException { + DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaLengthEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDeltaByteArray(Blackhole bh) throws IOException { + DeltaByteArrayReader reader = new DeltaByteArrayReader(); + 
reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaStringsEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionary(Blackhole bh) throws IOException { + if (!dictionaryAvailable) { + // Dictionary fell back to plain encoding (e.g. high-cardinality long strings + // exceeding MAX_DICT_BYTE_SIZE). Skip to keep the benchmark meaningful. + return; + } + DictionaryValuesReader reader = new DictionaryValuesReader(binaryDictionary); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java new file mode 100644 index 0000000000..690ddc2bbe --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +/** + * A no-op {@link OutputFile} that discards all written data. + * Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks. + */ +public final class BlackHoleOutputFile implements OutputFile { + + public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile(); + + private BlackHoleOutputFile() {} + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1L; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new PositionOutputStream() { + private long pos; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void write(int b) throws IOException { + ++pos; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + pos += len; + } + }; + } + + @Override + public String getPath() { + return "/dev/null"; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java new file mode 100644 index 0000000000..e59b7ba941 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import 
org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the BYTE_STREAM_SPLIT encoding across the four + * primitive widths supported by Parquet ({@code FLOAT}, {@code DOUBLE}, {@code INT32}, + * {@code INT64}). + * + *
Each invocation decodes {@value #VALUE_COUNT} values; throughput is reported + * per-value via {@link OperationsPerInvocation}. The cost includes both + * {@code initFromPage} (which eagerly transposes the entire page) and the per-value + * read calls. Page transposition is the part this benchmark is primarily designed + * to exercise. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class ByteStreamSplitDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + private byte[] floatPage; + private byte[] doublePage; + private byte[] intPage; + private byte[] longPage; + + @Setup(Level.Trial) + public void setup() throws IOException { + Random random = new Random(42); + int[] intData = new int[VALUE_COUNT]; + long[] longData = new long[VALUE_COUNT]; + float[] floatData = new float[VALUE_COUNT]; + double[] doubleData = new double[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + intData[i] = random.nextInt(); + longData[i] = random.nextLong(); + floatData[i] = random.nextFloat(); + doubleData[i] = random.nextDouble(); + } + + { + ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (float v : floatData) { + w.writeFloat(v); + } + floatPage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (double v : doubleData) { + w.writeDouble(v); + } + doublePage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int v 
: intData) { + w.writeInteger(v); + } + intPage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (long v : longData) { + w.writeLong(v); + } + longPage = w.getBytes().toByteArray(); + w.close(); + } + } + + private static void init(ByteStreamSplitValuesReader r, byte[] page) throws IOException { + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(java.nio.ByteBuffer.wrap(page))); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFloat(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForFloat r = new ByteStreamSplitValuesReaderForFloat(); + init(r, floatPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readFloat()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDouble(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForDouble r = new ByteStreamSplitValuesReaderForDouble(); + init(r, doublePage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readDouble()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeInt(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForInteger r = new ByteStreamSplitValuesReaderForInteger(); + init(r, intPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeLong(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForLong r = new ByteStreamSplitValuesReaderForLong(); + init(r, longPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readLong()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java new file mode 100644 
index 0000000000..37ec9df812 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmarks for the BYTE_STREAM_SPLIT encoding across the four + * primitive widths 
supported by Parquet ({@code FLOAT}, {@code DOUBLE}, {@code INT32}, + * {@code INT64}). + * + *
Each invocation encodes {@value #VALUE_COUNT} values; throughput is reported + * per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class ByteStreamSplitEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + private int[] intData; + private long[] longData; + private float[] floatData; + private double[] doubleData; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(42); + intData = new int[VALUE_COUNT]; + longData = new long[VALUE_COUNT]; + floatData = new float[VALUE_COUNT]; + doubleData = new double[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + intData[i] = random.nextInt(); + longData[i] = random.nextLong(); + floatData[i] = random.nextFloat(); + doubleData[i] = random.nextDouble(); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFloat() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (float v : floatData) { + w.writeFloat(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDouble() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (double v : doubleData) { + w.writeDouble(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeInt() throws IOException { + ValuesWriter w = new 
ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int v : intData) { + w.writeInteger(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeLong() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (long v : longData) { + w.writeLong(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java new file mode 100644 index 0000000000..de94b422cf --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Multi-threaded benchmarks measuring independent read and write throughput under + * concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). + * + *
This benchmark does not assert correctness; it measures the cost of each thread + * writing a full file to a stateless sink or reading a shared pre-generated file. + * The set of rows used by {@link #concurrentWrite(Blackhole)} is built once during + * setup and shared (read-only) across all threads, so the timed section measures + * the encoder/serializer pipeline rather than per-row data construction. + * + *
{@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full file write or read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows)
+ * that JIT amortization across invocations is unnecessary.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 2, batchSize = 1)
+@Measurement(iterations = 5, batchSize = 1)
+@Threads(4)
+@State(Scope.Benchmark)
+public class ConcurrentReadWriteBenchmark {
+
+ private File tempFile;
+ private Group[] rows;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+
+ // Generate a shared file for concurrent reads
+ tempFile = File.createTempFile("parquet-concurrent-bench-", ".parquet");
+ tempFile.deleteOnExit();
+ tempFile.delete();
+
+ try (ParquetWriter Parameterized across compression codec and writer version. The footer parse
+ * (via {@link LocalInputFile} open) is included in the timed section so the result
+ * reflects the full open-and-read cost a typical caller would observe.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 3, batchSize = 1)
+@Measurement(iterations = 5, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileReadBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ private File tempFile;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ tempFile = File.createTempFile("parquet-read-bench-", ".parquet");
+ tempFile.deleteOnExit();
+ tempFile.delete(); // remove so the writer can create it
+
+ Group[] rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ try (ParquetWriter Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost
+ * from filesystem I/O. Parameterized across compression codec, writer version, and
+ * dictionary encoding.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 3, batchSize = 1)
+@Measurement(iterations = 5, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileWriteBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ @Param({"true", "false"})
+ public String dictionary;
+
+ private Group[] rows;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ }
+
+ @Benchmark
+ public void writeFile() throws IOException {
+ try (ParquetWriter Each benchmark invocation processes {@value #VALUE_COUNT} values; throughput
+ * is reported per-value via {@link OperationsPerInvocation}.
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Thread)
+public class FixedLenByteArrayEncodingBenchmark {
+
+ static final int VALUE_COUNT = 100_000;
+ private static final int INIT_SLAB_SIZE = 64 * 1024;
+ private static final int PAGE_SIZE = 4 * 1024 * 1024;
+
+ @Param({"10", "100", "1000"})
+ public int fixedLength;
+
+ private Binary[] data;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ Random random = new Random(42);
+ // distinct=0 -> all unique values; each is exactly fixedLength bytes long.
+ data = TestDataFactory.generateBinaryData(VALUE_COUNT, fixedLength, 0, random);
+ }
+
+ private byte[] encodeWith(ValuesWriter writer) throws IOException {
+ for (Binary v : data) {
+ writer.writeBytes(v);
+ }
+ byte[] bytes = writer.getBytes().toByteArray();
+ writer.close();
+ return bytes;
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(VALUE_COUNT)
+ public byte[] encodeFixedLenPlain() throws IOException {
+ return encodeWith(new FixedLenByteArrayPlainValuesWriter(
+ fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()));
+ }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java
new file mode 100644
index 0000000000..7665a7462a
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
+import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary;
+import org.apache.parquet.column.values.plain.PlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Encoding-level and decoding-level micro-benchmarks for INT32 values.
+ * Compares PLAIN, DELTA_BINARY_PACKED, BYTE_STREAM_SPLIT, and DICTIONARY encodings
+ * across different data distribution patterns. Synthetic dictionary-id RLE decode is
+ * benchmarked separately in {@link RleDictionaryIndexDecodingBenchmark} so the results
+ * here stay comparable at the full-value level.
+ *
+ * Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is
+ * reported per-value using {@link OperationsPerInvocation}.
+ *
+ * BYTE_STREAM_SPLIT is included for completeness even though it is rarely a good
+ * choice for integer data; it exists here to compare the full set of encodings the
+ * Parquet writer can emit for INT32.
+ *
+ * The dictionary encode/decode benchmarks measure the full path: the encoder
+ * produces both the RLE-encoded indices and a {@link DictionaryPage}; the decoder
+ * consumes the indices through a {@link DictionaryValuesReader} backed by the same
+ * dictionary.
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Thread)
+public class IntEncodingBenchmark {
+
+    /** Number of values encoded/decoded per benchmark invocation. */
+    static final int VALUE_COUNT = 100_000;
+    private static final int INIT_SLAB_SIZE = 64 * 1024;
+    private static final int PAGE_SIZE = 1024 * 1024;
+    private static final int MAX_DICT_BYTE_SIZE = 1024 * 1024;
+
+    @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"})
+    public String dataPattern;
+
+    private int[] data;
+    // Pre-encoded pages consumed by the decode benchmarks (built once per trial).
+    private byte[] plainEncoded;
+    private byte[] deltaEncoded;
+    private byte[] bssEncoded;
+    private byte[] dictDataEncoded;
+    private DictionaryPage dictPage;
+    private Dictionary intDictionary;
+    // False when the dictionary writer fell back to plain encoding during setup.
+    private boolean dictionaryAvailable;
+
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        switch (dataPattern) {
+            case "SEQUENTIAL":
+                data = TestDataFactory.generateSequentialInts(VALUE_COUNT);
+                break;
+            case "RANDOM":
+                data = TestDataFactory.generateRandomInts(VALUE_COUNT, TestDataFactory.DEFAULT_SEED);
+                break;
+            case "LOW_CARDINALITY":
+                data = TestDataFactory.generateLowCardinalityInts(
+                        VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED);
+                break;
+            case "HIGH_CARDINALITY":
+                data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, TestDataFactory.DEFAULT_SEED);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown data pattern: " + dataPattern);
+        }
+
+        // Pre-encode data for decode benchmarks.
+        plainEncoded = encodeWith(newPlainWriter());
+        deltaEncoded = encodeWith(newDeltaWriter());
+        bssEncoded = encodeWith(newBssWriter());
+
+        // Pre-encode dictionary data for the decode benchmark. Reuses
+        // encodeDictionaryWith so setup and the encodeDictionary benchmark
+        // exercise exactly the same code path.
+        BenchmarkEncodingUtils.EncodedDictionary encoded = encodeDictionaryWith(newDictWriter());
+        dictDataEncoded = encoded.dictData;
+        dictPage = encoded.dictPage;
+        dictionaryAvailable = !encoded.fellBackToPlain();
+        if (dictionaryAvailable) {
+            intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(dictPage);
+        }
+    }
+
+    /**
+     * Encodes {@link #data} with the given writer and returns the encoded bytes.
+     *
+     * <p>The writer is closed in a finally block so its allocator slabs are
+     * released even when {@code writeInteger}/{@code getBytes} throws (the
+     * previous straight-line form skipped {@code close()} on the exception path).
+     */
+    private byte[] encodeWith(ValuesWriter writer) throws IOException {
+        try {
+            for (int v : data) {
+                writer.writeInteger(v);
+            }
+            return writer.getBytes().toByteArray();
+        } finally {
+            writer.close();
+        }
+    }
+
+    /**
+     * Dictionary-encodes {@link #data}. On success the writer is closed by
+     * {@code drainDictionary} (via {@code toDictPageAndClose}); if encoding
+     * fails before the drain, the writer is closed here so its buffers are
+     * still released.
+     */
+    private BenchmarkEncodingUtils.EncodedDictionary encodeDictionaryWith(
+            DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter writer) throws IOException {
+        boolean drained = false;
+        try {
+            for (int v : data) {
+                writer.writeInteger(v);
+            }
+            BenchmarkEncodingUtils.EncodedDictionary encoded = BenchmarkEncodingUtils.drainDictionary(writer);
+            drained = true;
+            return encoded;
+        } finally {
+            // drainDictionary already closed the writer on the success path.
+            if (!drained) {
+                writer.close();
+            }
+        }
+    }
+
+    // ---- Writer factories ----
+
+    private static PlainValuesWriter newPlainWriter() {
+        return new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+    }
+
+    private static DeltaBinaryPackingValuesWriterForInteger newDeltaWriter() {
+        return new DeltaBinaryPackingValuesWriterForInteger(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+    }
+
+    private static ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter newBssWriter() {
+        return new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
+                INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
+    }
+
+    private static DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter newDictWriter() {
+        return new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter(
+                MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator());
+    }
+
+    // ---- Encode benchmarks ----
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public byte[] encodePlain() throws IOException {
+        return encodeWith(newPlainWriter());
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public byte[] encodeDelta() throws IOException {
+        return encodeWith(newDeltaWriter());
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public byte[] encodeByteStreamSplit() throws IOException {
+        return encodeWith(newBssWriter());
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void encodeDictionary(Blackhole bh) throws IOException {
+        BenchmarkEncodingUtils.EncodedDictionary encoded = encodeDictionaryWith(newDictWriter());
+        bh.consume(encoded.dictData);
+        bh.consume(encoded.dictPage);
+    }
+
+    // ---- Decode benchmarks ----
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void decodePlain(Blackhole bh) throws IOException {
+        PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader();
+        reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded)));
+        for (int i = 0; i < VALUE_COUNT; i++) {
+            bh.consume(reader.readInteger());
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void decodeDelta(Blackhole bh) throws IOException {
+        DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader();
+        reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaEncoded)));
+        for (int i = 0; i < VALUE_COUNT; i++) {
+            bh.consume(reader.readInteger());
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void decodeByteStreamSplit(Blackhole bh) throws IOException {
+        ByteStreamSplitValuesReaderForInteger reader = new ByteStreamSplitValuesReaderForInteger();
+        reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded)));
+        for (int i = 0; i < VALUE_COUNT; i++) {
+            bh.consume(reader.readInteger());
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void decodeDictionary(Blackhole bh) throws IOException {
+        if (!dictionaryAvailable) {
+            // Dictionary fell back to plain encoding (e.g. very large unique-value sets
+            // exceeding MAX_DICT_BYTE_SIZE), so dictDataEncoded does not hold RLE ids.
+            // NOTE(review): an empty invocation inflates reported throughput for this
+            // param combination; consider excluding the combination instead — confirm.
+            return;
+        }
+        DictionaryValuesReader reader = new DictionaryValuesReader(intDictionary);
+        reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictDataEncoded)));
+        for (int i = 0; i < VALUE_COUNT; i++) {
+            bh.consume(reader.readInteger());
+        }
+    }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java
new file mode 100644
index 0000000000..68c51f0842
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Decoding micro-benchmark for synthetic dictionary-id pages encoded with
+ * {@link RunLengthBitPackingHybridEncoder}. This isolates the dictionary-id
+ * decode path and is intentionally separate from {@link IntEncodingBenchmark},
+ * which measures full INT32 value decode paths.
+ *
+ * Per-invocation overhead (decoder construction and {@link ByteBufferInputStream}
+ * wrapping) is amortized over {@value #VALUE_COUNT} reads via
+ * {@link OperationsPerInvocation}.
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@State(Scope.Thread)
+public class RleDictionaryIndexDecodingBenchmark {
+
+    /** Number of dictionary ids decoded per benchmark invocation. */
+    static final int VALUE_COUNT = 100_000;
+    private static final int INIT_SLAB_SIZE = 64 * 1024;
+    private static final int PAGE_SIZE = 1024 * 1024;
+    private static final int BIT_WIDTH = 10;
+    private static final int MAX_ID = 1 << BIT_WIDTH;
+
+    static {
+        // Guard: every generated id must be representable in BIT_WIDTH bits.
+        if (TestDataFactory.LOW_CARDINALITY_DISTINCT > MAX_ID) {
+            throw new IllegalStateException("LOW_CARDINALITY_DISTINCT (" + TestDataFactory.LOW_CARDINALITY_DISTINCT
+                    + ") must fit within BIT_WIDTH=" + BIT_WIDTH + " (MAX_ID=" + MAX_ID + ")");
+        }
+    }
+
+    @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY"})
+    public String indexPattern;
+
+    /** RLE/bit-packed bytes produced once per trial and decoded by the benchmark. */
+    private byte[] encoded;
+
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        int[] dictionaryIds = generateDictionaryIds();
+        try (RunLengthBitPackingHybridEncoder rleEncoder = new RunLengthBitPackingHybridEncoder(
+                BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) {
+            for (int i = 0; i < dictionaryIds.length; i++) {
+                rleEncoder.writeInt(dictionaryIds[i]);
+            }
+            encoded = rleEncoder.toBytes().toByteArray();
+        }
+    }
+
+    /** Produces the dictionary-id sequence selected by {@link #indexPattern}. */
+    private int[] generateDictionaryIds() {
+        switch (indexPattern) {
+            case "SEQUENTIAL": {
+                int[] ids = new int[VALUE_COUNT];
+                for (int pos = 0; pos < ids.length; pos++) {
+                    ids[pos] = pos % MAX_ID;
+                }
+                return ids;
+            }
+            case "RANDOM":
+                return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, MAX_ID, TestDataFactory.DEFAULT_SEED);
+            case "LOW_CARDINALITY":
+                return TestDataFactory.generateLowCardinalityInts(
+                        VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED);
+            default:
+                throw new IllegalArgumentException("Unknown index pattern: " + indexPattern);
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(VALUE_COUNT)
+    public void decodeDictionaryIds(Blackhole bh) throws IOException {
+        RunLengthBitPackingHybridDecoder rleDecoder =
+                new RunLengthBitPackingHybridDecoder(BIT_WIDTH, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded)));
+        int remaining = VALUE_COUNT;
+        while (remaining-- > 0) {
+            bh.consume(rleDecoder.readInt());
+        }
+    }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java
new file mode 100644
index 0000000000..753b27de4a
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmark measuring row group flush performance and peak buffer memory.
+ *
+ * Uses a wide schema (20 BINARY columns, 200 bytes each) to produce
+ * substantial per-column page buffers. A {@link PeakTrackingAllocator}
+ * wraps the heap allocator to precisely track the peak bytes outstanding
+ * across all parquet-managed ByteBuffers (independent of JVM GC behavior).
+ *
+ * The key metric is {@code peakAllocatorMB}: with the interleaved flush
+ * optimization, each column's pages are finalized, written, and released
+ * before the next column is processed, so peak buffer memory is roughly
+ * 1/N of the total row group size (N = number of columns).
+ *
+ * Writes to {@link BlackHoleOutputFile} to isolate flush cost from
+ * filesystem I/O.
+ */
+@BenchmarkMode({Mode.AverageTime})
+@Fork(
+ value = 1,
+ jvmArgs = {"-Xms512m", "-Xmx1g"})
+@Warmup(iterations = 2)
+@Measurement(iterations = 3)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Thread)
+public class RowGroupFlushBenchmark {
+
+ private static final int COLUMN_COUNT = 20;
+ private static final int BINARY_VALUE_LENGTH = 200;
+ private static final int ROW_COUNT = 100_000;
+
+ /** Row group sizes: 8MB and 64MB. */
+ @Param({"8388608", "67108864"})
+ public int rowGroupSize;
+
+ /** Wide schema: 20 required BINARY columns. */
+ private static final MessageType WIDE_SCHEMA;
+
+ // Builds the wide test schema once per class load: COLUMN_COUNT required
+ // BINARY columns named col_0..col_{COLUMN_COUNT-1} in a "wide_record" message.
+ static {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+ for (int c = 0; c < COLUMN_COUNT; c++) {
+ builder.required(PrimitiveTypeName.BINARY).named("col_" + c);
+ }
+ WIDE_SCHEMA = builder.named("wide_record");
+ }
+
+ /** Pre-generated column values (one unique value per column). */
+ private Binary[] columnValues;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        // Fixed seed: every trial writes identical data, keeping runs comparable.
+        Random rng = new Random(42);
+        Binary[] values = new Binary[COLUMN_COUNT];
+        for (int col = 0; col < values.length; col++) {
+            byte[] payload = new byte[BINARY_VALUE_LENGTH];
+            rng.nextBytes(payload);
+            values[col] = Binary.fromConstantByteArray(payload);
+        }
+        columnValues = values;
+    }
+
+ /**
+ * Auxiliary counters reported alongside timing. JMH collects these after
+ * each iteration.
+ */
+ @AuxCounters(AuxCounters.Type.EVENTS)
+ @State(Scope.Thread)
+ public static class MemoryCounters {
+ /** Peak bytes outstanding in the parquet ByteBufferAllocator. */
+ public long peakAllocatorBytes;
+
+ /** Convenience: peak in MB (peakAllocatorBytes / 1048576). */
+ public double peakAllocatorMB;
+
+ // Zeroed before each iteration: this thread-scoped state object is reused,
+ // so a peak recorded in one iteration must not leak into the next.
+ @Setup(Level.Iteration)
+ public void reset() {
+ peakAllocatorBytes = 0;
+ peakAllocatorMB = 0;
+ }
+ }
+
+    /**
+     * ByteBufferAllocator wrapper that records the current and peak number of
+     * bytes outstanding across all buffers it has handed out. Backed by atomics,
+     * so it is safe under concurrent use even though the write path here is
+     * single-threaded.
+     */
+    static class PeakTrackingAllocator implements ByteBufferAllocator {
+        private final ByteBufferAllocator delegate = new HeapByteBufferAllocator();
+        private final AtomicLong outstanding = new AtomicLong();
+        private final AtomicLong peak = new AtomicLong();
+
+        @Override
+        public ByteBuffer allocate(int size) {
+            ByteBuffer buffer = delegate.allocate(size);
+            // Count the buffer's capacity, matching what release() subtracts.
+            long nowOutstanding = outstanding.addAndGet(buffer.capacity());
+            peak.updateAndGet(previous -> Math.max(previous, nowOutstanding));
+            return buffer;
+        }
+
+        @Override
+        public void release(ByteBuffer buf) {
+            outstanding.addAndGet(-buf.capacity());
+            delegate.release(buf);
+        }
+
+        @Override
+        public boolean isDirect() {
+            return delegate.isDirect();
+        }
+
+        /** Highest number of bytes simultaneously outstanding so far. */
+        long getPeakBytes() {
+            return peak.get();
+        }
+    }
+
+ @Benchmark
+ public void writeWithFlush(MemoryCounters counters) throws IOException {
+ PeakTrackingAllocator allocator = new PeakTrackingAllocator();
+ SimpleGroupFactory factory = new SimpleGroupFactory(WIDE_SCHEMA);
+
+ try (ParquetWriter Note: prefer {@link #generateRandomInts(int, long)} when call ordering between
+ * generators in the same setup must not influence the produced data.
+ */
+    public static int[] generateRandomInts(int count, Random random) {
+        int[] values = new int[count];
+        for (int i = 0; i < values.length; i++) {
+            values[i] = random.nextInt();
+        }
+        return values;
+    }
+
+ /**
+ * Generates low-cardinality integers (values drawn from a small set) using the given seed.
+ */
+ public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) {
+ return generateLowCardinalityInts(count, distinctValues, new Random(seed));
+ }
+
+    /**
+     * Generates low-cardinality integers: each element is drawn uniformly from
+     * {@code [0, distinctValues)}.
+     *
+     * <p>NOTE(review): assumes {@code distinctValues > 0};
+     * {@link Random#nextInt(int)} throws otherwise — confirm callers.
+     */
+    public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) {
+        int[] values = new int[count];
+        for (int i = 0; i < values.length; i++) {
+            values[i] = random.nextInt(distinctValues);
+        }
+        return values;
+    }
+
+ /**
+ * Generates high-cardinality integers (all unique in randomized order) using the given seed.
+ */
+ public static int[] generateHighCardinalityInts(int count, long seed) {
+ return generateHighCardinalityInts(count, new Random(seed));
+ }
+
+    /**
+     * Generates high-cardinality integers: the sequential values from
+     * {@code generateSequentialInts(count)} shuffled into randomized order
+     * (all unique).
+     */
+    public static int[] generateHighCardinalityInts(int count, Random random) {
+        int[] values = generateSequentialInts(count);
+        // Fisher-Yates shuffle, walking from the tail of the array.
+        for (int i = count - 1; i > 0; i--) {
+            int j = random.nextInt(i + 1);
+            int held = values[i];
+            values[i] = values[j];
+            values[j] = held;
+        }
+        return values;
+    }
+
+ // ---- Binary data generation for encoding benchmarks ----
+
+ /**
+ * Generates binary strings of the given length with the specified cardinality, using
+ * a deterministic seed.
+ */
+ public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) {
+ return generateBinaryData(count, stringLength, distinct, new Random(seed));
+ }
+
+    /**
+     * Generates binary strings of the given length with the specified cardinality.
+     *
+     * @param count number of values to produce
+     * @param stringLength length of each generated string
+     * @param distinct number of distinct values (0 means every value is unique)
+     * @param random random source
+     * @return array of Binary values
+     */
+    public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) {
+        Binary[] values = new Binary[count];
+        if (distinct <= 0) {
+            // Unbounded cardinality: every slot gets its own fresh random string.
+            for (int i = 0; i < count; i++) {
+                values[i] = Binary.fromConstantByteArray(
+                        randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
+            }
+            return values;
+        }
+        // Bounded cardinality: materialize the distinct pool first, then sample it.
+        // RNG draw order matches the original (pool strings, then index draws),
+        // so a given seed still produces identical data.
+        Binary[] pool = new Binary[distinct];
+        for (int i = 0; i < distinct; i++) {
+            pool[i] = Binary.fromConstantByteArray(
+                    randomString(stringLength, random).getBytes(StandardCharsets.UTF_8));
+        }
+        for (int i = 0; i < count; i++) {
+            values[i] = pool[random.nextInt(distinct)];
+        }
+        return values;
+    }
+
+    /** Builds a random lowercase ASCII string of exactly {@code length} characters. */
+    private static String randomString(int length, Random random) {
+        char[] chars = new char[length];
+        for (int i = 0; i < length; i++) {
+            chars[i] = (char) ('a' + random.nextInt(26));
+        }
+        return new String(chars);
+    }
+}