Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
Expand All @@ -46,11 +45,9 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {

private ValuesWriter lengthWriter;
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriterForInteger(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
Expand All @@ -63,7 +60,7 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBuffe
public void writeBytes(Binary v) {
try {
lengthWriter.writeInteger(v.length());
v.writeTo(out);
v.writeTo(arrayOut);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
Expand All @@ -76,11 +73,6 @@ public long getBufferedSize() {

@Override
public BytesInput getBytes() {
try {
out.flush();
} catch (IOException e) {
throw new ParquetEncodingException("could not write page", e);
}
LOG.debug("writing a buffer of size {}", arrayOut.size());
return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
Expand All @@ -37,7 +36,6 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
private static final Logger LOG = LoggerFactory.getLogger(PlainValuesWriter.class);

private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;
private int length;
private ByteBufferAllocator allocator;

Expand All @@ -46,7 +44,6 @@ public FixedLenByteArrayPlainValuesWriter(
this.length = length;
this.allocator = allocator;
this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator);
this.out = new LittleEndianDataOutputStream(arrayOut);
}

@Override
Expand All @@ -56,7 +53,7 @@ public final void writeBytes(Binary v) {
"Fixed Binary size " + v.length() + " does not match field type length " + length);
}
try {
v.writeTo(out);
v.writeTo(arrayOut);
} catch (IOException e) {
throw new ParquetEncodingException("could not write fixed bytes", e);
}
Expand All @@ -69,11 +66,6 @@ public long getBufferedSize() {

@Override
public BytesInput getBytes() {
try {
out.flush();
} catch (IOException e) {
throw new ParquetEncodingException("could not write page", e);
}
LOG.debug("writing a buffer of size {}", arrayOut.size());
return BytesInput.from(arrayOut);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
Expand All @@ -41,66 +40,44 @@ public class PlainValuesWriter extends ValuesWriter {
public static final Charset CHARSET = Charset.forName("UTF-8");

private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
out = new LittleEndianDataOutputStream(arrayOut);
}

@Override
public final void writeBytes(Binary v) {
try {
out.writeInt(v.length());
v.writeTo(out);
arrayOut.writeInt(v.length());
v.writeTo(arrayOut);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
}

@Override
public final void writeInteger(int v) {
try {
out.writeInt(v);
} catch (IOException e) {
throw new ParquetEncodingException("could not write int", e);
}
arrayOut.writeInt(v);
}

@Override
public final void writeLong(long v) {
try {
out.writeLong(v);
} catch (IOException e) {
throw new ParquetEncodingException("could not write long", e);
}
arrayOut.writeLong(v);
}

@Override
public final void writeFloat(float v) {
try {
out.writeFloat(v);
} catch (IOException e) {
throw new ParquetEncodingException("could not write float", e);
}
arrayOut.writeInt(Float.floatToIntBits(v));
}

@Override
public final void writeDouble(double v) {
try {
out.writeDouble(v);
} catch (IOException e) {
throw new ParquetEncodingException("could not write double", e);
}
arrayOut.writeLong(Double.doubleToLongBits(v));
}

@Override
public void writeByte(int value) {
try {
out.write(value);
} catch (IOException e) {
throw new ParquetEncodingException("could not write byte", e);
}
arrayOut.write(value);
}

@Override
Expand All @@ -110,11 +87,6 @@ public long getBufferedSize() {

@Override
public BytesInput getBytes() {
try {
out.flush();
} catch (IOException e) {
throw new ParquetEncodingException("could not write page", e);
}
if (LOG.isDebugEnabled()) LOG.debug("writing a buffer of size {}", arrayOut.size());
return BytesInput.from(arrayOut);
}
Expand All @@ -127,7 +99,6 @@ public void reset() {
@Override
public void close() {
arrayOut.close();
out.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.OutputStreamCloseException;
Expand Down Expand Up @@ -194,6 +195,7 @@ private void addSlab(int minimumSize) {
LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);

this.currentSlab = allocator.allocate(nextSlabSize);
this.currentSlab.order(ByteOrder.LITTLE_ENDIAN);
this.slabs.add(currentSlab);
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
}
Expand Down Expand Up @@ -225,6 +227,34 @@ public void write(byte b[], int off, int len) {
bytesUsed = Math.addExact(bytesUsed, len);
}

/**
 * Appends the 4 bytes of {@code v} in little-endian order straight into the
 * current slab, avoiding the byte-at-a-time decomposition a wrapping
 * {@code DataOutputStream} would perform. Correctness relies on the slab's
 * byte order being {@link ByteOrder#LITTLE_ENDIAN} — {@code addSlab} configures
 * this for every slab it allocates (NOTE(review): confirm the initial slab
 * created by the constructor is configured the same way).
 *
 * @param v the int value to write
 */
public void writeInt(int v) {
  // Not enough room left in this slab: grab a fresh one. The unused tail of
  // the old slab is never emitted, because output is bounded by each slab's
  // position.
  if (currentSlab.remaining() < Integer.BYTES) {
    addSlab(Integer.BYTES);
  }
  currentSlab.putInt(v);
  bytesUsed = Math.addExact(bytesUsed, Integer.BYTES);
}

/**
 * Appends the 8 bytes of {@code v} in little-endian order straight into the
 * current slab (slabs allocated by {@code addSlab} are set to
 * {@link ByteOrder#LITTLE_ENDIAN}; NOTE(review): confirm the constructor's
 * initial slab matches).
 *
 * @param v the long value to write
 */
public void writeLong(long v) {
  // Roll over to a new slab when fewer than 8 bytes remain; the old slab's
  // unused tail is not part of the output.
  if (currentSlab.remaining() < Long.BYTES) {
    addSlab(Long.BYTES);
  }
  currentSlab.putLong(v);
  bytesUsed = Math.addExact(bytesUsed, Long.BYTES);
}

private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
if (buf.hasArray()) {
out.write(buf.array(), buf.arrayOffset(), len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@
import org.slf4j.LoggerFactory;

/**
* Based on DataOutputStream but in little endian and without the String/char methods
* Based on DataOutputStream but in little endian and without the String/char methods.
*
* @deprecated As of release following the {@link CapacityByteArrayOutputStream#writeInt(int)}
* and {@link CapacityByteArrayOutputStream#writeLong(long)} additions, this class is no
* longer used by Parquet's own writers. Producers of PLAIN-encoded data should write
* little-endian values directly into a {@link java.nio.ByteBuffer} configured with
* {@link java.nio.ByteOrder#LITTLE_ENDIAN}, which compiles to a single intrinsic store on
* little-endian architectures and avoids the per-call byte decomposition and virtual
* dispatch performed here. This class is retained for binary compatibility and will be
* removed in a future release.
*/
@Deprecated
public class LittleEndianDataOutputStream extends OutputStream {

private static final Logger LOG = LoggerFactory.getLogger(LittleEndianDataOutputStream.class);
Expand Down