Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.common;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
Expand Down Expand Up @@ -133,6 +134,37 @@ default boolean isDictionaryEncoded() {
*/
byte[][] getBytesValuesSV();

/**
* Returns the BYTES values for a single-valued column as {@link ByteBuffer} views, avoiding the
* per-row {@code byte[]} allocation of {@link #getBytesValuesSV()}.
*
* <p><b>Stability:</b> the returned buffers are safe to hold across the block ONLY when
* {@link #isBytesViewStableAcrossReads()} returns {@code true}. The default implementation wraps
* the {@code byte[][]} from {@link #getBytesValuesSV()} (always stable), so it is safe but
* provides no allocation win. Implementations backed by a forward index override this to return
* zero-copy views when the underlying reader's views are stable.
*
* @return Array of {@link ByteBuffer} views, one per row
*/
default ByteBuffer[] getBytesValueViewsSV() {
byte[][] values = getBytesValuesSV();
ByteBuffer[] views = new ByteBuffer[values.length];
for (int i = 0; i < values.length; i++) {
views[i] = ByteBuffer.wrap(values[i]);
}
return views;
}

/**
* Returns {@code true} if the buffers from {@link #getBytesValueViewsSV()} remain valid across the
* whole block (i.e. a batched caller may materialize them into an array and consume them later).
* Defaults to {@code false}; forward-index-backed implementations answer from the underlying
* reader's {@code isBufferViewStableAcrossReads()}.
*/
default boolean isBytesViewStableAcrossReads() {
return false;
}

default int[] get32BitsMurmur3HashValuesSV() {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.common;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -242,6 +243,19 @@ public byte[][] getBytesValuesForSVColumn(String column) {
return bytesValues;
}

/**
* Get BYTES values for the given single-valued column as {@link ByteBuffer} views (zero-copy when
* the underlying reader supports it). Not cached: the {@code DataType}-keyed value map already
* holds the {@code byte[][]} under the BYTES key, and the per-call outer-array allocation is
* negligible against the per-row payload copies the view path saves (same rationale as the
* murmur-hash accessors below).
*/
public ByteBuffer[] getBytesValueViewsForSVColumn(String column) {
ByteBuffer[] views = new ByteBuffer[_length];
_dataFetcher.fetchBytesValueViews(column, _docIds, _length, views);
return views;
}

public int[] get32BitsMurmur3HashValuesForSVColumn(String column) {
// TODO: This is not cached
int[] hashValues = new int[_length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -189,6 +190,10 @@ public void fetchBytesValues(String column, int[] inDocIds, int length, byte[][]
_columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}

public void fetchBytesValueViews(String column, int[] inDocIds, int length, ByteBuffer[] outValues) {
_columnValueReaderMap.get(column).readBytesValueViews(inDocIds, length, outValues);
}

public void fetchMapValues(String column, int[] inDocIds, int length, Map[] outValues) {
_columnValueReaderMap.get(column).readMapValues(inDocIds, length, outValues);
}
Expand Down Expand Up @@ -439,6 +444,27 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
}
}

void readBytesValueViews(int[] docIds, int length, ByteBuffer[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
if (_dictionary != null) {
// Dictionary columns have no view path. This is never reached on the hot path because callers
// only request views when the reader reports isBufferViewStableAcrossReads() == true, which a
// dictionary reader never does. Wrap the dictionary byte[] defensively for correctness.
int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get();
_reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
byte[][] values = new byte[length][];
_dictionary.readBytesValues(dictIdBuffer, length, values);
for (int i = 0; i < length; i++) {
valueBuffer[i] = ByteBuffer.wrap(values[i]);
}
} else {
for (int i = 0; i < length; i++) {
valueBuffer[i] = _reader.getBytesView(docIds[i], readerContext);
}
}
}

void readMapValues(int[] docIds, int length, Map[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -1156,11 +1157,19 @@ public Sketch deserialize(byte[] bytes) {
return Sketch.wrap(Memory.wrap(bytes));
}

/**
* Wraps the buffer directly: {@link Sketch#wrap(Memory)} is zero-copy and retains a reference
* to the underlying memory. The caller is responsible for keeping the buffer's bytes alive for
* as long as the returned {@code Sketch} is in use. Broker reduce paths (which feed each
* deserialised sketch through a {@link Union} and discard the wrapper) satisfy this trivially.
*
* <p>Explicit {@link ByteOrder#LITTLE_ENDIAN} matches the implicit native order produced by
* {@link Memory#wrap(byte[])} on the LE platforms Pinot targets — and the on-disk layout of
* Datasketches-serialised theta sketches.
*/
@Override
public Sketch deserialize(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return Sketch.wrap(Memory.wrap(bytes));
return Sketch.wrap(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
}
};

Expand All @@ -1177,12 +1186,15 @@ public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] b
new IntegerSummaryDeserializer());
}

/**
* Wraps the buffer directly. {@code heapifySketch} materialises a heap-resident sketch
* during the call and does not retain a reference to the input memory, so this is a pure
* win — no lifetime concern.
*/
@Override
public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
new IntegerSummaryDeserializer());
return org.apache.datasketches.tuple.Sketches.heapifySketch(
Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
}
};

