From 2404a29f1a082f1b3a0b76fe4c42b9641643fb3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 19 Apr 2026 20:08:17 +0000 Subject: [PATCH 1/3] GH-3511: Add JMH encoding benchmarks and fix parquet-benchmarks shaded jar The parquet-benchmarks pom is missing the JMH annotation-processor configuration and the AppendingTransformer entries for BenchmarkList / CompilerHints. As a result, the shaded jar built from master fails at runtime with "Unable to find the resource: /META-INF/BenchmarkList". This commit: - Fixes parquet-benchmarks/pom.xml so the shaded jar is runnable: adds jmh-generator-annprocess to maven-compiler-plugin's annotation processor paths, and adds AppendingTransformer entries for META-INF/BenchmarkList and META-INF/CompilerHints to the shade plugin. - Adds 11 new classes (9 JMH benchmark suites plus 2 supporting utilities) covering the encode/decode paths used by the pending performance optimization PRs (#3494, #3496, #3500, #3504, #3506, #3510), so reviewers can reproduce the reported numbers and detect regressions: IntEncodingBenchmark, BinaryEncodingBenchmark, ByteStreamSplitEncodingBenchmark, ByteStreamSplitDecodingBenchmark, FixedLenByteArrayEncodingBenchmark, FileReadBenchmark, FileWriteBenchmark, RowGroupFlushBenchmark, ConcurrentReadWriteBenchmark, BlackHoleOutputFile, TestDataFactory. After this change the shaded jar registers 87 benchmarks (previously 0 even from a build that succeeded, while the jar from a default build was not runnable at all). 
--- parquet-benchmarks/pom.xml | 18 ++ .../benchmarks/BinaryEncodingBenchmark.java | 182 +++++++++++++ .../benchmarks/BlackHoleOutputFile.java | 76 ++++++ .../ByteStreamSplitDecodingBenchmark.java | 170 ++++++++++++ .../ByteStreamSplitEncodingBenchmark.java | 131 ++++++++++ .../ConcurrentReadWriteBenchmark.java | 135 ++++++++++ .../parquet/benchmarks/FileReadBenchmark.java | 119 +++++++++ .../benchmarks/FileWriteBenchmark.java | 82 ++++++ .../FixedLenByteArrayEncodingBenchmark.java | 89 +++++++ .../benchmarks/IntEncodingBenchmark.java | 244 ++++++++++++++++++ .../benchmarks/RowGroupFlushBenchmark.java | 191 ++++++++++++++ .../parquet/benchmarks/TestDataFactory.java | 175 +++++++++++++ 12 files changed, 1612 insertions(+) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java diff --git a/parquet-benchmarks/pom.xml 
b/parquet-benchmarks/pom.xml index 65d6dbf3ed..58eb4a0e19 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -89,6 +89,18 @@ org.apache.maven.plugins maven-compiler-plugin + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + org.openjdk.jmh.generators.BenchmarkProcessor + + org.apache.maven.plugins @@ -107,6 +119,12 @@ org.openjdk.jmh.Main + + META-INF/BenchmarkList + + + META-INF/CompilerHints + diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java new file mode 100644 index 0000000000..7added9717 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; +import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; +import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.plain.BinaryPlainValuesReader; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding-level micro-benchmarks for BINARY values. + * Compares PLAIN, DELTA_BYTE_ARRAY, DELTA_LENGTH_BYTE_ARRAY, and DICTIONARY encodings + * across different string lengths and cardinality patterns. + * + *

Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is + * reported per-value using {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class BinaryEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + private static final int MAX_DICT_BYTE_SIZE = 4 * 1024 * 1024; + + @Param({"10", "100", "1000"}) + public int stringLength; + + /** LOW = 100 distinct values; HIGH = all unique. */ + @Param({"LOW", "HIGH"}) + public String cardinality; + + private Binary[] data; + private byte[] plainEncoded; + private byte[] deltaLengthEncoded; + private byte[] deltaStringsEncoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + Random random = new Random(42); + int distinct = "LOW".equals(cardinality) ? 
TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, distinct, random); + + // Pre-encode data for decode benchmarks + plainEncoded = encodeBinaryWith(newPlainWriter()); + deltaLengthEncoded = encodeBinaryWith(newDeltaLengthWriter()); + deltaStringsEncoded = encodeBinaryWith(newDeltaStringsWriter()); + } + + private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + // ---- Writer factories ---- + + private static PlainValuesWriter newPlainWriter() { + return new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaLengthByteArrayValuesWriter newDeltaLengthWriter() { + return new DeltaLengthByteArrayValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaByteArrayWriter newDeltaStringsWriter() { + return new DeltaByteArrayWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter newDictWriter() { + return new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + } + + // ---- Encode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodePlain() throws IOException { + return encodeBinaryWith(newPlainWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDeltaLengthByteArray() throws IOException { + return encodeBinaryWith(newDeltaLengthWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDeltaByteArray() throws IOException { + return encodeBinaryWith(newDeltaStringsWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] 
encodeDictionary() throws IOException { + return encodeBinaryWith(newDictWriter()); + } + + // ---- Decode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodePlain(Blackhole bh) throws IOException { + BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDeltaLengthByteArray(Blackhole bh) throws IOException { + DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaLengthEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDeltaByteArray(Blackhole bh) throws IOException { + DeltaByteArrayReader reader = new DeltaByteArrayReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaStringsEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java new file mode 100644 index 0000000000..690ddc2bbe --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BlackHoleOutputFile.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +/** + * A no-op {@link OutputFile} that discards all written data. + * Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks. + */ +public final class BlackHoleOutputFile implements OutputFile { + + public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile(); + + private BlackHoleOutputFile() {} + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1L; + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new PositionOutputStream() { + private long pos; + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void write(int b) throws IOException { + ++pos; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + pos += len; + } + }; + } + + @Override + public String getPath() { + return "/dev/null"; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java 
b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java new file mode 100644 index 0000000000..e59b7ba941 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitDecodingBenchmark.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding-level micro-benchmarks for the BYTE_STREAM_SPLIT encoding across the four + * primitive widths supported by Parquet ({@code FLOAT}, {@code DOUBLE}, {@code INT32}, + * {@code INT64}). + * + *

