diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java index ac63ff52ef..d614c960cc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter; @@ -46,11 +45,9 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter { private ValuesWriter lengthWriter; private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); lengthWriter = new DeltaBinaryPackingValuesWriterForInteger( DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES, DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS, @@ -63,7 +60,7 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBuffe public void writeBytes(Binary v) { try { lengthWriter.writeInteger(v.length()); - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } @@ -76,11 +73,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { 
- out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut)); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index dec4d1be1b..c170ad8e90 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -22,7 +22,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -37,7 +36,6 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesWriter.class); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; private int length; private ByteBufferAllocator allocator; @@ -46,7 +44,6 @@ public FixedLenByteArrayPlainValuesWriter( this.length = length; this.allocator = allocator; this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator); - this.out = new LittleEndianDataOutputStream(arrayOut); } @Override @@ -56,7 +53,7 @@ public final void writeBytes(Binary v) { "Fixed Binary size " + v.length() + " does not match field type length " + length); } try { - v.writeTo(out); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not 
write fixed bytes", e); } @@ -69,11 +66,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java index c7069bc092..be5dfc0fa8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java @@ -23,7 +23,6 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.bytes.LittleEndianDataOutputStream; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetEncodingException; @@ -41,18 +40,16 @@ public class PlainValuesWriter extends ValuesWriter { public static final Charset CHARSET = Charset.forName("UTF-8"); private CapacityByteArrayOutputStream arrayOut; - private LittleEndianDataOutputStream out; public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); - out = new LittleEndianDataOutputStream(arrayOut); } @Override public final void writeBytes(Binary v) { try { - out.writeInt(v.length()); - v.writeTo(out); + arrayOut.writeInt(v.length()); + v.writeTo(arrayOut); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } @@ -60,47 +57,27 @@ public final void writeBytes(Binary v) { @Override public final void writeInteger(int v) { - try { - out.writeInt(v); - } 
catch (IOException e) { - throw new ParquetEncodingException("could not write int", e); - } + arrayOut.writeInt(v); } @Override public final void writeLong(long v) { - try { - out.writeLong(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write long", e); - } + arrayOut.writeLong(v); } @Override public final void writeFloat(float v) { - try { - out.writeFloat(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write float", e); - } + arrayOut.writeInt(Float.floatToIntBits(v)); } @Override public final void writeDouble(double v) { - try { - out.writeDouble(v); - } catch (IOException e) { - throw new ParquetEncodingException("could not write double", e); - } + arrayOut.writeLong(Double.doubleToLongBits(v)); } @Override public void writeByte(int value) { - try { - out.write(value); - } catch (IOException e) { - throw new ParquetEncodingException("could not write byte", e); - } + arrayOut.write(value); } @Override @@ -110,11 +87,6 @@ public long getBufferedSize() { @Override public BytesInput getBytes() { - try { - out.flush(); - } catch (IOException e) { - throw new ParquetEncodingException("could not write page", e); - } if (LOG.isDebugEnabled()) LOG.debug("writing a buffer of size {}", arrayOut.size()); return BytesInput.from(arrayOut); } @@ -127,7 +99,6 @@ public void reset() { @Override public void close() { arrayOut.close(); - out.close(); } @Override diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index 84d3c5b7ba..c3ff89af25 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import 
java.util.ArrayList; import java.util.List; import org.apache.parquet.OutputStreamCloseException; @@ -194,6 +195,7 @@ private void addSlab(int minimumSize) { LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize); this.currentSlab = allocator.allocate(nextSlabSize); + this.currentSlab.order(ByteOrder.LITTLE_ENDIAN); this.slabs.add(currentSlab); this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize); } @@ -225,6 +227,34 @@ public void write(byte b[], int off, int len) { bytesUsed = Math.addExact(bytesUsed, len); } + /** + * Writes an int in little-endian byte order directly to the underlying slab, + * bypassing intermediate byte array decomposition. Slabs are set to + * {@link ByteOrder#LITTLE_ENDIAN} order so {@code putInt} produces the correct encoding. + * + * @param v the int value to write + */ + public void writeInt(int v) { + if (currentSlab.remaining() < 4) { + addSlab(4); + } + currentSlab.putInt(v); + bytesUsed = Math.addExact(bytesUsed, 4); + } + + /** + * Writes a long in little-endian byte order directly to the underlying slab. 
+ * + * @param v the long value to write + */ + public void writeLong(long v) { + if (currentSlab.remaining() < 8) { + addSlab(8); + } + currentSlab.putLong(v); + bytesUsed = Math.addExact(bytesUsed, 8); + } + private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { if (buf.hasArray()) { out.write(buf.array(), buf.arrayOffset(), len); diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java index ef6c71bc86..d15751b14f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java @@ -24,8 +24,18 @@ import org.slf4j.LoggerFactory; /** - * Based on DataOutputStream but in little endian and without the String/char methods + * Based on DataOutputStream but in little endian and without the String/char methods. + * + * @deprecated As of the release following the {@link CapacityByteArrayOutputStream#writeInt(int)} + * and {@link CapacityByteArrayOutputStream#writeLong(long)} additions, this class is no + * longer used by Parquet's own writers. Producers of PLAIN-encoded data should write + * little-endian values directly into a {@link java.nio.ByteBuffer} configured with + * {@link java.nio.ByteOrder#LITTLE_ENDIAN}, which the JIT can compile to a single store on + * little-endian architectures and avoids the per-call byte decomposition and virtual + * dispatch performed here. This class is retained for binary compatibility and will be + * removed in a future release. */ +@Deprecated public class LittleEndianDataOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(LittleEndianDataOutputStream.class);