Expand Down Expand Up @@ -1217,11 +1229,14 @@ public CpcSketch deserialize(byte[] bytes) {
return CpcSketch.heapify(Memory.wrap(bytes));
}

/**
* Wraps the buffer directly. {@link CpcSketch#heapify} materialises a heap-resident sketch
* during the call and does not retain a reference to the input memory, so this is a pure win —
* no lifetime concern.
*/
@Override
public CpcSketch deserialize(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return CpcSketch.heapify(Memory.wrap(bytes));
return CpcSketch.heapify(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
}
};

Expand Down Expand Up @@ -1734,12 +1749,12 @@ public ThetaSketchAccumulator deserialize(byte[] bytes) {

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
// The wrapped Sketch retains the buffer's bytes; the accumulator transitively pins them
// until its first threshold-triggered union, after which the wrapper is released.
@Override
public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
ThetaSketchAccumulator thetaSketchAccumulator = new ThetaSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
Sketch sketch = Sketch.wrap(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
thetaSketchAccumulator.apply(sketch);
return thetaSketchAccumulator;
}
Expand All @@ -1761,14 +1776,13 @@ public TupleIntSketchAccumulator deserialize(byte[] bytes) {

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
// {@code heapifySketch} copies into heap, so the buffer is not retained beyond this call.
@Override
public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) {
TupleIntSketchAccumulator tupleIntSketchAccumulator = new TupleIntSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
org.apache.datasketches.tuple.Sketch<IntegerSummary> sketch =
org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
new IntegerSummaryDeserializer());
org.apache.datasketches.tuple.Sketches.heapifySketch(
Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
tupleIntSketchAccumulator.apply(sketch);
return tupleIntSketchAccumulator;
}
Expand All @@ -1790,12 +1804,11 @@ public CpcSketchAccumulator deserialize(byte[] bytes) {

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
// {@link CpcSketch#heapify} copies into heap, so the buffer is not retained beyond this call.
@Override
public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) {
CpcSketchAccumulator cpcSketchAccumulator = new CpcSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
CpcSketch sketch = CpcSketch.heapify(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
cpcSketchAccumulator.apply(sketch);
return cpcSketchAccumulator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.docvalsets;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.DataBlockCache;
Expand Down Expand Up @@ -181,6 +182,20 @@ public byte[][] getBytesValuesSV() {
}
}

@Override
public ByteBuffer[] getBytesValueViewsSV() {
try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
recordReadValues(scope, DataType.BYTES, true);
return _dataBlockCache.getBytesValueViewsForSVColumn(_column);
}
}

@Override
public boolean isBytesViewStableAcrossReads() {
ForwardIndexReader<?> forwardIndex = _dataSource.getForwardIndex();
return forwardIndex != null && forwardIndex.isBufferViewStableAcrossReads();
}

@Override
public int[] get32BitsMurmur3HashValuesSV() {
try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.core.query.aggregation.function;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -141,7 +143,8 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
// Treat BYTES value as serialized CPC Sketch
FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketchAccumulator cpcSketchAccumulator = getAccumulator(aggregationResultHolder);
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
Expand Down Expand Up @@ -212,7 +215,8 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
// Treat BYTES value as serialized CPC Sketch
DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == FieldSpec.DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
Expand Down Expand Up @@ -285,7 +289,8 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
boolean singleValue = blockValSet.isSingleValue();

if (singleValue && storedType == DataType.BYTES) {
byte[][] bytesValues = blockValSet.getBytesValuesSV();
Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
Expand Down Expand Up @@ -605,6 +610,27 @@ private CpcSketchAccumulator getAccumulator(GroupByResultHolder groupByResultHol
return accumulator;
}

/**
* Deserializes the sketches from the extracted value array, which is either a {@code byte[][]}
* (default path) or a {@code ByteBuffer[]} of zero-copy views (when the forward index reader's
* views are stable across the block — see {@link BlockValSet#getBytesValueViewsSV()}). Empty
* entries map to {@code null} (the default BYTES value); callers must handle nulls.
*/
private CpcSketch[] deserializeSketches(Object serializedSketches, int length) {
if (serializedSketches instanceof ByteBuffer[]) {
ByteBuffer[] views = (ByteBuffer[]) serializedSketches;
CpcSketch[] sketches = new CpcSketch[length];
for (int i = 0; i < length; i++) {
ByteBuffer buf = views[i];
// Explicit LITTLE_ENDIAN matches the byte[] path's Memory.wrap default. CpcSketch.heapify
// still copies into a heap sketch; the win is skipping the upstream byte[] alloc + copy.
sketches[i] = buf.remaining() > 0 ? CpcSketch.heapify(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN)) : null;
}
return sketches;
}
return deserializeSketches((byte[][]) serializedSketches, length);
}

/**
* Deserializes the sketches from the bytes. Returns null for empty byte arrays which represent
* the default null value for BYTES columns in Pinot. Callers must handle null entries.
Expand Down
Loading
Loading