Each invocation decodes {@value #VALUE_COUNT} values; throughput is reported + * per-value via {@link OperationsPerInvocation}. The cost includes both + * {@code initFromPage} (which eagerly transposes the entire page) and the per-value + * read calls. Page transposition is the part this benchmark is primarily designed + * to exercise. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class ByteStreamSplitDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + private byte[] floatPage; + private byte[] doublePage; + private byte[] intPage; + private byte[] longPage; + + @Setup(Level.Trial) + public void setup() throws IOException { + Random random = new Random(42); + int[] intData = new int[VALUE_COUNT]; + long[] longData = new long[VALUE_COUNT]; + float[] floatData = new float[VALUE_COUNT]; + double[] doubleData = new double[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + intData[i] = random.nextInt(); + longData[i] = random.nextLong(); + floatData[i] = random.nextFloat(); + doubleData[i] = random.nextDouble(); + } + + { + ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (float v : floatData) { + w.writeFloat(v); + } + floatPage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (double v : doubleData) { + w.writeDouble(v); + } + doublePage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int v 
: intData) { + w.writeInteger(v); + } + intPage = w.getBytes().toByteArray(); + w.close(); + } + { + ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (long v : longData) { + w.writeLong(v); + } + longPage = w.getBytes().toByteArray(); + w.close(); + } + } + + private static void init(ByteStreamSplitValuesReader r, byte[] page) throws IOException { + r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(java.nio.ByteBuffer.wrap(page))); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeFloat(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForFloat r = new ByteStreamSplitValuesReaderForFloat(); + init(r, floatPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readFloat()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDouble(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForDouble r = new ByteStreamSplitValuesReaderForDouble(); + init(r, doublePage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readDouble()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeInt(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForInteger r = new ByteStreamSplitValuesReaderForInteger(); + init(r, intPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeLong(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForLong r = new ByteStreamSplitValuesReaderForLong(); + init(r, longPage); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(r.readLong()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java new file mode 100644 
index 0000000000..37ec9df812 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ByteStreamSplitEncodingBenchmark.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmarks for the BYTE_STREAM_SPLIT encoding across the four + * primitive widths 
supported by Parquet ({@code FLOAT}, {@code DOUBLE}, {@code INT32}, + * {@code INT64}). + * + *

Each invocation encodes {@value #VALUE_COUNT} values; throughput is reported + * per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class ByteStreamSplitEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + private int[] intData; + private long[] longData; + private float[] floatData; + private double[] doubleData; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(42); + intData = new int[VALUE_COUNT]; + longData = new long[VALUE_COUNT]; + floatData = new float[VALUE_COUNT]; + doubleData = new double[VALUE_COUNT]; + for (int i = 0; i < VALUE_COUNT; i++) { + intData[i] = random.nextInt(); + longData[i] = random.nextLong(); + floatData[i] = random.nextFloat(); + doubleData[i] = random.nextDouble(); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFloat() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (float v : floatData) { + w.writeFloat(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDouble() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (double v : doubleData) { + w.writeDouble(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeInt() throws IOException { + ValuesWriter w = new 
ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int v : intData) { + w.writeInteger(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeLong() throws IOException { + ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (long v : longData) { + w.writeLong(v); + } + byte[] bytes = w.getBytes().toByteArray(); + w.close(); + return bytes; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java new file mode 100644 index 0000000000..9c5d135eab --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Multi-threaded benchmarks to validate that read and write operations perform correctly + * under concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). + * + *

    + *
  • {@link #concurrentWrite()} - each thread independently writes to a shared + * {@link BlackHoleOutputFile} (stateless sink)
  • + *
  • {@link #concurrentRead(Blackhole)} - each thread independently reads the same + * pre-generated Parquet file
  • + *
+ */ +@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 2, batchSize = 1) +@Measurement(iterations = 5, batchSize = 1) +@Threads(4) +@State(Scope.Benchmark) +public class ConcurrentReadWriteBenchmark { + + private File tempFile; + + @Setup(Level.Trial) + public void setup() throws IOException { + // Generate a shared file for concurrent reads + tempFile = File.createTempFile("parquet-concurrent-bench-", ".parquet"); + tempFile.deleteOnExit(); + tempFile.delete(); + + SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); + Random random = new Random(42); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .build()) { + for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { + writer.write(TestDataFactory.generateRow(factory, i, random)); + } + } + } + + @TearDown(Level.Trial) + public void tearDown() { + if (tempFile != null && tempFile.exists()) { + tempFile.delete(); + } + } + + /** + * Each thread writes a full file independently to the shared stateless + * {@link BlackHoleOutputFile} sink. + */ + @Benchmark + public void concurrentWrite() throws IOException { + SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); + Random random = new Random(Thread.currentThread().getId()); + try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .build()) { + for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { + writer.write(TestDataFactory.generateRow(factory, i, random)); + } + } + } + + /** + * Each thread reads the full pre-generated file independently. 
+ */ + @Benchmark + public void concurrentRead(Blackhole bh) throws IOException { + InputFile inputFile = new LocalInputFile(tempFile.toPath()); + try (ParquetReader reader = new ParquetReader.Builder(inputFile) { + @Override + protected ReadSupport getReadSupport() { + return new GroupReadSupport(); + } + }.build()) { + Group group; + while ((group = reader.read()) != null) { + bh.consume(group); + } + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java new file mode 100644 index 0000000000..7d5d0f5159 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * File-level read benchmarks measuring throughput of the full Parquet read pipeline. + * A temporary file is generated during setup using {@link LocalOutputFile} (no Hadoop FS + * overhead on write side), then read repeatedly during the benchmark. + * + *
<p>
Parameterized across compression codec and writer version. + */ +@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@Fork(1) +@Warmup(iterations = 3, batchSize = 1) +@Measurement(iterations = 5, batchSize = 1) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FileReadBenchmark { + + @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"}) + public String codec; + + @Param({"PARQUET_1_0", "PARQUET_2_0"}) + public String writerVersion; + + private File tempFile; + + @Setup(Level.Trial) + public void setup() throws IOException { + tempFile = File.createTempFile("parquet-read-bench-", ".parquet"); + tempFile.deleteOnExit(); + tempFile.delete(); // remove so the writer can create it + + SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); + Random random = new Random(42); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .withCompressionCodec(CompressionCodecName.valueOf(codec)) + .withWriterVersion(WriterVersion.valueOf(writerVersion)) + .withDictionaryEncoding(true) + .build()) { + for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { + writer.write(TestDataFactory.generateRow(factory, i, random)); + } + } + } + + @TearDown(Level.Trial) + public void tearDown() { + if (tempFile != null && tempFile.exists()) { + tempFile.delete(); + } + } + + @Benchmark + public void readFile(Blackhole bh) throws IOException { + InputFile inputFile = new LocalInputFile(tempFile.toPath()); + try (ParquetReader reader = new ParquetReader.Builder(inputFile) { + @Override + protected ReadSupport getReadSupport() { + return new GroupReadSupport(); + } + }.build()) { + Group group; + while ((group = reader.read()) != null) { + bh.consume(group); + } + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java 
b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java new file mode 100644 index 0000000000..60ac086504 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * File-level write benchmarks measuring throughput of the full Parquet write pipeline. + * Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU/encoding cost from + * filesystem I/O. + * + *
<p>
Parameterized across compression codec, writer version, and dictionary encoding. + */ +@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@Fork(1) +@Warmup(iterations = 3, batchSize = 1) +@Measurement(iterations = 5, batchSize = 1) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FileWriteBenchmark { + + @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP"}) + public String codec; + + @Param({"PARQUET_1_0", "PARQUET_2_0"}) + public String writerVersion; + + @Param({"true", "false"}) + public String dictionary; + + @Benchmark + public void writeFile() throws IOException { + SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); + Random random = new Random(42); + try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) + .withCompressionCodec(CompressionCodecName.valueOf(codec)) + .withWriterVersion(WriterVersion.valueOf(writerVersion)) + .withDictionaryEncoding(Boolean.parseBoolean(dictionary)) + .build()) { + for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { + writer.write(TestDataFactory.generateRow(factory, i, random)); + } + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java new file mode 100644 index 0000000000..7bf9359c92 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FixedLenByteArrayEncodingBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter; +import org.apache.parquet.io.api.Binary; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Encoding-level micro-benchmark for {@link FixedLenByteArrayPlainValuesWriter}. + * Each input value has a fixed length matching the writer's configured length, so + * no length prefix is emitted -- the writer simply concatenates the raw bytes. + * + *
<p>
Each benchmark invocation processes {@value #VALUE_COUNT} values; throughput + * is reported per-value via {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class FixedLenByteArrayEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 4 * 1024 * 1024; + + @Param({"10", "100", "1000"}) + public int fixedLength; + + private Binary[] data; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(42); + // distinct=0 -> all unique values; each is exactly fixedLength bytes long. + data = TestDataFactory.generateBinaryData(VALUE_COUNT, fixedLength, 0, random); + } + + private byte[] encodeWith(ValuesWriter writer) throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeFixedLenPlain() throws IOException { + return encodeWith(new FixedLenByteArrayPlainValuesWriter( + fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java new file mode 100644 index 0000000000..6ce4420e7e --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger; +import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; +import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; +import org.apache.parquet.column.values.plain.PlainValuesReader; +import org.apache.parquet.column.values.plain.PlainValuesWriter; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; +import 
org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Encoding-level micro-benchmarks for INT32 values. + * Compares PLAIN, DELTA_BINARY_PACKED, BYTE_STREAM_SPLIT, and DICTIONARY encodings + * across different data distribution patterns. + * + *
<p>
Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is + * reported per-value using {@link OperationsPerInvocation}. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class IntEncodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + private static final int MAX_DICT_BYTE_SIZE = 1024 * 1024; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY", "HIGH_CARDINALITY"}) + public String dataPattern; + + private int[] data; + private byte[] plainEncoded; + private byte[] deltaEncoded; + private byte[] bssEncoded; + private byte[] rleEncoded; + private int rleBitWidth; + private byte[] dictDataEncoded; + private Dictionary intDictionary; + + @Setup(Level.Trial) + public void setup() throws IOException { + Random random = new Random(42); + switch (dataPattern) { + case "SEQUENTIAL": + data = TestDataFactory.generateSequentialInts(VALUE_COUNT); + break; + case "RANDOM": + data = TestDataFactory.generateRandomInts(VALUE_COUNT, random); + break; + case "LOW_CARDINALITY": + data = TestDataFactory.generateLowCardinalityInts( + VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, random); + break; + case "HIGH_CARDINALITY": + data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT); + break; + default: + throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); + } + + // Pre-encode data for decode benchmarks + plainEncoded = encodeWith(newPlainWriter()); + deltaEncoded = encodeWith(newDeltaWriter()); + bssEncoded = encodeWith(newBssWriter()); + + // Pre-encode RLE data (using 10-bit values to simulate dictionary indices) + rleBitWidth = 10; + RunLengthBitPackingHybridEncoder rleEncoder = new RunLengthBitPackingHybridEncoder( + rleBitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new 
HeapByteBufferAllocator()); + for (int v : data) { + rleEncoder.writeInt(v & 0x3FF); // mask to 10 bits + } + rleEncoded = rleEncoder.toBytes().toByteArray(); + rleEncoder.close(); + + // Pre-encode dictionary data for decode benchmark + DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter dictWriter = newDictWriter(); + for (int v : data) { + dictWriter.writeInteger(v); + } + BytesInput dictDataBytes = dictWriter.getBytes(); + dictDataEncoded = dictDataBytes.toByteArray(); + DictionaryPage dictPage = dictWriter.toDictPageAndClose().copy(); + intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(dictPage); + } + + private byte[] encodeWith(ValuesWriter writer) throws IOException { + for (int v : data) { + writer.writeInteger(v); + } + byte[] bytes = writer.getBytes().toByteArray(); + writer.close(); + return bytes; + } + + // ---- Writer factories ---- + + private static PlainValuesWriter newPlainWriter() { + return new PlainValuesWriter(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DeltaBinaryPackingValuesWriterForInteger newDeltaWriter() { + return new DeltaBinaryPackingValuesWriterForInteger(INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter newBssWriter() { + return new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter( + INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + } + + private static DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter newDictWriter() { + return new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter( + MAX_DICT_BYTE_SIZE, Encoding.PLAIN_DICTIONARY, Encoding.PLAIN, new HeapByteBufferAllocator()); + } + + // ---- Encode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodePlain() throws IOException { + return encodeWith(newPlainWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] 
encodeDelta() throws IOException { + return encodeWith(newDeltaWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeByteStreamSplit() throws IOException { + return encodeWith(newBssWriter()); + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public byte[] encodeDictionary() throws IOException { + return encodeWith(newDictWriter()); + } + + // ---- Decode benchmarks ---- + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodePlain(Blackhole bh) throws IOException { + PlainValuesReader.IntegerPlainValuesReader reader = new PlainValuesReader.IntegerPlainValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(plainEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDelta(Blackhole bh) throws IOException { + DeltaBinaryPackingValuesReader reader = new DeltaBinaryPackingValuesReader(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeByteStreamSplit(Blackhole bh) throws IOException { + ByteStreamSplitValuesReaderForInteger reader = new ByteStreamSplitValuesReaderForInteger(); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(bssEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeRle(Blackhole bh) throws IOException { + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder( + rleBitWidth, ByteBufferInputStream.wrap(ByteBuffer.wrap(rleEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(decoder.readInt()); + } + } + + @Benchmark + 
@OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionary(Blackhole bh) throws IOException { + DictionaryValuesReader reader = new DictionaryValuesReader(intDictionary); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictDataEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readInteger()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java new file mode 100644 index 0000000000..753b27de4a --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RowGroupFlushBenchmark.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmark measuring row group flush performance and peak buffer memory. + * + *
<p>
Uses a wide schema (20 BINARY columns, 200 bytes each) to produce + * substantial per-column page buffers. A {@link PeakTrackingAllocator} + * wraps the heap allocator to precisely track the peak bytes outstanding + * across all parquet-managed ByteBuffers (independent of JVM GC behavior). + * + *
<p>
The key metric is {@code peakAllocatorMB}: with the interleaved flush + * optimization, each column's pages are finalized, written, and released + * before the next column is processed, so peak buffer memory is roughly + * 1/N of the total row group size (N = number of columns). + * + *
<p>
Writes to {@link BlackHoleOutputFile} to isolate flush cost from + * filesystem I/O. + */ +@BenchmarkMode({Mode.AverageTime}) +@Fork( + value = 1, + jvmArgs = {"-Xms512m", "-Xmx1g"}) +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class RowGroupFlushBenchmark { + + private static final int COLUMN_COUNT = 20; + private static final int BINARY_VALUE_LENGTH = 200; + private static final int ROW_COUNT = 100_000; + + /** Row group sizes: 8MB and 64MB. */ + @Param({"8388608", "67108864"}) + public int rowGroupSize; + + /** Wide schema: 20 required BINARY columns. */ + private static final MessageType WIDE_SCHEMA; + + static { + Types.MessageTypeBuilder builder = Types.buildMessage(); + for (int c = 0; c < COLUMN_COUNT; c++) { + builder.required(PrimitiveTypeName.BINARY).named("col_" + c); + } + WIDE_SCHEMA = builder.named("wide_record"); + } + + /** Pre-generated column values (one unique value per column). */ + private Binary[] columnValues; + + @Setup(Level.Trial) + public void setup() { + Random random = new Random(42); + columnValues = new Binary[COLUMN_COUNT]; + for (int c = 0; c < COLUMN_COUNT; c++) { + byte[] value = new byte[BINARY_VALUE_LENGTH]; + random.nextBytes(value); + columnValues[c] = Binary.fromConstantByteArray(value); + } + } + + /** + * Auxiliary counters reported alongside timing. JMH collects these after + * each iteration. + */ + @AuxCounters(AuxCounters.Type.EVENTS) + @State(Scope.Thread) + public static class MemoryCounters { + /** Peak bytes outstanding in the parquet ByteBufferAllocator. */ + public long peakAllocatorBytes; + + /** Convenience: peak in MB (peakAllocatorBytes / 1048576). */ + public double peakAllocatorMB; + + @Setup(Level.Iteration) + public void reset() { + peakAllocatorBytes = 0; + peakAllocatorMB = 0; + } + } + + /** + * ByteBufferAllocator wrapper that tracks current and peak allocated bytes. 
+ * Thread-safe (uses AtomicLong) although the write path is single-threaded. + */ + static class PeakTrackingAllocator implements ByteBufferAllocator { + private final ByteBufferAllocator delegate = new HeapByteBufferAllocator(); + private final AtomicLong currentBytes = new AtomicLong(); + private final AtomicLong peakBytes = new AtomicLong(); + + @Override + public ByteBuffer allocate(int size) { + ByteBuffer buf = delegate.allocate(size); + long current = currentBytes.addAndGet(buf.capacity()); + peakBytes.accumulateAndGet(current, Math::max); + return buf; + } + + @Override + public void release(ByteBuffer buf) { + currentBytes.addAndGet(-buf.capacity()); + delegate.release(buf); + } + + @Override + public boolean isDirect() { + return delegate.isDirect(); + } + + long getPeakBytes() { + return peakBytes.get(); + } + } + + @Benchmark + public void writeWithFlush(MemoryCounters counters) throws IOException { + PeakTrackingAllocator allocator = new PeakTrackingAllocator(); + SimpleGroupFactory factory = new SimpleGroupFactory(WIDE_SCHEMA); + + try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withType(WIDE_SCHEMA) + .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) + .withWriterVersion(WriterVersion.PARQUET_1_0) + .withRowGroupSize(rowGroupSize) + .withDictionaryEncoding(false) + .withAllocator(allocator) + .build()) { + for (int i = 0; i < ROW_COUNT; i++) { + Group group = factory.newGroup(); + for (int c = 0; c < COLUMN_COUNT; c++) { + group.append("col_" + c, columnValues[c]); + } + writer.write(group); + } + } + + counters.peakAllocatorBytes = allocator.getPeakBytes(); + counters.peakAllocatorMB = allocator.getPeakBytes() / (1024.0 * 1024.0); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java new file mode 100644 index 
0000000000..f0fc7c52df --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +import java.util.Random; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; + +/** + * Utility class for generating test schemas and data for benchmarks. + */ +public final class TestDataFactory { + + /** Default number of rows for file-level benchmarks. 
*/ + public static final int DEFAULT_ROW_COUNT = 100_000; + + /** Number of distinct values for low-cardinality data patterns. */ + public static final int LOW_CARDINALITY_DISTINCT = 100; + + /** A standard multi-type schema used by file-level benchmarks. */ + public static final MessageType FILE_BENCHMARK_SCHEMA = Types.buildMessage() + .required(INT32) + .named("int32_field") + .required(INT64) + .named("int64_field") + .required(FLOAT) + .named("float_field") + .required(DOUBLE) + .named("double_field") + .required(BOOLEAN) + .named("boolean_field") + .required(BINARY) + .named("binary_field") + .named("benchmark_record"); + + private TestDataFactory() {} + + /** + * Creates a {@link SimpleGroupFactory} for the standard benchmark schema. + */ + public static SimpleGroupFactory newGroupFactory() { + return new SimpleGroupFactory(FILE_BENCHMARK_SCHEMA); + } + + /** + * Generates a single row of benchmark data. + * + * @param factory the group factory + * @param index the row index (used for deterministic data) + * @param random the random source + * @return a populated Group + */ + public static Group generateRow(SimpleGroupFactory factory, int index, Random random) { + return factory.newGroup() + .append("int32_field", index) + .append("int64_field", (long) index * 100) + .append("float_field", random.nextFloat()) + .append("double_field", random.nextDouble()) + .append("boolean_field", index % 2 == 0) + .append("binary_field", "value_" + (index % 1000)); + } + + // ---- Integer data generation for encoding benchmarks ---- + + /** + * Generates sequential integers: 0, 1, 2, ... + */ + public static int[] generateSequentialInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + /** + * Generates uniformly random integers. 
+ */ + public static int[] generateRandomInts(int count, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(); + } + return data; + } + + /** + * Generates low-cardinality integers (values drawn from a small set). + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, Random random) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = random.nextInt(distinctValues); + } + return data; + } + + /** + * Generates high-cardinality integers (all unique). + */ + public static int[] generateHighCardinalityInts(int count) { + int[] data = new int[count]; + for (int i = 0; i < count; i++) { + data[i] = i; + } + return data; + } + + // ---- Binary data generation for encoding benchmarks ---- + + /** + * Generates binary strings of the given length with the specified cardinality. + * + * @param count number of values + * @param stringLength length of each string + * @param distinct number of distinct values (0 means all unique) + * @param random random source + * @return array of Binary values + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, Random random) { + Binary[] data = new Binary[count]; + if (distinct > 0) { + // Pre-generate the distinct values + Binary[] dictionary = new Binary[distinct]; + for (int i = 0; i < distinct; i++) { + dictionary[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes()); + } + for (int i = 0; i < count; i++) { + data[i] = dictionary[random.nextInt(distinct)]; + } + } else { + // All unique + for (int i = 0; i < count; i++) { + data[i] = Binary.fromConstantByteArray( + randomString(stringLength, random).getBytes()); + } + } + return data; + } + + private static String randomString(int length, Random random) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + random.nextInt(26))); + } + return 
sb.toString(); + } +} From 8f8f8a2b729855d421d55a07437e8475147eee90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 20 Apr 2026 01:16:11 +0000 Subject: [PATCH 2/3] GH-3511: Isolate setup cost and benchmark full dictionary paths Pre-generate deterministic rows for the file and concurrent benchmarks so row construction does not skew the timed section, and make the encoding benchmarks include real dictionary-page and dictionary-decode work instead of only value buffers. Split synthetic RLE dictionary-index decoding into its own benchmark and encode generated binary payloads as UTF-8 explicitly so benchmark inputs stay consistent across runs and platforms. --- .../benchmarks/BinaryEncodingBenchmark.java | 48 +++++++- .../ConcurrentReadWriteBenchmark.java | 38 +++--- .../parquet/benchmarks/FileReadBenchmark.java | 19 ++- .../benchmarks/FileWriteBenchmark.java | 31 +++-- .../benchmarks/IntEncodingBenchmark.java | 53 ++++---- .../RleDictionaryIndexDecodingBenchmark.java | 115 ++++++++++++++++++ .../parquet/benchmarks/TestDataFactory.java | 30 +++-- 7 files changed, 262 insertions(+), 72 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java index 7added9717..db65ca5f25 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java @@ -23,14 +23,19 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Dictionary; import 
org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; +import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; +import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; import org.apache.parquet.column.values.plain.BinaryPlainValuesReader; import org.apache.parquet.column.values.plain.PlainValuesWriter; import org.apache.parquet.io.api.Binary; @@ -50,7 +55,7 @@ import org.openjdk.jmh.infra.Blackhole; /** - * Encoding-level micro-benchmarks for BINARY values. + * Encoding-level and decoding-level micro-benchmarks for BINARY values. * Compares PLAIN, DELTA_BYTE_ARRAY, DELTA_LENGTH_BYTE_ARRAY, and DICTIONARY encodings * across different string lengths and cardinality patterns. 
* @@ -81,6 +86,8 @@ public class BinaryEncodingBenchmark { private byte[] plainEncoded; private byte[] deltaLengthEncoded; private byte[] deltaStringsEncoded; + private byte[] dictEncoded; + private Dictionary binaryDictionary; @Setup(Level.Trial) public void setup() throws IOException { @@ -92,6 +99,15 @@ public void setup() throws IOException { plainEncoded = encodeBinaryWith(newPlainWriter()); deltaLengthEncoded = encodeBinaryWith(newDeltaLengthWriter()); deltaStringsEncoded = encodeBinaryWith(newDeltaStringsWriter()); + + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter dictWriter = newDictWriter(); + for (Binary v : data) { + dictWriter.writeBytes(v); + } + dictEncoded = dictWriter.getBytes().toByteArray(); + DictionaryPage dictPage = dictWriter.toDictPageAndClose().copy(); + binaryDictionary = new PlainValuesDictionary.PlainBinaryDictionary(dictPage); + dictWriter.close(); } private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { @@ -103,6 +119,24 @@ private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { return bytes; } + private byte[] encodeDictionaryWith(DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter writer) + throws IOException { + for (Binary v : data) { + writer.writeBytes(v); + } + BytesInput dataBytes = writer.getBytes(); + DictionaryPage dictPage = writer.toDictPageAndClose(); + byte[] bytes; + if (dictPage == null) { + bytes = dataBytes.toByteArray(); + } else { + BytesInput allBytes = BytesInput.concat(dataBytes, dictPage.getBytes()); + bytes = allBytes.toByteArray(); + } + writer.close(); + return bytes; + } + // ---- Writer factories ---- private static PlainValuesWriter newPlainWriter() { @@ -145,7 +179,7 @@ public byte[] encodeDeltaByteArray() throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) public byte[] encodeDictionary() throws IOException { - return encodeBinaryWith(newDictWriter()); + return encodeDictionaryWith(newDictWriter()); } // ---- Decode benchmarks ---- @@ 
-179,4 +213,14 @@ public void decodeDeltaByteArray(Blackhole bh) throws IOException { bh.consume(reader.readBytes()); } } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionary(Blackhole bh) throws IOException { + DictionaryValuesReader reader = new DictionaryValuesReader(binaryDictionary); + reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictEncoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(reader.readBytes()); + } + } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java index 9c5d135eab..29371f7eb1 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java @@ -20,10 +20,8 @@ import java.io.File; import java.io.IOException; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; @@ -49,8 +47,10 @@ import org.openjdk.jmh.infra.Blackhole; /** - * Multi-threaded benchmarks to validate that read and write operations perform correctly - * under concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). + * Multi-threaded benchmarks measuring independent read and write throughput under + * concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). + * This benchmark does not assert correctness; it measures the cost of each thread + * writing a full file to a stateless sink or reading a shared pre-generated file. * *

    *
  • {@link #concurrentWrite()} - each thread independently writes to a shared @@ -59,7 +59,7 @@ * pre-generated Parquet file
  • *
*/ -@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@BenchmarkMode(Mode.SingleShotTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @Fork(1) @Warmup(iterations = 2, batchSize = 1) @@ -69,6 +69,18 @@ public class ConcurrentReadWriteBenchmark { private File tempFile; + private Group[] readRows; + + @State(Scope.Thread) + public static class ThreadData { + private Group[] rows; + + @Setup(Level.Trial) + public void setup() { + rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); + } + } @Setup(Level.Trial) public void setup() throws IOException { @@ -77,14 +89,14 @@ public void setup() throws IOException { tempFile.deleteOnExit(); tempFile.delete(); - SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); - Random random = new Random(42); + readRows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) .build()) { - for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { - writer.write(TestDataFactory.generateRow(factory, i, random)); + for (Group row : readRows) { + writer.write(row); } } } @@ -101,15 +113,13 @@ public void tearDown() { * {@link BlackHoleOutputFile} sink. 
*/ @Benchmark - public void concurrentWrite() throws IOException { - SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); - Random random = new Random(Thread.currentThread().getId()); + public void concurrentWrite(ThreadData threadData) throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) .build()) { - for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { - writer.write(TestDataFactory.generateRow(factory, i, random)); + for (Group row : threadData.rows) { + writer.write(row); } } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java index 7d5d0f5159..eb5b959efa 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java @@ -20,11 +20,9 @@ import java.io.File; import java.io.IOException; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; @@ -51,13 +49,14 @@ import org.openjdk.jmh.infra.Blackhole; /** - * File-level read benchmarks measuring throughput of the full Parquet read pipeline. - * A temporary file is generated during setup using {@link LocalOutputFile} (no Hadoop FS - * overhead on write side), then read repeatedly during the benchmark. + * File-level read benchmarks measuring end-to-end Parquet read throughput through the + * example {@link Group} API. 
A temporary file is generated once during setup from + * pre-generated rows using {@link LocalOutputFile}, then read repeatedly during the + * benchmark. * *

Parameterized across compression codec and writer version. */ -@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@BenchmarkMode(Mode.SingleShotTime) @Fork(1) @Warmup(iterations = 3, batchSize = 1) @Measurement(iterations = 5, batchSize = 1) @@ -79,8 +78,8 @@ public void setup() throws IOException { tempFile.deleteOnExit(); tempFile.delete(); // remove so the writer can create it - SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); - Random random = new Random(42); + Group[] rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) @@ -88,8 +87,8 @@ public void setup() throws IOException { .withWriterVersion(WriterVersion.valueOf(writerVersion)) .withDictionaryEncoding(true) .build()) { - for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { - writer.write(TestDataFactory.generateRow(factory, i, random)); + for (Group row : rows) { + writer.write(row); } } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java index 60ac086504..73f60d7199 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java @@ -19,11 +19,9 @@ package org.apache.parquet.benchmarks; import java.io.IOException; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import 
org.apache.parquet.hadoop.example.ExampleParquetWriter; @@ -31,22 +29,27 @@ import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; /** - * File-level write benchmarks measuring throughput of the full Parquet write pipeline. - * Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU/encoding cost from - * filesystem I/O. + * File-level write benchmarks measuring end-to-end Parquet write throughput through the + * example {@link Group} API. Row contents are pre-generated during setup so compression + * and writer settings dominate the timed section, while writes still flow through the + * full Parquet writer path. * - *

Parameterized across compression codec, writer version, and dictionary encoding. + *

Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost + * from filesystem I/O. Parameterized across compression codec, writer version, and + * dictionary encoding. */ -@BenchmarkMode({Mode.SingleShotTime, Mode.AverageTime}) +@BenchmarkMode(Mode.SingleShotTime) @Fork(1) @Warmup(iterations = 3, batchSize = 1) @Measurement(iterations = 5, batchSize = 1) @@ -63,10 +66,16 @@ public class FileWriteBenchmark { @Param({"true", "false"}) public String dictionary; + private Group[] rows; + + @Setup(Level.Trial) + public void setup() { + rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); + } + @Benchmark public void writeFile() throws IOException { - SimpleGroupFactory factory = TestDataFactory.newGroupFactory(); - Random random = new Random(42); try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) @@ -74,8 +83,8 @@ public void writeFile() throws IOException { .withWriterVersion(WriterVersion.valueOf(writerVersion)) .withDictionaryEncoding(Boolean.parseBoolean(dictionary)) .build()) { - for (int i = 0; i < TestDataFactory.DEFAULT_ROW_COUNT; i++) { - writer.write(TestDataFactory.generateRow(factory, i, random)); + for (Group row : rows) { + writer.write(row); } } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java index 6ce4420e7e..df767df455 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java @@ -38,8 +38,6 @@ import org.apache.parquet.column.values.dictionary.PlainValuesDictionary; import org.apache.parquet.column.values.plain.PlainValuesReader; import 
org.apache.parquet.column.values.plain.PlainValuesWriter; -import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; -import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -56,9 +54,11 @@ import org.openjdk.jmh.infra.Blackhole; /** - * Encoding-level micro-benchmarks for INT32 values. + * Encoding-level and decoding-level micro-benchmarks for INT32 values. * Compares PLAIN, DELTA_BINARY_PACKED, BYTE_STREAM_SPLIT, and DICTIONARY encodings - * across different data distribution patterns. + * across different data distribution patterns. Synthetic dictionary-id RLE decode is + * benchmarked separately in {@link RleDictionaryIndexDecodingBenchmark} so the results + * here stay comparable at the full-value level. * *

Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is * reported per-value using {@link OperationsPerInvocation}. @@ -83,8 +83,6 @@ public class IntEncodingBenchmark { private byte[] plainEncoded; private byte[] deltaEncoded; private byte[] bssEncoded; - private byte[] rleEncoded; - private int rleBitWidth; private byte[] dictDataEncoded; private Dictionary intDictionary; @@ -103,7 +101,7 @@ public void setup() throws IOException { VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, random); break; case "HIGH_CARDINALITY": - data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT); + data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, random); break; default: throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); @@ -114,16 +112,6 @@ public void setup() throws IOException { deltaEncoded = encodeWith(newDeltaWriter()); bssEncoded = encodeWith(newBssWriter()); - // Pre-encode RLE data (using 10-bit values to simulate dictionary indices) - rleBitWidth = 10; - RunLengthBitPackingHybridEncoder rleEncoder = new RunLengthBitPackingHybridEncoder( - rleBitWidth, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); - for (int v : data) { - rleEncoder.writeInt(v & 0x3FF); // mask to 10 bits - } - rleEncoded = rleEncoder.toBytes().toByteArray(); - rleEncoder.close(); - // Pre-encode dictionary data for decode benchmark DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter dictWriter = newDictWriter(); for (int v : data) { @@ -133,6 +121,7 @@ public void setup() throws IOException { dictDataEncoded = dictDataBytes.toByteArray(); DictionaryPage dictPage = dictWriter.toDictPageAndClose().copy(); intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(dictPage); + dictWriter.close(); } private byte[] encodeWith(ValuesWriter writer) throws IOException { @@ -144,6 +133,24 @@ private byte[] encodeWith(ValuesWriter writer) throws IOException { return bytes; } + private byte[] 
encodeDictionaryWith(DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter writer) + throws IOException { + for (int v : data) { + writer.writeInteger(v); + } + BytesInput dataBytes = writer.getBytes(); + DictionaryPage dictPage = writer.toDictPageAndClose(); + byte[] bytes; + if (dictPage == null) { + bytes = dataBytes.toByteArray(); + } else { + BytesInput allBytes = BytesInput.concat(dataBytes, dictPage.getBytes()); + bytes = allBytes.toByteArray(); + } + writer.close(); + return bytes; + } + // ---- Writer factories ---- private static PlainValuesWriter newPlainWriter() { @@ -187,7 +194,7 @@ public byte[] encodeByteStreamSplit() throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) public byte[] encodeDictionary() throws IOException { - return encodeWith(newDictWriter()); + return encodeDictionaryWith(newDictWriter()); } // ---- Decode benchmarks ---- @@ -222,16 +229,6 @@ public void decodeByteStreamSplit(Blackhole bh) throws IOException { } } - @Benchmark - @OperationsPerInvocation(VALUE_COUNT) - public void decodeRle(Blackhole bh) throws IOException { - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder( - rleBitWidth, ByteBufferInputStream.wrap(ByteBuffer.wrap(rleEncoded))); - for (int i = 0; i < VALUE_COUNT; i++) { - bh.consume(decoder.readInt()); - } - } - @Benchmark @OperationsPerInvocation(VALUE_COUNT) public void decodeDictionary(Blackhole bh) throws IOException { diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java new file mode 100644 index 0000000000..c9d604c946 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; +import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Decoding micro-benchmark for synthetic dictionary-id pages encoded with + * {@link RunLengthBitPackingHybridEncoder}. 
This isolates the dictionary-id + * decode path and is intentionally separate from {@link IntEncodingBenchmark}, + * which measures full INT32 value decode paths. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class RleDictionaryIndexDecodingBenchmark { + + static final int VALUE_COUNT = 100_000; + private static final int INIT_SLAB_SIZE = 64 * 1024; + private static final int PAGE_SIZE = 1024 * 1024; + private static final int BIT_WIDTH = 10; + private static final int MAX_ID = 1 << BIT_WIDTH; + + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY"}) + public String indexPattern; + + private byte[] encoded; + + @Setup(Level.Trial) + public void setup() throws IOException { + int[] ids = generateDictionaryIds(); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); + for (int id : ids) { + encoder.writeInt(id); + } + encoded = encoder.toBytes().toByteArray(); + encoder.close(); + } + + private int[] generateDictionaryIds() { + int[] ids = new int[VALUE_COUNT]; + Random random = new Random(42); + switch (indexPattern) { + case "SEQUENTIAL": + for (int i = 0; i < VALUE_COUNT; i++) { + ids[i] = i % MAX_ID; + } + break; + case "RANDOM": + for (int i = 0; i < VALUE_COUNT; i++) { + ids[i] = random.nextInt(MAX_ID); + } + break; + case "LOW_CARDINALITY": + for (int i = 0; i < VALUE_COUNT; i++) { + ids[i] = random.nextInt(TestDataFactory.LOW_CARDINALITY_DISTINCT); + } + break; + default: + throw new IllegalArgumentException("Unknown index pattern: " + indexPattern); + } + return ids; + } + + @Benchmark + @OperationsPerInvocation(VALUE_COUNT) + public void decodeDictionaryIds(Blackhole bh) throws IOException { + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder( + BIT_WIDTH, 
ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded))); + for (int i = 0; i < VALUE_COUNT; i++) { + bh.consume(decoder.readInt()); + } + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java index f0fc7c52df..bc00f3b070 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -25,6 +25,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import java.nio.charset.StandardCharsets; import java.util.Random; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; @@ -86,6 +87,18 @@ public static Group generateRow(SimpleGroupFactory factory, int index, Random ra .append("binary_field", "value_" + (index % 1000)); } + /** + * Generates a deterministic set of rows for file-level benchmarks. + */ + public static Group[] generateRows(SimpleGroupFactory factory, int rowCount, long seed) { + Group[] rows = new Group[rowCount]; + Random random = new Random(seed); + for (int i = 0; i < rowCount; i++) { + rows[i] = generateRow(factory, i, random); + } + return rows; + } + // ---- Integer data generation for encoding benchmarks ---- /** @@ -122,12 +135,15 @@ public static int[] generateLowCardinalityInts(int count, int distinctValues, Ra } /** - * Generates high-cardinality integers (all unique). + * Generates high-cardinality integers (all unique in randomized order). 
*/ - public static int[] generateHighCardinalityInts(int count) { - int[] data = new int[count]; - for (int i = 0; i < count; i++) { - data[i] = i; + public static int[] generateHighCardinalityInts(int count, Random random) { + int[] data = generateSequentialInts(count); + for (int i = count - 1; i > 0; i--) { + int swapIndex = random.nextInt(i + 1); + int tmp = data[i]; + data[i] = data[swapIndex]; + data[swapIndex] = tmp; } return data; } @@ -150,7 +166,7 @@ public static Binary[] generateBinaryData(int count, int stringLength, int disti Binary[] dictionary = new Binary[distinct]; for (int i = 0; i < distinct; i++) { dictionary[i] = Binary.fromConstantByteArray( - randomString(stringLength, random).getBytes()); + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); } for (int i = 0; i < count; i++) { data[i] = dictionary[random.nextInt(distinct)]; @@ -159,7 +175,7 @@ public static Binary[] generateBinaryData(int count, int stringLength, int disti // All unique for (int i = 0; i < count; i++) { data[i] = Binary.fromConstantByteArray( - randomString(stringLength, random).getBytes()); + randomString(stringLength, random).getBytes(StandardCharsets.UTF_8)); } } return data; From 5bf53dd379e1a284c6c5ac5c1473adbf4c657960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 20 Apr 2026 01:45:22 +0000 Subject: [PATCH 3/3] GH-3511: Tighten dictionary benchmarks and reduce setup coupling Make the dictionary encode/decode benchmarks symmetric by routing both sides through a shared EncodedDictionary helper, guard against the dictionary writer falling back to plain encoding (which previously NPE'd in BinaryEncodingBenchmark setup for high-cardinality long strings), and drop redundant close() calls after toDictPageAndClose(). Share the pre-generated row array across threads in ConcurrentReadWriteBenchmark via Scope.Benchmark, eliminating 4x heap duplication and a now-unnecessary ThreadData inner class. 
Centralize the RNG seed as TestDataFactory.DEFAULT_SEED and add seed-overload variants for the int and binary generators so generators in the same setup no longer share a Random and silently depend on call order. Wrap the RLE encoder in try-with-resources and validate that LOW_CARDINALITY_DISTINCT fits within the configured bit width. --- .../benchmarks/BenchmarkEncodingUtils.java | 70 +++++++++++++++++++ .../benchmarks/BinaryEncodingBenchmark.java | 53 ++++++++------ .../ConcurrentReadWriteBenchmark.java | 43 ++++++------ .../parquet/benchmarks/FileReadBenchmark.java | 10 ++- .../benchmarks/FileWriteBenchmark.java | 6 +- .../benchmarks/IntEncodingBenchmark.java | 59 +++++++++------- .../RleDictionaryIndexDecodingBenchmark.java | 47 +++++++------ .../parquet/benchmarks/TestDataFactory.java | 35 ++++++++++ 8 files changed, 227 insertions(+), 96 deletions(-) create mode 100644 parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java new file mode 100644 index 0000000000..c79dedce28 --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkEncodingUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.IOException; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter; + +/** + * Shared helpers for encode/decode micro-benchmarks. + */ +final class BenchmarkEncodingUtils { + + private BenchmarkEncodingUtils() {} + + /** + * Container for the two artefacts produced by a dictionary-encoded page: + * the encoded dictionary indices ({@link #dictData}) and the dictionary + * page itself ({@link #dictPage}). The dictionary page may be {@code null} + * if the writer fell back to plain encoding (for example, when the + * dictionary exceeded its configured maximum size). + */ + static final class EncodedDictionary { + final byte[] dictData; + final DictionaryPage dictPage; + + EncodedDictionary(byte[] dictData, DictionaryPage dictPage) { + this.dictData = dictData; + this.dictPage = dictPage; + } + + boolean fellBackToPlain() { + return dictPage == null; + } + } + + /** + * Drains a {@link DictionaryValuesWriter} into an {@link EncodedDictionary}. + * + *

The writer's data bytes (the RLE-encoded indices) and the dictionary + * page are returned separately so both pieces can be measured or fed to a + * decoder symmetrically. The dictionary page buffer is copied so it remains + * valid after the writer's allocator is released. + * + *

The writer is closed via {@code toDictPageAndClose()}; callers must not + * call {@link DictionaryValuesWriter#close()} again afterwards. + */ + static EncodedDictionary drainDictionary(DictionaryValuesWriter writer) throws IOException { + byte[] dictData = writer.getBytes().toByteArray(); + DictionaryPage rawPage = writer.toDictPageAndClose(); + DictionaryPage dictPage = rawPage == null ? null : rawPage.copy(); + return new EncodedDictionary(dictData, dictPage); + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java index db65ca5f25..e6646458d8 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BinaryEncodingBenchmark.java @@ -20,10 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; @@ -61,6 +59,14 @@ * *

Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is * reported per-value using {@link OperationsPerInvocation}. + * + *

The dictionary encode/decode benchmarks intentionally measure the full path: + * the encoder produces both the RLE-encoded indices and a {@link DictionaryPage}; + * the decoder consumes the indices through a {@link DictionaryValuesReader} backed + * by the same dictionary. If the dictionary exceeds {@link #MAX_DICT_BYTE_SIZE} + * (which can happen for high-cardinality, long-string parameter combinations) the + * writer falls back to plain encoding and dictionary decoding for that combination + * is skipped. */ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @@ -87,13 +93,14 @@ public class BinaryEncodingBenchmark { private byte[] deltaLengthEncoded; private byte[] deltaStringsEncoded; private byte[] dictEncoded; + private DictionaryPage dictPage; private Dictionary binaryDictionary; + private boolean dictionaryAvailable; @Setup(Level.Trial) public void setup() throws IOException { - Random random = new Random(42); int distinct = "LOW".equals(cardinality) ? TestDataFactory.LOW_CARDINALITY_DISTINCT : 0; - data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, distinct, random); + data = TestDataFactory.generateBinaryData(VALUE_COUNT, stringLength, distinct, TestDataFactory.DEFAULT_SEED); // Pre-encode data for decode benchmarks plainEncoded = encodeBinaryWith(newPlainWriter()); @@ -104,10 +111,13 @@ public void setup() throws IOException { for (Binary v : data) { dictWriter.writeBytes(v); } - dictEncoded = dictWriter.getBytes().toByteArray(); - DictionaryPage dictPage = dictWriter.toDictPageAndClose().copy(); - binaryDictionary = new PlainValuesDictionary.PlainBinaryDictionary(dictPage); - dictWriter.close(); + BenchmarkEncodingUtils.EncodedDictionary encoded = BenchmarkEncodingUtils.drainDictionary(dictWriter); + dictEncoded = encoded.dictData; + dictPage = encoded.dictPage; + dictionaryAvailable = !encoded.fellBackToPlain(); + if (dictionaryAvailable) { + binaryDictionary = new PlainValuesDictionary.PlainBinaryDictionary(dictPage); + 
} } private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { @@ -119,22 +129,12 @@ private byte[] encodeBinaryWith(ValuesWriter writer) throws IOException { return bytes; } - private byte[] encodeDictionaryWith(DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter writer) - throws IOException { + private BenchmarkEncodingUtils.EncodedDictionary encodeDictionaryWith( + DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter writer) throws IOException { for (Binary v : data) { writer.writeBytes(v); } - BytesInput dataBytes = writer.getBytes(); - DictionaryPage dictPage = writer.toDictPageAndClose(); - byte[] bytes; - if (dictPage == null) { - bytes = dataBytes.toByteArray(); - } else { - BytesInput allBytes = BytesInput.concat(dataBytes, dictPage.getBytes()); - bytes = allBytes.toByteArray(); - } - writer.close(); - return bytes; + return BenchmarkEncodingUtils.drainDictionary(writer); } // ---- Writer factories ---- @@ -178,8 +178,10 @@ public byte[] encodeDeltaByteArray() throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) - public byte[] encodeDictionary() throws IOException { - return encodeDictionaryWith(newDictWriter()); + public void encodeDictionary(Blackhole bh) throws IOException { + BenchmarkEncodingUtils.EncodedDictionary encoded = encodeDictionaryWith(newDictWriter()); + bh.consume(encoded.dictData); + bh.consume(encoded.dictPage); } // ---- Decode benchmarks ---- @@ -217,6 +219,11 @@ public void decodeDeltaByteArray(Blackhole bh) throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) public void decodeDictionary(Blackhole bh) throws IOException { + if (!dictionaryAvailable) { + // Dictionary fell back to plain encoding (e.g. high-cardinality long strings + // exceeding MAX_DICT_BYTE_SIZE). Skip to keep the benchmark meaningful. 
+ return; + } DictionaryValuesReader reader = new DictionaryValuesReader(binaryDictionary); reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictEncoded))); for (int i = 0; i < VALUE_COUNT; i++) { diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java index 29371f7eb1..de94b422cf 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java @@ -49,15 +49,23 @@ /** * Multi-threaded benchmarks measuring independent read and write throughput under * concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). - * This benchmark does not assert correctness; it measures the cost of each thread + * + *

This benchmark does not assert correctness; it measures the cost of each thread * writing a full file to a stateless sink or reading a shared pre-generated file. + * The set of rows used by {@link #concurrentWrite(Blackhole)} is built once during + * setup and shared (read-only) across all threads, so the timed section measures + * the encoder/serializer pipeline rather than per-row data construction. * *

    - *
- *   <li>{@link #concurrentWrite()} - each thread independently writes to a shared
- * {@link BlackHoleOutputFile} (stateless sink)</li>
+ *   <li>{@link #concurrentWrite(Blackhole)} - each thread independently writes the
+ * shared pre-generated rows to a {@link BlackHoleOutputFile} (stateless sink)</li>
  *   <li>{@link #concurrentRead(Blackhole)} - each thread independently reads the same
  * pre-generated Parquet file</li>
  * </ul>
+ * + *

{@link Mode#SingleShotTime} is used because each invocation does enough work + * (a full file write or read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) + * that JIT amortization across invocations is unnecessary. */ @BenchmarkMode(Mode.SingleShotTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -69,33 +77,23 @@ public class ConcurrentReadWriteBenchmark { private File tempFile; - private Group[] readRows; - - @State(Scope.Thread) - public static class ThreadData { - private Group[] rows; - - @Setup(Level.Trial) - public void setup() { - rows = TestDataFactory.generateRows( - TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); - } - } + private Group[] rows; @Setup(Level.Trial) public void setup() throws IOException { + rows = TestDataFactory.generateRows( + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED); + // Generate a shared file for concurrent reads tempFile = File.createTempFile("parquet-concurrent-bench-", ".parquet"); tempFile.deleteOnExit(); tempFile.delete(); - readRows = TestDataFactory.generateRows( - TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) .build()) { - for (Group row : readRows) { + for (Group row : rows) { writer.write(row); } } @@ -109,19 +107,20 @@ public void tearDown() { } /** - * Each thread writes a full file independently to the shared stateless - * {@link BlackHoleOutputFile} sink. + * Each thread writes the shared pre-generated rows independently to the + * stateless {@link BlackHoleOutputFile} sink. 
*/ @Benchmark - public void concurrentWrite(ThreadData threadData) throws IOException { + public void concurrentWrite(Blackhole bh) throws IOException { try (ParquetWriter writer = ExampleParquetWriter.builder(BlackHoleOutputFile.INSTANCE) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) .build()) { - for (Group row : threadData.rows) { + for (Group row : rows) { writer.write(row); } } + bh.consume(rows); } /** diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java index eb5b959efa..de8e0b6580 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileReadBenchmark.java @@ -54,7 +54,13 @@ * pre-generated rows using {@link LocalOutputFile}, then read repeatedly during the * benchmark. * - *

Parameterized across compression codec and writer version. + *

Parameterized across compression codec and writer version. The footer parse + * (via {@link LocalInputFile} open) is included in the timed section so the result + * reflects the full open-and-read cost a typical caller would observe. + * + *

{@link Mode#SingleShotTime} is used because each invocation does enough work + * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT + * amortization across invocations is unnecessary. */ @BenchmarkMode(Mode.SingleShotTime) @Fork(1) @@ -79,7 +85,7 @@ public void setup() throws IOException { tempFile.delete(); // remove so the writer can create it Group[] rows = TestDataFactory.generateRows( - TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED); try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(tempFile.toPath())) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(TestDataFactory.FILE_BENCHMARK_SCHEMA) diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java index 73f60d7199..f6174bcaa2 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/FileWriteBenchmark.java @@ -48,6 +48,10 @@ *

Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost * from filesystem I/O. Parameterized across compression codec, writer version, and * dictionary encoding. + * + *

{@link Mode#SingleShotTime} is used because each invocation does enough work + * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT + * amortization across invocations is unnecessary. */ @BenchmarkMode(Mode.SingleShotTime) @Fork(1) @@ -71,7 +75,7 @@ public class FileWriteBenchmark { @Setup(Level.Trial) public void setup() { rows = TestDataFactory.generateRows( - TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, 42L); + TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED); } @Benchmark diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java index df767df455..7665a7462a 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/IntEncodingBenchmark.java @@ -20,10 +20,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.bytes.ByteBufferInputStream; -import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; @@ -62,6 +60,15 @@ * *

Each benchmark invocation processes {@value #VALUE_COUNT} values. Throughput is * reported per-value using {@link OperationsPerInvocation}. + * + *

BYTE_STREAM_SPLIT is included for completeness even though it is rarely a good + * choice for integer data; it exists here to compare the full set of encodings the + * Parquet writer can emit for INT32. + * + *

The dictionary encode/decode benchmarks measure the full path: the encoder + * produces both the RLE-encoded indices and a {@link DictionaryPage}; the decoder + * consumes the indices through a {@link DictionaryValuesReader} backed by the same + * dictionary. */ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @@ -84,24 +91,25 @@ public class IntEncodingBenchmark { private byte[] deltaEncoded; private byte[] bssEncoded; private byte[] dictDataEncoded; + private DictionaryPage dictPage; private Dictionary intDictionary; + private boolean dictionaryAvailable; @Setup(Level.Trial) public void setup() throws IOException { - Random random = new Random(42); switch (dataPattern) { case "SEQUENTIAL": data = TestDataFactory.generateSequentialInts(VALUE_COUNT); break; case "RANDOM": - data = TestDataFactory.generateRandomInts(VALUE_COUNT, random); + data = TestDataFactory.generateRandomInts(VALUE_COUNT, TestDataFactory.DEFAULT_SEED); break; case "LOW_CARDINALITY": data = TestDataFactory.generateLowCardinalityInts( - VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, random); + VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED); break; case "HIGH_CARDINALITY": - data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, random); + data = TestDataFactory.generateHighCardinalityInts(VALUE_COUNT, TestDataFactory.DEFAULT_SEED); break; default: throw new IllegalArgumentException("Unknown data pattern: " + dataPattern); @@ -117,11 +125,13 @@ public void setup() throws IOException { for (int v : data) { dictWriter.writeInteger(v); } - BytesInput dictDataBytes = dictWriter.getBytes(); - dictDataEncoded = dictDataBytes.toByteArray(); - DictionaryPage dictPage = dictWriter.toDictPageAndClose().copy(); - intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(dictPage); - dictWriter.close(); + BenchmarkEncodingUtils.EncodedDictionary encoded = BenchmarkEncodingUtils.drainDictionary(dictWriter); + dictDataEncoded = 
encoded.dictData; + dictPage = encoded.dictPage; + dictionaryAvailable = !encoded.fellBackToPlain(); + if (dictionaryAvailable) { + intDictionary = new PlainValuesDictionary.PlainIntegerDictionary(dictPage); + } } private byte[] encodeWith(ValuesWriter writer) throws IOException { @@ -133,22 +143,12 @@ private byte[] encodeWith(ValuesWriter writer) throws IOException { return bytes; } - private byte[] encodeDictionaryWith(DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter writer) - throws IOException { + private BenchmarkEncodingUtils.EncodedDictionary encodeDictionaryWith( + DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter writer) throws IOException { for (int v : data) { writer.writeInteger(v); } - BytesInput dataBytes = writer.getBytes(); - DictionaryPage dictPage = writer.toDictPageAndClose(); - byte[] bytes; - if (dictPage == null) { - bytes = dataBytes.toByteArray(); - } else { - BytesInput allBytes = BytesInput.concat(dataBytes, dictPage.getBytes()); - bytes = allBytes.toByteArray(); - } - writer.close(); - return bytes; + return BenchmarkEncodingUtils.drainDictionary(writer); } // ---- Writer factories ---- @@ -193,8 +193,10 @@ public byte[] encodeByteStreamSplit() throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) - public byte[] encodeDictionary() throws IOException { - return encodeDictionaryWith(newDictWriter()); + public void encodeDictionary(Blackhole bh) throws IOException { + BenchmarkEncodingUtils.EncodedDictionary encoded = encodeDictionaryWith(newDictWriter()); + bh.consume(encoded.dictData); + bh.consume(encoded.dictPage); } // ---- Decode benchmarks ---- @@ -232,6 +234,11 @@ public void decodeByteStreamSplit(Blackhole bh) throws IOException { @Benchmark @OperationsPerInvocation(VALUE_COUNT) public void decodeDictionary(Blackhole bh) throws IOException { + if (!dictionaryAvailable) { + // Dictionary fell back to plain encoding (e.g. very large unique-value sets + // exceeding MAX_DICT_BYTE_SIZE). 
Skip to keep the benchmark meaningful. + return; + } DictionaryValuesReader reader = new DictionaryValuesReader(intDictionary); reader.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(dictDataEncoded))); for (int i = 0; i < VALUE_COUNT; i++) { diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java index c9d604c946..68c51f0842 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/RleDictionaryIndexDecodingBenchmark.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.HeapByteBufferAllocator; @@ -46,6 +45,10 @@ * {@link RunLengthBitPackingHybridEncoder}. This isolates the dictionary-id * decode path and is intentionally separate from {@link IntEncodingBenchmark}, * which measures full INT32 value decode paths. + * + *

Per-invocation overhead (decoder construction and {@link ByteBufferInputStream} + * wrapping) is amortized over {@value #VALUE_COUNT} reads via + * {@link OperationsPerInvocation}. */ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @@ -61,6 +64,13 @@ public class RleDictionaryIndexDecodingBenchmark { private static final int BIT_WIDTH = 10; private static final int MAX_ID = 1 << BIT_WIDTH; + static { + if (TestDataFactory.LOW_CARDINALITY_DISTINCT > MAX_ID) { + throw new IllegalStateException("LOW_CARDINALITY_DISTINCT (" + TestDataFactory.LOW_CARDINALITY_DISTINCT + + ") must fit within BIT_WIDTH=" + BIT_WIDTH + " (MAX_ID=" + MAX_ID + ")"); + } + } + @Param({"SEQUENTIAL", "RANDOM", "LOW_CARDINALITY"}) public String indexPattern; @@ -69,45 +79,38 @@ public class RleDictionaryIndexDecodingBenchmark { @Setup(Level.Trial) public void setup() throws IOException { int[] ids = generateDictionaryIds(); - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( - BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator()); - for (int id : ids) { - encoder.writeInt(id); + try (RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder( + BIT_WIDTH, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator())) { + for (int id : ids) { + encoder.writeInt(id); + } + encoded = encoder.toBytes().toByteArray(); } - encoded = encoder.toBytes().toByteArray(); - encoder.close(); } private int[] generateDictionaryIds() { - int[] ids = new int[VALUE_COUNT]; - Random random = new Random(42); switch (indexPattern) { case "SEQUENTIAL": + int[] sequential = new int[VALUE_COUNT]; for (int i = 0; i < VALUE_COUNT; i++) { - ids[i] = i % MAX_ID; + sequential[i] = i % MAX_ID; } - break; + return sequential; case "RANDOM": - for (int i = 0; i < VALUE_COUNT; i++) { - ids[i] = random.nextInt(MAX_ID); - } - break; + return TestDataFactory.generateLowCardinalityInts(VALUE_COUNT, MAX_ID, TestDataFactory.DEFAULT_SEED); case 
"LOW_CARDINALITY": - for (int i = 0; i < VALUE_COUNT; i++) { - ids[i] = random.nextInt(TestDataFactory.LOW_CARDINALITY_DISTINCT); - } - break; + return TestDataFactory.generateLowCardinalityInts( + VALUE_COUNT, TestDataFactory.LOW_CARDINALITY_DISTINCT, TestDataFactory.DEFAULT_SEED); default: throw new IllegalArgumentException("Unknown index pattern: " + indexPattern); } - return ids; } @Benchmark @OperationsPerInvocation(VALUE_COUNT) public void decodeDictionaryIds(Blackhole bh) throws IOException { - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder( - BIT_WIDTH, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded))); + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(BIT_WIDTH, ByteBufferInputStream.wrap(ByteBuffer.wrap(encoded))); for (int i = 0; i < VALUE_COUNT; i++) { bh.consume(decoder.readInt()); } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java index bc00f3b070..13c5175d66 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/TestDataFactory.java @@ -44,6 +44,9 @@ public final class TestDataFactory { /** Number of distinct values for low-cardinality data patterns. */ public static final int LOW_CARDINALITY_DISTINCT = 100; + /** Default RNG seed used across benchmarks for deterministic data. */ + public static final long DEFAULT_SEED = 42L; + /** A standard multi-type schema used by file-level benchmarks. */ public static final MessageType FILE_BENCHMARK_SCHEMA = Types.buildMessage() .required(INT32) @@ -112,8 +115,18 @@ public static int[] generateSequentialInts(int count) { return data; } + /** + * Generates uniformly random integers using the given seed. 
+ */ + public static int[] generateRandomInts(int count, long seed) { + return generateRandomInts(count, new Random(seed)); + } + /** * Generates uniformly random integers. + * + *

Note: prefer {@link #generateRandomInts(int, long)} when call ordering between + * generators in the same setup must not influence the produced data. */ public static int[] generateRandomInts(int count, Random random) { int[] data = new int[count]; @@ -123,6 +136,13 @@ public static int[] generateRandomInts(int count, Random random) { return data; } + /** + * Generates low-cardinality integers (values drawn from a small set) using the given seed. + */ + public static int[] generateLowCardinalityInts(int count, int distinctValues, long seed) { + return generateLowCardinalityInts(count, distinctValues, new Random(seed)); + } + /** * Generates low-cardinality integers (values drawn from a small set). */ @@ -134,6 +154,13 @@ public static int[] generateLowCardinalityInts(int count, int distinctValues, Ra return data; } + /** + * Generates high-cardinality integers (all unique in randomized order) using the given seed. + */ + public static int[] generateHighCardinalityInts(int count, long seed) { + return generateHighCardinalityInts(count, new Random(seed)); + } + /** * Generates high-cardinality integers (all unique in randomized order). */ @@ -150,6 +177,14 @@ public static int[] generateHighCardinalityInts(int count, Random random) { // ---- Binary data generation for encoding benchmarks ---- + /** + * Generates binary strings of the given length with the specified cardinality, using + * a deterministic seed. + */ + public static Binary[] generateBinaryData(int count, int stringLength, int distinct, long seed) { + return generateBinaryData(count, stringLength, distinct, new Random(seed)); + } + /** * Generates binary strings of the given length with the specified cardinality. *