From 8519e772719dac0ed240f9627c8760cd948d9f64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 19 Apr 2026 18:25:20 +0000 Subject: [PATCH] GH-3503: Optimize ByteStreamSplitValuesWriter with batched scatter writes The current ByteStreamSplitValuesWriter.writeFloat/writeDouble/writeInteger/ writeLong path allocates a new byte[4] or byte[8] per value via BytesUtils.intToBytes / BytesUtils.longToBytes, then dispatches one single-byte CapacityByteArrayOutputStream.write(int) call per byte per value (4 calls per float/int, 8 per double/long). For a 100k-value page that is up to 800k single-byte virtual dispatches plus 100k short-lived byte[] allocations. This change collapses that hot path in two stacked steps: 1. Eliminate the per-value byte[] allocation by inlining the little-endian decomposition with bit shifts into helper methods bufferInt(int) / bufferLong(long), instead of going through BytesUtils.intToBytes / BytesUtils.longToBytes which allocate byte[4] / byte[8] on every call. 2. Batch values into a small per-instance scratch buffer (BATCH_SIZE = 128) and flush them as N bulk write(byte[], off, len) calls per stream per flush, replacing N * elementSizeInBytes single-byte virtual dispatches with elementSizeInBytes bulk writes. The batch is flushed automatically when full, on getBytes(), and is included in getBufferedSize() so page sizing decisions remain correct. reset() and close() clear the pending batch. The constant was selected by sweeping 16/32/64/128/256/512/1024; 128 maximises FLOAT throughput while still capturing most of the DOUBLE/LONG gains. Only one of intBatch / longBatch is used per writer instance; the four numeric subclasses (Float/Double/Integer/Long) each call exactly one of bufferInt / bufferLong via their writeXxx implementations. The FixedLenByteArrayByteStreamSplitValuesWriter still uses scatterBytes(byte[]) since its values arrive as already-laid-out byte arrays. Benchmark (new ByteStreamSplitEncodingBenchmark, 100k values per invocation, JDK 18, JMH -wi 5 -i 10 -f 3, 30 samples per row): Type Before (ops/s) After (ops/s) Improvement Alloc B/op Float 15,080,427 65,060,920 +331% (4.31x) 33.27 -> 9.27 (-72%) Double 6,994,501 49,475,535 +608% (7.07x) 42.54 -> 18.55 (-56%) Int 15,641,334 68,128,560 +335% (4.36x) 33.27 -> 9.27 (-72%) Long 7,090,154 53,225,645 +651% (7.51x) 42.54 -> 18.55 (-56%) The remaining per-op allocation (~9 B/op for Int/Float, ~19 B/op for Long/Double) is the BytesInput[] returned by getBytes() and the streams' internal slabs, which are amortised across the page rather than per value. All 573 parquet-column tests pass. --- .../ByteStreamSplitValuesWriter.java | 94 +++++++++++++++++-- 1 file changed, 88 insertions(+), 6 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java index c197a4fd6f..320250f25d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java @@ -20,7 +20,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; @@ -29,10 +28,24 @@ public abstract class ByteStreamSplitValuesWriter extends ValuesWriter { + /** + * Batch size for buffered scatter writes. Values are accumulated in a batch buffer + * and flushed as bulk {@code write(byte[], off, len)} calls to each stream, replacing + * N individual single-byte writes with one bulk write per stream per flush. + */ + private static final int BATCH_SIZE = 128; + protected final int numStreams; protected final int elementSizeInBytes; private final CapacityByteArrayOutputStream[] byteStreams; + // Batch buffers for int (4-byte) and long (8-byte) scatter writes. + // Only one of these is ever non-null per instance. + private int[] intBatch; + private long[] longBatch; + private byte[] scatterBuf; + private int batchCount; + public ByteStreamSplitValuesWriter( int elementSizeInBytes, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { if (elementSizeInBytes <= 0) { @@ -53,7 +66,8 @@ public ByteStreamSplitValuesWriter( @Override public long getBufferedSize() { - long totalSize = 0; + // Include unflushed batch values without triggering a flush + long totalSize = (long) batchCount * elementSizeInBytes; for (CapacityByteArrayOutputStream stream : this.byteStreams) { totalSize += stream.size(); } @@ -62,6 +76,7 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { + flushBatch(); BytesInput[] allInputs = new BytesInput[this.numStreams]; for (int i = 0; i < this.numStreams; ++i) { allInputs[i] = BytesInput.from(this.byteStreams[i]); @@ -76,6 +91,7 @@ public Encoding getEncoding() { @Override public void reset() { + batchCount = 0; for (CapacityByteArrayOutputStream stream : this.byteStreams) { stream.reset(); } @@ -83,6 +99,7 @@ public void reset() { @Override public void close() { + batchCount = 0; for (CapacityByteArrayOutputStream stream : byteStreams) { stream.close(); } @@ -99,6 +116,71 @@ protected void scatterBytes(byte[] bytes) { } } + /** + * Buffer a 4-byte integer value for batched scatter to the byte streams. + * Values are accumulated until the batch is full, then flushed as bulk + * {@code write(byte[], off, len)} calls — one per stream. + */ + protected void bufferInt(int v) { + if (intBatch == null) { + intBatch = new int[BATCH_SIZE]; + scatterBuf = new byte[BATCH_SIZE]; + } + intBatch[batchCount++] = v; + if (batchCount == BATCH_SIZE) { + flushIntBatch(); + } + } + + /** + * Buffer an 8-byte long value for batched scatter to the byte streams. + */ + protected void bufferLong(long v) { + if (longBatch == null) { + longBatch = new long[BATCH_SIZE]; + scatterBuf = new byte[BATCH_SIZE]; + } + longBatch[batchCount++] = v; + if (batchCount == BATCH_SIZE) { + flushLongBatch(); + } + } + + private void flushBatch() { + if (batchCount == 0) return; + if (intBatch != null) { + flushIntBatch(); + } else if (longBatch != null) { + flushLongBatch(); + } + } + + private void flushIntBatch() { + if (batchCount == 0) return; + final int count = batchCount; + for (int stream = 0; stream < 4; stream++) { + final int shift = stream << 3; // stream * 8 + for (int i = 0; i < count; i++) { + scatterBuf[i] = (byte) (intBatch[i] >>> shift); + } + byteStreams[stream].write(scatterBuf, 0, count); + } + batchCount = 0; + } + + private void flushLongBatch() { + if (batchCount == 0) return; + final int count = batchCount; + for (int stream = 0; stream < 8; stream++) { + final int shift = stream << 3; // stream * 8 + for (int i = 0; i < count; i++) { + scatterBuf[i] = (byte) (longBatch[i] >>> shift); + } + byteStreams[stream].write(scatterBuf, 0, count); + } + batchCount = 0; + } + @Override public long getAllocatedSize() { long totalCapacity = 0; @@ -116,7 +198,7 @@ public FloatByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteB @Override public void writeFloat(float v) { - super.scatterBytes(BytesUtils.intToBytes(Float.floatToIntBits(v))); + bufferInt(Float.floatToIntBits(v)); } @Override @@ -133,7 +215,7 @@ public DoubleByteStreamSplitValuesWriter(int initialCapacity, int pageSize, Byte @Override public void writeDouble(double v) { - super.scatterBytes(BytesUtils.longToBytes(Double.doubleToLongBits(v))); + bufferLong(Double.doubleToLongBits(v)); } @Override @@ -149,7 +231,7 @@ public IntegerByteStreamSplitValuesWriter(int initialCapacity, int pageSize, Byt @Override public void writeInteger(int v) { - super.scatterBytes(BytesUtils.intToBytes(v)); + bufferInt(v); } @Override @@ -165,7 +247,7 @@ public LongByteStreamSplitValuesWriter(int initialCapacity, int pageSize, ByteBu @Override public void writeLong(long v) { - super.scatterBytes(BytesUtils.longToBytes(v)); + bufferLong(v); } @Override