From db126b967caa308031fa2f84defe5a9250a542d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 19 Apr 2026 11:39:39 +0000 Subject: [PATCH 1/2] GH-3495: Optimize PlainValuesWriter with direct ByteBuffer slab writes PlainValuesWriter previously wrote values through a two-layer abstraction: PlainValuesWriter -> LittleEndianDataOutputStream -> CapacityByteArrayOutputStream. Each writeInt() decomposed the int into 4 bytes and dispatched four single-byte write() calls through the OutputStream chain (writeLong additionally staged its bytes in a temp writeBuffer[8] array before writing). Since CapacityByteArrayOutputStream already uses ByteBuffer slabs internally, we can write directly to the slab with putInt()/putLong() using LITTLE_ENDIAN byte order -- a single JVM intrinsic on x86/ARM -- eliminating the byte decomposition, temp array, and virtual dispatch. Changes: - CapacityByteArrayOutputStream: set ByteOrder.LITTLE_ENDIAN on newly allocated slabs in addSlab(); add writeInt(int) and writeLong(long) methods that use currentSlab.putInt(v) / currentSlab.putLong(v) directly. - PlainValuesWriter: remove the LittleEndianDataOutputStream field; route writeInteger/writeLong/writeFloat/writeDouble/writeBytes through the underlying CapacityByteArrayOutputStream directly. writeFloat and writeDouble use Float.floatToIntBits / Double.doubleToLongBits + the new writeInt/writeLong methods. getBytes() no longer needs to flush a buffering layer; close() no longer closes the defunct stream. Benchmark (IntEncodingBenchmark.encodePlain, 100k INT32 values per invocation, JMH -wi 3 -i 5 -f 1): Pattern Before (ops/s) After (ops/s) Improvement SEQUENTIAL 26,817,451 52,953,193 +97.5% (2.0x) RANDOM 28,517,312 37,774,036 +32.5% LOW_CARDINALITY 28,705,158 52,819,678 +84.0% HIGH_CARDINALITY 28,595,519 37,862,571 +32.4% The same code path also benefits writeLong, writeFloat, writeDouble, and the length prefix written by writeBytes(Binary). 
--- .../values/plain/PlainValuesWriter.java | 43 +++---------------- .../bytes/CapacityByteArrayOutputStream.java | 30 +++++++++++++ 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java index c7069bc092..be5dfc0fa8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java @@ -23,7 +23,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -41,18 +40,16 @@ public class PlainValuesWriter extends ValuesWriter { public static final Charset CHARSET = Charset.forName("UTF-8"); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); } @Override public final void writeBytes(Binary v) { try { - out.writeInt(v.length()); - v.writeTo(out); + arrayOut.writeInt(v.length()); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } @@ -60,47 +57,27 @@ public final void writeBytes(Binary v) { @Override public final void writeInteger(int v) { - try { - out.writeInt(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write int", e); - } + arrayOut.writeInt(v); } @Override public final void 
writeLong(long v) { - try { - out.writeLong(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write long", e); - } + arrayOut.writeLong(v); } @Override public final void writeFloat(float v) { - try { - out.writeFloat(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write float", e); - } + arrayOut.writeInt(Float.floatToIntBits(v)); } @Override public final void writeDouble(double v) { - try { - out.writeDouble(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write double", e); - } + arrayOut.writeLong(Double.doubleToLongBits(v)); } @Override public void writeByte(int value) { - try { - out.write(value); - } catch (IOException e) { - throw new ParquetEncodingException("could not write byte", e); - } + arrayOut.write(value); } @Override @@ -110,11 +87,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } if (LOG.isDebugEnabled()) LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } @@ -127,7 +99,6 @@ public void reset() { @Override public void close() { arrayOut.close(); - out.close(); } @Override diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index 84d3c5b7ba..c3ff89af25 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; import org.apache.parquet.OutputStreamCloseException; @@ -194,6 +195,7 @@ private void addSlab(int 
minimumSize) { LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize); this.currentSlab = allocator.allocate(nextSlabSize); + this.currentSlab.order(ByteOrder.LITTLE_ENDIAN); this.slabs.add(currentSlab); this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize); } @@ -225,6 +227,34 @@ public void write(byte b[], int off, int len) { bytesUsed = Math.addExact(bytesUsed, len); } + /** + * Writes an int in little-endian byte order directly to the underlying slab, + * bypassing intermediate byte array decomposition. Slabs are set to + * {@link ByteOrder#LITTLE_ENDIAN} order so {@code putInt} produces the correct encoding. + * + * @param v the int value to write + */ + public void writeInt(int v) { + if (currentSlab.remaining() < 4) { + addSlab(4); + } + currentSlab.putInt(v); + bytesUsed = Math.addExact(bytesUsed, 4); + } + + /** + * Writes a long in little-endian byte order directly to the underlying slab. + * + * @param v the long value to write + */ + public void writeLong(long v) { + if (currentSlab.remaining() < 8) { + addSlab(8); + } + currentSlab.putLong(v); + bytesUsed = Math.addExact(bytesUsed, 8); + } + private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { if (buf.hasArray()) { out.write(buf.array(), buf.arrayOffset(), len); From 6964ccb9ddbb0b2b502cd370540354b7f22437d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 19 Apr 2026 12:18:26 +0000 Subject: [PATCH 2/2] GH-3495: Deprecate LittleEndianDataOutputStream and remove remaining wrapper usages This is an API cleanup commit with no measurable performance impact; it removes the last two production usages of LittleEndianDataOutputStream so the class can be deprecated. 
After the previous commit removed LittleEndianDataOutputStream from PlainValuesWriter, two production usages remained: - FixedLenByteArrayPlainValuesWriter wrapped its CapacityByteArrayOutputStream in a LittleEndianDataOutputStream solely to call Binary.writeTo(out) for the fixed-length payload. The fixed-length encoding has no length prefix and the wrapper exposed no LE-specific behavior used here -- Binary.writeTo() only invokes OutputStream.write(byte[], int, int), which the wrapper passes through unchanged. The wrapper has been removed and the writer now writes the binary payload directly to the underlying CapacityByteArrayOutputStream. The wrapper-specific flush() in getBytes() is also gone (CBOS does not buffer). - DeltaLengthByteArrayValuesWriter had the same pattern: a wrapper used only for v.writeTo(out) on the concatenated byte-array payload, with lengths written through a separate DeltaBinaryPackingValuesWriterForInteger. The wrapper has been removed for the same reasons. With no remaining production usages, LittleEndianDataOutputStream is marked @Deprecated. The class is retained for binary compatibility (it is part of the public parquet-common API) and will be removed in a future major release. The javadoc directs producers of PLAIN-encoded data to write little-endian values directly into a ByteBuffer with ByteOrder.LITTLE_ENDIAN, which compiles to a single intrinsic store on little-endian architectures and avoids the per-call byte decomposition and virtual dispatch performed by this class. 
--- .../DeltaLengthByteArrayValuesWriter.java | 10 +--------- .../plain/FixedLenByteArrayPlainValuesWriter.java | 10 +--------- .../parquet/bytes/LittleEndianDataOutputStream.java | 12 +++++++++++- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java index ac63ff52ef..d614c960cc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter; @@ -46,11 +45,9 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter { private ValuesWriter lengthWriter; private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); lengthWriter = new DeltaBinaryPackingValuesWriterForInteger( DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES, DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS, @@ -63,7 +60,7 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBuffe public void writeBytes(Binary v) { try { 
lengthWriter.writeInteger(v.length()); - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } @@ -76,11 +73,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut)); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index dec4d1be1b..c170ad8e90 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -37,7 +36,6 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesWriter.class); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; private int length; private ByteBufferAllocator allocator; @@ -46,7 +44,6 @@ public FixedLenByteArrayPlainValuesWriter( this.length = length; this.allocator = allocator; this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator); - this.out = new LittleEndianDataOutputStream(arrayOut); } 
@Override @@ -56,7 +53,7 @@ public final void writeBytes(Binary v) { "Fixed Binary size " + v.length() + " does not match field type length " + length); } try { - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write fixed bytes", e); } @@ -69,11 +66,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java index ef6c71bc86..d15751b14f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java @@ -24,8 +24,18 @@ import org.slf4j.LoggerFactory; /** - * Based on DataOutputStream but in little endian and without the String/char methods + * Based on DataOutputStream but in little endian and without the String/char methods. + * + * @deprecated As of release following the {@link CapacityByteArrayOutputStream#writeInt(int)} + * and {@link CapacityByteArrayOutputStream#writeLong(long)} additions, this class is no + * longer used by Parquet's own writers. Producers of PLAIN-encoded data should write + * little-endian values directly into a {@link java.nio.ByteBuffer} configured with + * {@link java.nio.ByteOrder#LITTLE_ENDIAN}, which compiles to a single intrinsic store on + * little-endian architectures and avoids the per-call byte decomposition and virtual + * dispatch performed here. This class is retained for binary compatibility and will be + * removed in a future release. 
*/ +@Deprecated public class LittleEndianDataOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(LittleEndianDataOutputStream.class);