diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
index 728ac06cc71c..1aade97b9cc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.common;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -133,6 +134,37 @@ default boolean isDictionaryEncoded() {
*/
byte[][] getBytesValuesSV();
+ /**
+ * Returns the BYTES values for a single-valued column as {@link ByteBuffer} views, avoiding the
+ * per-row {@code byte[]} allocation of {@link #getBytesValuesSV()}.
+ *
+ *
Stability: the returned buffers are safe to hold across the block ONLY when
+ * {@link #isBytesViewStableAcrossReads()} returns {@code true}. The default implementation wraps
+ * the {@code byte[][]} from {@link #getBytesValuesSV()} (always stable), so it is safe but
+ * provides no allocation win. Implementations backed by a forward index override this to return
+ * zero-copy views when the underlying reader's views are stable.
+ *
+ * @return Array of {@link ByteBuffer} views, one per row
+ */
+ default ByteBuffer[] getBytesValueViewsSV() {
+ byte[][] values = getBytesValuesSV();
+ ByteBuffer[] views = new ByteBuffer[values.length];
+ for (int i = 0; i < values.length; i++) {
+ views[i] = ByteBuffer.wrap(values[i]);
+ }
+ return views;
+ }
+
+ /**
+ * Returns {@code true} if the buffers from {@link #getBytesValueViewsSV()} remain valid across the
+ * whole block (i.e. a batched caller may materialize them into an array and consume them later).
+ * Defaults to {@code false}; forward-index-backed implementations answer from the underlying
+ * reader's {@code isBufferViewStableAcrossReads()}.
+ */
+ default boolean isBytesViewStableAcrossReads() {
+ return false;
+ }
+
default int[] get32BitsMurmur3HashValuesSV() {
throw new UnsupportedOperationException();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index e902f316ee70..60678ae2cc28 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.common;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@@ -242,6 +243,19 @@ public byte[][] getBytesValuesForSVColumn(String column) {
return bytesValues;
}
+ /**
+ * Get BYTES values for the given single-valued column as {@link ByteBuffer} views (zero-copy when
+ * the underlying reader supports it). Not cached: the {@code DataType}-keyed value map already
+ * holds the {@code byte[][]} under the BYTES key, and the per-call outer-array allocation is
+ * negligible against the per-row payload copies the view path saves (same rationale as the
+ * murmur-hash accessors below).
+ */
+ public ByteBuffer[] getBytesValueViewsForSVColumn(String column) {
+ ByteBuffer[] views = new ByteBuffer[_length];
+ _dataFetcher.fetchBytesValueViews(column, _docIds, _length, views);
+ return views;
+ }
+
public int[] get32BitsMurmur3HashValuesForSVColumn(String column) {
// TODO: This is not cached
int[] hashValues = new int[_length];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 60dfc92d01c9..a569efa709b3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -189,6 +190,10 @@ public void fetchBytesValues(String column, int[] inDocIds, int length, byte[][]
_columnValueReaderMap.get(column).readBytesValues(inDocIds, length, outValues);
}
+ public void fetchBytesValueViews(String column, int[] inDocIds, int length, ByteBuffer[] outValues) {
+ _columnValueReaderMap.get(column).readBytesValueViews(inDocIds, length, outValues);
+ }
+
public void fetchMapValues(String column, int[] inDocIds, int length, Map[] outValues) {
_columnValueReaderMap.get(column).readMapValues(inDocIds, length, outValues);
}
@@ -439,6 +444,27 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
}
}
+ void readBytesValueViews(int[] docIds, int length, ByteBuffer[] valueBuffer) {
+ Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
+ ForwardIndexReaderContext readerContext = getReaderContext();
+ if (_dictionary != null) {
+ // Dictionary columns have no view path. This is never reached on the hot path because callers
+ // only request views when the reader reports isBufferViewStableAcrossReads() == true, which a
+ // dictionary reader never does. Wrap the dictionary byte[] defensively for correctness.
+ int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get();
+ _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
+ byte[][] values = new byte[length][];
+ _dictionary.readBytesValues(dictIdBuffer, length, values);
+ for (int i = 0; i < length; i++) {
+ valueBuffer[i] = ByteBuffer.wrap(values[i]);
+ }
+ } else {
+ for (int i = 0; i < length; i++) {
+ valueBuffer[i] = _reader.getBytesView(docIds[i], readerContext);
+ }
+ }
+ }
+
void readMapValues(int[] docIds, int length, Map[] valueBuffer) {
Tracing.activeRecording().setInputDataType(_storedType, _singleValue);
ForwardIndexReaderContext readerContext = getReaderContext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index ed611ee68039..833b1c03574f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -58,6 +58,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -1156,11 +1157,19 @@ public Sketch deserialize(byte[] bytes) {
return Sketch.wrap(Memory.wrap(bytes));
}
+ /**
+ * Wraps the buffer directly: {@link Sketch#wrap(Memory)} is zero-copy and retains a reference
+ * to the underlying memory. The caller is responsible for keeping the buffer's bytes alive for
+ * as long as the returned {@code Sketch} is in use. Broker reduce paths (which feed each
+ * deserialised sketch through a {@link Union} and discard the wrapper) satisfy this trivially.
+ *
+ *
Explicit {@link ByteOrder#LITTLE_ENDIAN} matches the implicit native order produced by
+ * {@link Memory#wrap(byte[])} on the LE platforms Pinot targets — and the on-disk layout of
+ * Datasketches-serialised theta sketches.
+ */
@Override
public Sketch deserialize(ByteBuffer byteBuffer) {
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- return Sketch.wrap(Memory.wrap(bytes));
+ return Sketch.wrap(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
}
};
@@ -1177,12 +1186,15 @@ public org.apache.datasketches.tuple.Sketch deserialize(byte[] b
new IntegerSummaryDeserializer());
}
+ /**
+ * Wraps the buffer directly. {@code heapifySketch} materialises a heap-resident sketch
+ * during the call and does not retain a reference to the input memory, so this is a pure
+ * win — no lifetime concern.
+ */
@Override
public org.apache.datasketches.tuple.Sketch deserialize(ByteBuffer byteBuffer) {
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
- new IntegerSummaryDeserializer());
+ return org.apache.datasketches.tuple.Sketches.heapifySketch(
+ Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
}
};
@@ -1217,11 +1229,14 @@ public CpcSketch deserialize(byte[] bytes) {
return CpcSketch.heapify(Memory.wrap(bytes));
}
+ /**
+ * Wraps the buffer directly. {@link CpcSketch#heapify} materialises a heap-resident sketch
+ * during the call and does not retain a reference to the input memory, so this is a pure win —
+ * no lifetime concern.
+ */
@Override
public CpcSketch deserialize(ByteBuffer byteBuffer) {
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- return CpcSketch.heapify(Memory.wrap(bytes));
+ return CpcSketch.heapify(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
}
};
@@ -1734,12 +1749,12 @@ public ThetaSketchAccumulator deserialize(byte[] bytes) {
// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
+ // The wrapped Sketch retains the buffer's bytes; the accumulator transitively pins them
+ // until its first threshold-triggered union, after which the wrapper is released.
@Override
public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
ThetaSketchAccumulator thetaSketchAccumulator = new ThetaSketchAccumulator();
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
+ Sketch sketch = Sketch.wrap(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
thetaSketchAccumulator.apply(sketch);
return thetaSketchAccumulator;
}
@@ -1761,14 +1776,13 @@ public TupleIntSketchAccumulator deserialize(byte[] bytes) {
// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
+ // {@code heapifySketch} copies into heap, so the buffer is not retained beyond this call.
@Override
public TupleIntSketchAccumulator deserialize(ByteBuffer byteBuffer) {
TupleIntSketchAccumulator tupleIntSketchAccumulator = new TupleIntSketchAccumulator();
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
org.apache.datasketches.tuple.Sketch sketch =
- org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
- new IntegerSummaryDeserializer());
+ org.apache.datasketches.tuple.Sketches.heapifySketch(
+ Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
tupleIntSketchAccumulator.apply(sketch);
return tupleIntSketchAccumulator;
}
@@ -1790,12 +1804,11 @@ public CpcSketchAccumulator deserialize(byte[] bytes) {
// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
+ // {@link CpcSketch#heapify} copies into heap, so the buffer is not retained beyond this call.
@Override
public CpcSketchAccumulator deserialize(ByteBuffer byteBuffer) {
CpcSketchAccumulator cpcSketchAccumulator = new CpcSketchAccumulator();
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
+ CpcSketch sketch = CpcSketch.heapify(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN));
cpcSketchAccumulator.apply(sketch);
return cpcSketchAccumulator;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
index 5bd14e4565d5..6e025d0d1beb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.docvalsets;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.DataBlockCache;
@@ -181,6 +182,20 @@ public byte[][] getBytesValuesSV() {
}
}
+ @Override
+ public ByteBuffer[] getBytesValueViewsSV() {
+ try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
+ recordReadValues(scope, DataType.BYTES, true);
+ return _dataBlockCache.getBytesValueViewsForSVColumn(_column);
+ }
+ }
+
+ @Override
+ public boolean isBytesViewStableAcrossReads() {
+ ForwardIndexReader> forwardIndex = _dataSource.getForwardIndex();
+ return forwardIndex != null && forwardIndex.isBufferViewStableAcrossReads();
+ }
+
@Override
public int[] get32BitsMurmur3HashValuesSV() {
try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
index fd2b40395f87..742d54316e4d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -141,7 +143,8 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
// Treat BYTES value as serialized CPC Sketch
FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == DataType.BYTES) {
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketchAccumulator cpcSketchAccumulator = getAccumulator(aggregationResultHolder);
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
@@ -212,7 +215,8 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
// Treat BYTES value as serialized CPC Sketch
DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == FieldSpec.DataType.BYTES) {
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
@@ -285,7 +289,8 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
boolean singleValue = blockValSet.isSingleValue();
if (singleValue && storedType == DataType.BYTES) {
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
CpcSketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
@@ -605,6 +610,27 @@ private CpcSketchAccumulator getAccumulator(GroupByResultHolder groupByResultHol
return accumulator;
}
+ /**
+ * Deserializes the sketches from the extracted value array, which is either a {@code byte[][]}
+ * (default path) or a {@code ByteBuffer[]} of zero-copy views (when the forward index reader's
+ * views are stable across the block — see {@link BlockValSet#getBytesValueViewsSV()}). Empty
+ * entries map to {@code null} (the default BYTES value); callers must handle nulls.
+ */
+ private CpcSketch[] deserializeSketches(Object serializedSketches, int length) {
+ if (serializedSketches instanceof ByteBuffer[]) {
+ ByteBuffer[] views = (ByteBuffer[]) serializedSketches;
+ CpcSketch[] sketches = new CpcSketch[length];
+ for (int i = 0; i < length; i++) {
+ ByteBuffer buf = views[i];
+ // Explicit LITTLE_ENDIAN matches the byte[] path's Memory.wrap default. CpcSketch.heapify
+ // still copies into a heap sketch; the win is skipping the upstream byte[] alloc + copy.
+ sketches[i] = buf.remaining() > 0 ? CpcSketch.heapify(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN)) : null;
+ }
+ return sketches;
+ }
+ return deserializeSketches((byte[][]) serializedSketches, length);
+ }
+
/**
* Deserializes the sketches from the bytes. Returns null for empty byte arrays which represent
* the default null value for BYTES columns in Pinot. Callers must handle null entries.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 494406383106..d3509fb81e7b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -419,7 +420,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
} else {
// Serialized sketch
List thetaSketchAccumulators = getUnions(aggregationResultHolder);
- Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
+ Sketch[] sketches = deserializeSketches(valueArrays[0], length);
if (_includeDefaultSketch) {
ThetaSketchAccumulator defaultThetaAccumulator = thetaSketchAccumulators.get(0);
for (Sketch sketch : sketches) {
@@ -646,7 +647,7 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
}
} else {
// Serialized sketch
- Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
+ Sketch[] sketches = deserializeSketches(valueArrays[0], length);
for (int i = 0; i < length; i++) {
List thetaSketchAccumulators = getUnions(groupByResultHolder, groupKeyArray[i]);
Sketch sketch = sketches[i];
@@ -920,7 +921,7 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
}
} else {
// Serialized sketch
- Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
+ Sketch[] sketches = deserializeSketches(valueArrays[0], length);
if (_includeDefaultSketch) {
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
@@ -1271,7 +1272,11 @@ private void extractValues(Map blockValSetMap, b
valueArrays[i] = blockValSet.getStringValuesSV();
break;
case BYTES:
- valueArrays[i] = blockValSet.getBytesValuesSV();
+ // Zero-copy view path only for the plain form (no filter evaluators re-reading the
+ // value array per row) and only when the reader's views survive the block read.
+ valueArrays[i] = (_filterEvaluators.isEmpty() && blockValSet.isBytesViewStableAcrossReads())
+ ? blockValSet.getBytesValueViewsSV()
+ : blockValSet.getBytesValuesSV();
break;
default:
throw new IllegalStateException();
@@ -1374,6 +1379,25 @@ private List buildUnions() {
return unions;
}
+ /**
+ * Deserializes the sketches from the extracted value array, which is either a {@code byte[][]}
+ * (default path) or a {@code ByteBuffer[]} of zero-copy views (when the forward index reader's
+ * views are stable across the block — see {@link BlockValSet#getBytesValueViewsSV()}).
+ */
+ private Sketch[] deserializeSketches(Object serializedSketches, int length) {
+ if (serializedSketches instanceof ByteBuffer[]) {
+ ByteBuffer[] views = (ByteBuffer[]) serializedSketches;
+ Sketch[] sketches = new Sketch[length];
+ for (int i = 0; i < length; i++) {
+ // Explicit LITTLE_ENDIAN matches the byte[] path's Memory.wrap(byte[]) default and the
+ // serialized sketch format, regardless of the source buffer's own byte order.
+ sketches[i] = Sketch.wrap(Memory.wrap(views[i], ByteOrder.LITTLE_ENDIAN));
+ }
+ return sketches;
+ }
+ return deserializeSketches((byte[][]) serializedSketches, length);
+ }
+
/**
* Deserializes the sketches from the bytes.
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
index 679933c5ed1b..a1697bc5d7ca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -162,7 +164,8 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
// Treat BYTES value as serialized Integer Tuple Sketch
FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == FieldSpec.DataType.BYTES) {
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
TupleIntSketchAccumulator tupleIntSketchAccumulator = getAccumulator(aggregationResultHolder);
Sketch[] sketches = deserializeSketches(bytesValues, length);
@@ -187,7 +190,8 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
if (storedType == FieldSpec.DataType.BYTES) {
- byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
Sketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
@@ -214,7 +218,8 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
boolean singleValue = blockValSet.isSingleValue();
if (singleValue && storedType == FieldSpec.DataType.BYTES) {
- byte[][] bytesValues = blockValSetMap.get(_expression).getBytesValuesSV();
+ Object bytesValues = blockValSet.isBytesViewStableAcrossReads()
+ ? blockValSet.getBytesValueViewsSV() : blockValSet.getBytesValuesSV();
try {
Sketch[] sketches = deserializeSketches(bytesValues, length);
for (int i = 0; i < length; i++) {
@@ -338,6 +343,27 @@ private TupleIntSketchAccumulator getAccumulator(GroupByResultHolder groupByResu
return accumulator;
}
+ /**
+ * Deserializes the sketches from the extracted value array, which is either a {@code byte[][]}
+ * (default path) or a {@code ByteBuffer[]} of zero-copy views (when the forward index reader's
+ * views are stable across the block — see {@link BlockValSet#getBytesValueViewsSV()}).
+ */
+ @SuppressWarnings({"unchecked"})
+ private Sketch[] deserializeSketches(Object serializedSketches, int length) {
+ if (serializedSketches instanceof ByteBuffer[]) {
+ ByteBuffer[] views = (ByteBuffer[]) serializedSketches;
+ Sketch[] sketches = new Sketch[length];
+ for (int i = 0; i < length; i++) {
+ // Explicit LITTLE_ENDIAN matches the byte[] path's Memory.wrap default. heapifySketch still
+ // copies into a heap sketch; the win is skipping the upstream byte[] alloc + copy.
+ sketches[i] =
+ Sketches.heapifySketch(Memory.wrap(views[i], ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
+ }
+ return sketches;
+ }
+ return deserializeSketches((byte[][]) serializedSketches, length);
+ }
+
/**
* Deserializes the sketches from the bytes.
*/
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsBufferParityTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsBufferParityTest.java
new file mode 100644
index 000000000000..61fbf5b57ee3
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsBufferParityTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.nio.ByteBuffer;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.segment.local.customobject.CpcSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
+import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Asserts that {@code deserialize(ByteBuffer)} on the sketch serdes returns a sketch equivalent to
+ * {@code deserialize(byte[])}, before and after the commit that switches the buffer path from
+ * "allocate byte[] + memcpy + Memory.wrap(bytes)" to "Memory.wrap(buffer, LITTLE_ENDIAN)".
+ *
+ * Each test also exercises a buffer whose position is non-zero in the underlying array (an inner
+ * slice), reproducing the layout produced by
+ * {@code DataTableImplV4.getCustomObject} on the broker reduce path.
+ */
+public class ObjectSerDeUtilsBufferParityTest {
+ private static final int DISTINCT_VALUES = 10_000;
+
+ @Test
+ public void thetaSketchByteBufferParity() {
+ Sketch original = buildThetaSketch();
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(original);
+
+ Sketch fromBytes = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(bytes);
+ Sketch fromBuffer = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ Sketch fromInnerSlice = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getEstimate(), fromBytes.getEstimate());
+ assertEquals(fromInnerSlice.getEstimate(), fromBytes.getEstimate());
+ }
+
+ @Test
+ public void tupleSketchByteBufferParity() {
+ IntegerSketch original = new IntegerSketch(12, IntegerSummary.Mode.Sum);
+ for (int i = 0; i < DISTINCT_VALUES; i++) {
+ original.update(i, 1);
+ }
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(original.compact());
+
+ org.apache.datasketches.tuple.Sketch fromBytes =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes);
+ org.apache.datasketches.tuple.Sketch fromBuffer =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ org.apache.datasketches.tuple.Sketch fromInnerSlice =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getEstimate(), fromBytes.getEstimate());
+ assertEquals(fromInnerSlice.getEstimate(), fromBytes.getEstimate());
+ }
+
+ @Test
+ public void cpcSketchByteBufferParity() {
+ CpcSketch original = new CpcSketch();
+ for (int i = 0; i < DISTINCT_VALUES; i++) {
+ original.update(i);
+ }
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(original);
+
+ CpcSketch fromBytes = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytes);
+ CpcSketch fromBuffer = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ CpcSketch fromInnerSlice = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getEstimate(), fromBytes.getEstimate());
+ assertEquals(fromInnerSlice.getEstimate(), fromBytes.getEstimate());
+ }
+
+ @Test
+ public void thetaSketchAccumulatorByteBufferParity() {
+ Sketch original = buildThetaSketch();
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(original);
+
+ ThetaSketchAccumulator fromBytes = ObjectSerDeUtils.DATA_SKETCH_THETA_ACCUMULATOR_SER_DE.deserialize(bytes);
+ ThetaSketchAccumulator fromBuffer =
+ ObjectSerDeUtils.DATA_SKETCH_THETA_ACCUMULATOR_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ ThetaSketchAccumulator fromInnerSlice =
+ ObjectSerDeUtils.DATA_SKETCH_THETA_ACCUMULATOR_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ assertEquals(fromInnerSlice.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ }
+
+ @Test
+ public void tupleSketchAccumulatorByteBufferParity() {
+ IntegerSketch original = new IntegerSketch(12, IntegerSummary.Mode.Sum);
+ for (int i = 0; i < DISTINCT_VALUES; i++) {
+ original.update(i, 1);
+ }
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(original.compact());
+
+ TupleIntSketchAccumulator fromBytes = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE.deserialize(bytes);
+ TupleIntSketchAccumulator fromBuffer =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ TupleIntSketchAccumulator fromInnerSlice =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_ACCUMULATOR_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ assertEquals(fromInnerSlice.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ }
+
+ @Test
+ public void cpcSketchAccumulatorByteBufferParity() {
+ CpcSketch original = new CpcSketch();
+ for (int i = 0; i < DISTINCT_VALUES; i++) {
+ original.update(i);
+ }
+ byte[] bytes = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(original);
+
+ CpcSketchAccumulator fromBytes = ObjectSerDeUtils.DATA_SKETCH_CPC_ACCUMULATOR_SER_DE.deserialize(bytes);
+ CpcSketchAccumulator fromBuffer =
+ ObjectSerDeUtils.DATA_SKETCH_CPC_ACCUMULATOR_SER_DE.deserialize(ByteBuffer.wrap(bytes));
+ CpcSketchAccumulator fromInnerSlice =
+ ObjectSerDeUtils.DATA_SKETCH_CPC_ACCUMULATOR_SER_DE.deserialize(innerSlice(bytes));
+
+ assertEquals(fromBuffer.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ assertEquals(fromInnerSlice.getResult().getEstimate(), fromBytes.getResult().getEstimate());
+ }
+
+ private static Sketch buildThetaSketch() {
+ UpdateSketch sketch = new UpdateSketchBuilder().build();
+ for (int i = 0; i < DISTINCT_VALUES; i++) {
+ sketch.update(i);
+ }
+ return sketch.compact();
+ }
+
+ /**
+ * Returns a buffer whose {@code position()} is non-zero in the backing array. This mirrors the
+ * layout {@code DataTableImplV4.getCustomObject} produces, which is the actual production input
+ * to the broker reduce path: {@code _variableSizeData.slice()} after the outer buffer has been
+ * advanced past previous columns.
+ */
+ private static ByteBuffer innerSlice(byte[] bytes) {
+ int padding = 7;
+ byte[] padded = new byte[padding + bytes.length + padding];
+ System.arraycopy(bytes, 0, padded, padding, bytes.length);
+ ByteBuffer outer = ByteBuffer.wrap(padded);
+ outer.position(padding);
+ ByteBuffer inner = outer.slice();
+ inner.limit(bytes.length);
+ return inner;
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SketchViewPathParityTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SketchViewPathParityTest.java
new file mode 100644
index 000000000000..26c8e1585a9c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SketchViewPathParityTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Proves the zero-copy {@code ByteBuffer} view read path produces identical sketch-aggregation
+ * results to the {@code byte[]} read path. The two paths are selected by the forward-index
+ * compression codec: a PASS_THROUGH BYTES column reports
+ * {@link org.apache.pinot.segment.spi.index.reader.ForwardIndexReader#isBufferViewStableAcrossReads()}
+ * true (view path), while an LZ4 column reports false (byte[] fallback). The same serialized
+ * theta / CPC / Integer-Tuple sketches are written into both a PASS_THROUGH segment and an LZ4
+ * segment; the per-segment aggregation result must match exactly.
+ */
+public class SketchViewPathParityTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "SketchViewPathParityTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final int NUM_RECORDS = 1000;
+ private static final int DISTINCTS_PER_ROW = 3;
+
+ private static final String THETA_COLUMN = "thetaColumn";
+ private static final String CPC_COLUMN = "cpcColumn";
+ private static final String TUPLE_COLUMN = "tupleColumn";
+
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addMetric(THETA_COLUMN, DataType.BYTES)
+ .addMetric(CPC_COLUMN, DataType.BYTES)
+ .addMetric(TUPLE_COLUMN, DataType.BYTES)
+ .build();
+
+ private ImmutableSegment _passThroughSegment;
+ private ImmutableSegment _lz4Segment;
+
+ // The segment the BaseQueriesTest helpers run against; swapped per assertion.
+ private IndexSegment _currentSegment;
+
+ @Override
+ protected String getFilter() {
+ return "";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _currentSegment;
+ }
+
+ @Override
+ protected List getIndexSegments() {
+ return List.of(_currentSegment);
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+ UpdateSketchBuilder thetaBuilder = new UpdateSketchBuilder();
+
+ List records = new ArrayList<>(NUM_RECORDS);
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ GenericRow record = new GenericRow();
+ int base = i * DISTINCTS_PER_ROW;
+
+ UpdateSketch theta = thetaBuilder.build();
+ CpcSketch cpc = new CpcSketch();
+ IntegerSketch tuple = new IntegerSketch(12, IntegerSummary.Mode.Sum);
+ for (int d = 0; d < DISTINCTS_PER_ROW; d++) {
+ theta.update(base + d);
+ cpc.update(base + d);
+ tuple.update(Integer.toString(base + d), 1);
+ }
+ record.putValue(THETA_COLUMN, theta.compact().toByteArray());
+ record.putValue(CPC_COLUMN, cpc.toByteArray());
+ record.putValue(TUPLE_COLUMN, tuple.compact().toByteArray());
+ records.add(record);
+ }
+
+ _passThroughSegment = buildSegment("passThrough", FieldConfig.CompressionCodec.PASS_THROUGH, records);
+ _lz4Segment = buildSegment("lz4", FieldConfig.CompressionCodec.LZ4, records);
+ }
+
+ private ImmutableSegment buildSegment(String segmentName, FieldConfig.CompressionCodec codec,
+ List records)
+ throws Exception {
+ List fieldConfigs = new ArrayList<>();
+ for (String column : List.of(THETA_COLUMN, CPC_COLUMN, TUPLE_COLUMN)) {
+ fieldConfigs.add(new FieldConfig(column, FieldConfig.EncodingType.RAW, List.of(), codec, null));
+ }
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNoDictionaryColumns(List.of(THETA_COLUMN, CPC_COLUMN, TUPLE_COLUMN))
+ .setFieldConfigList(fieldConfigs)
+ .build();
+
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, SCHEMA);
+ config.setTableName(RAW_TABLE_NAME);
+ config.setSegmentName(segmentName);
+ config.setOutDir(INDEX_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(records));
+ driver.build();
+ return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
+ }
+
+ private long distinctCount(IndexSegment segment, String query) {
+ _currentSegment = segment;
+ BrokerResponseNative response = getBrokerResponse(query);
+ Object value = response.getResultTable().getRows().get(0)[0];
+ return ((Number) value).longValue();
+ }
+
+ @Test
+ public void thetaViewPathMatchesByteArrayPath() {
+ String query = "SELECT DISTINCT_COUNT_THETA_SKETCH(" + THETA_COLUMN + ") FROM " + RAW_TABLE_NAME;
+ long viaView = distinctCount(_passThroughSegment, query);
+ long viaArray = distinctCount(_lz4Segment, query);
+ assertEquals(viaView, viaArray);
+ assertTrue(viaView > 0, "expected a non-trivial distinct count");
+ }
+
+ @Test
+ public void cpcViewPathMatchesByteArrayPath() {
+ String query = "SELECT DISTINCT_COUNT_CPC_SKETCH(" + CPC_COLUMN + ") FROM " + RAW_TABLE_NAME;
+ long viaView = distinctCount(_passThroughSegment, query);
+ long viaArray = distinctCount(_lz4Segment, query);
+ assertEquals(viaView, viaArray);
+ assertTrue(viaView > 0, "expected a non-trivial distinct count");
+ }
+
+ @Test
+ public void tupleViewPathMatchesByteArrayPath() {
+ String query = "SELECT DISTINCT_COUNT_TUPLE_SKETCH(" + TUPLE_COLUMN + ") FROM " + RAW_TABLE_NAME;
+ long viaView = distinctCount(_passThroughSegment, query);
+ long viaArray = distinctCount(_lz4Segment, query);
+ assertEquals(viaView, viaArray);
+ assertTrue(viaView > 0, "expected a non-trivial distinct count");
+ }
+
+ @Test
+ public void passThroughReportsStableViewsAndLz4DoesNot() {
+ assertTrue(_passThroughSegment.getDataSource(THETA_COLUMN).getForwardIndex().isBufferViewStableAcrossReads(),
+ "PASS_THROUGH forward index should report stable views (view path)");
+ assertTrue(!_lz4Segment.getDataSource(THETA_COLUMN).getForwardIndex().isBufferViewStableAcrossReads(),
+ "LZ4 forward index should report unstable views (byte[] fallback)");
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ if (_passThroughSegment != null) {
+ _passThroughSegment.destroy();
+ }
+ if (_lz4Segment != null) {
+ _lz4Segment.destroy();
+ }
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index d19d792466ef..f875bd663b18 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -95,7 +95,7 @@ public static abstract class BaseState {
@Param({"UNIFORM(1000,10000)", "EXP(0.001)"})
String _distribution;
- @Param({"SNAPPY", "LZ4", "ZSTANDARD"})
+ @Param({"PASS_THROUGH", "SNAPPY", "LZ4", "ZSTANDARD"})
ChunkCompressionType _chunkCompressionType;
@Param("1048576")
@@ -193,6 +193,24 @@ public void readV4(V4State state, Blackhole bh)
}
}
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ public void readV4View(V4State state, Blackhole bh)
+ throws IOException {
+ try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file);
+ VarByteChunkForwardIndexReaderV4 reader =
+ new VarByteChunkForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES, true);
+ VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+ for (int i = 0; i < state._records; i++) {
+ java.nio.ByteBuffer view = reader.getBytesView(i, context);
+ // Consume the view's contents before the next read invalidates it. Touching remaining()
+ // is enough to ensure the slice is materialised; we also consume the buffer itself.
+ bh.consume(view.remaining());
+ bh.consume(view);
+ }
+ }
+ }
+
@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void readV3(V3State state, Blackhole bh)
@@ -206,4 +224,20 @@ public void readV3(V3State state, Blackhole bh)
}
}
}
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ public void readV3View(V3State state, Blackhole bh)
+ throws IOException {
+ try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file);
+ VarByteChunkSVForwardIndexReader reader =
+ new VarByteChunkSVForwardIndexReader(buffer, FieldSpec.DataType.BYTES);
+ ChunkReaderContext context = reader.createContext()) {
+ for (int i = 0; i < state._records; i++) {
+ java.nio.ByteBuffer view = reader.getBytesView(i, context);
+ bh.consume(view.remaining());
+ bh.consume(view);
+ }
+ }
+ }
}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/aggregation/BenchmarkValueAggregatorBufferApi.java b/pinot-perf/src/main/java/org/apache/pinot/perf/aggregation/BenchmarkValueAggregatorBufferApi.java
new file mode 100644
index 000000000000..d4453cf18f5c
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/aggregation/BenchmarkValueAggregatorBufferApi.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf.aggregation;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.segment.local.aggregator.DistinctCountCPCSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.DistinctCountThetaSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.IntegerTupleSketchValueAggregator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+
+/**
+ * No-regression / parity check for the byte[] vs ByteBuffer entry points of the sketch
+ * {@link org.apache.pinot.segment.local.aggregator.ValueAggregator} family. Confirms that
+ * {@code applyRawValueFromBuffer} is not measurably slower than {@code applyRawValue(byte[])}.
+ *
+ * This benchmark does NOT, and cannot, demonstrate the allocation win. Two reasons:
+ *
+ * - The byte[] arm passes a pre-built {@code byte[]} from trial setup, so it does no per-row
+ * allocation here — the allocation that the ByteBuffer path eliminates in production never
+ * happens in this harness. The win is at the forward-index read level (where
+ * {@code getBytes} allocates a fresh array per row but {@code getBytesView} returns a
+ * view), not at this aggregator boundary.
+ * - The {@code Union.union} / {@code heapify} work dominates per-row time and is identical
+ * on both arms, so any allocation delta is in the noise. For theta specifically, the byte[]
+ * path already deserializes zero-copy via {@code Sketch.wrap(Memory.wrap(bytes))}, so the
+ * only thing the buffer path saves is the array allocation itself.
+ *
+ *
+ * The end-to-end win (read path + star-tree wiring + this aggregator) is measured separately:
+ * {@code BenchmarkRawForwardIndexReader} for the read path, and a star-tree-build benchmark for
+ * the composed path once the {@code BaseSingleTreeBuilder} wiring is in place.
+ *
+ *
The benchmark unions {@code numSketches} pre-serialized sketches into a single accumulator
+ * per invocation. The serialized sketch bytes are prepared once at trial setup.
+ */
+@Fork(1)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@State(Scope.Benchmark)
+public class BenchmarkValueAggregatorBufferApi {
+
+ @Param({"100", "1000", "10000"})
+ private int _numSketches;
+
+ // Sketch hash cardinality per sketch. Kept small relative to nominal entries so sketches are
+ // in exact mode — the wrap vs heapify cost asymmetry shows clearest before the hashtable spills.
+ private static final int _ENTRIES_PER_SKETCH = 1000;
+
+ // Theta nominal entries (default Helix is 4096 = 2^12).
+ private static final int _THETA_NOMINAL_ENTRIES = 4096;
+
+ // CPC lgK (default Helix is 12 → K = 4096).
+ private static final int _CPC_LG_K = 12;
+
+ // Tuple sketch nominal entries (lgK = 4 → 16 entries; matches existing test fixture).
+ private static final int _TUPLE_NOMINAL_ENTRIES = 16;
+
+ private byte[][] _thetaBytes;
+ private ByteBuffer[] _thetaBuffers;
+ private byte[][] _cpcBytes;
+ private ByteBuffer[] _cpcBuffers;
+ private byte[][] _tupleBytes;
+ private ByteBuffer[] _tupleBuffers;
+
+ private DistinctCountThetaSketchValueAggregator _thetaAgg;
+ private DistinctCountCPCSketchValueAggregator _cpcAgg;
+ private IntegerTupleSketchValueAggregator _tupleAgg;
+
+ @Setup(Level.Trial)
+ public void setUp() {
+ _thetaAgg = new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
+ _cpcAgg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ _tupleAgg = new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+
+ _thetaBytes = new byte[_numSketches][];
+ _thetaBuffers = new ByteBuffer[_numSketches];
+ _cpcBytes = new byte[_numSketches][];
+ _cpcBuffers = new ByteBuffer[_numSketches];
+ _tupleBytes = new byte[_numSketches][];
+ _tupleBuffers = new ByteBuffer[_numSketches];
+
+ for (int s = 0; s < _numSketches; s++) {
+ int baseHash = s * _ENTRIES_PER_SKETCH;
+
+ UpdateSketch theta = Sketches.updateSketchBuilder().setNominalEntries(_THETA_NOMINAL_ENTRIES).build();
+ IntStream.range(baseHash, baseHash + _ENTRIES_PER_SKETCH).forEach(theta::update);
+ _thetaBytes[s] = _thetaAgg.serializeAggregatedValue(theta.compact());
+ _thetaBuffers[s] = ByteBuffer.wrap(_thetaBytes[s]);
+
+ CpcSketch cpc = new CpcSketch(_CPC_LG_K);
+ IntStream.range(baseHash, baseHash + _ENTRIES_PER_SKETCH).forEach(cpc::update);
+ _cpcBytes[s] = _cpcAgg.serializeAggregatedValue(cpc);
+ _cpcBuffers[s] = ByteBuffer.wrap(_cpcBytes[s]);
+
+ IntegerSketch tuple = new IntegerSketch(_TUPLE_NOMINAL_ENTRIES, IntegerSummary.Mode.Sum);
+ for (int v = baseHash; v < baseHash + _ENTRIES_PER_SKETCH; v++) {
+ tuple.update(Integer.toString(v), 1);
+ }
+ _tupleBytes[s] = _tupleAgg.serializeAggregatedValue(tuple);
+ _tupleBuffers[s] = ByteBuffer.wrap(_tupleBytes[s]);
+ }
+ }
+
+ // -- Theta ---------------------------------------------------------------------------------
+
+ @Benchmark
+ public Object thetaApplyRawByteArray() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ acc = _thetaAgg.applyRawValue(acc, _thetaBytes[i]);
+ }
+ return acc;
+ }
+
+ @Benchmark
+ public Object thetaApplyRawByteBuffer() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ // duplicate() resets position/limit each call. ByteBuffer state is otherwise mutated by
+ // the union path (Memory.wrap honours the buffer's position cursor on read).
+ acc = _thetaAgg.applyRawValueFromBuffer(acc, _thetaBuffers[i].duplicate());
+ }
+ return acc;
+ }
+
+ // -- CPC -----------------------------------------------------------------------------------
+
+ @Benchmark
+ public Object cpcApplyRawByteArray() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ acc = _cpcAgg.applyRawValue(acc, _cpcBytes[i]);
+ }
+ return acc;
+ }
+
+ @Benchmark
+ public Object cpcApplyRawByteBuffer() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ acc = _cpcAgg.applyRawValueFromBuffer(acc, _cpcBuffers[i].duplicate());
+ }
+ return acc;
+ }
+
+ // -- Tuple ---------------------------------------------------------------------------------
+
+ @Benchmark
+ public Object tupleApplyRawByteArray() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ acc = _tupleAgg.applyRawValue(acc, _tupleBytes[i]);
+ }
+ return acc;
+ }
+
+ @Benchmark
+ public Object tupleApplyRawByteBuffer() {
+ Object acc = null;
+ for (int i = 0; i < _numSketches; i++) {
+ acc = _tupleAgg.applyRawValueFromBuffer(acc, _tupleBuffers[i].duplicate());
+ }
+ return acc;
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
index 6c2374d5a8d9..599e930c257e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
@@ -19,10 +19,13 @@
package org.apache.pinot.segment.local.aggregator;
import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.datasketches.memory.Memory;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -105,6 +108,27 @@ public Object applyAggregatedValue(Object value, Object aggregatedValue) {
return cpcUnion;
}
+ @Override
+ public Object applyRawValueFromBuffer(Object aggregatedValue, ByteBuffer buf) {
+ // CpcSketch.heapify materialises a heap-resident sketch; CPC has no zero-copy wrap API.
+ // The saving relative to applyRawValue(byte[]) is skipping the intermediate byte[] allocation
+ // and buf.get(bytes) copy at the call site — heapify itself still allocates internal state.
+ CpcUnion cpcUnion = extractUnion(aggregatedValue);
+ if (buf.remaining() > 0) {
+ cpcUnion.update(CpcSketch.heapify(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN)));
+ }
+ return cpcUnion;
+ }
+
+ @Override
+ public Object applyAggregatedValueFromBuffer(Object value, ByteBuffer buf) {
+ CpcUnion cpcUnion = extractUnion(value);
+ if (buf.remaining() > 0) {
+ cpcUnion.update(CpcSketch.heapify(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN)));
+ }
+ return cpcUnion;
+ }
+
@Override
public Object cloneAggregatedValue(Object value) {
return deserializeAggregatedValue(serializeAggregatedValue(value));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
index 5dba35d637c9..67988115a884 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
@@ -19,7 +19,10 @@
package org.apache.pinot.segment.local.aggregator;
import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
+import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
@@ -172,6 +175,27 @@ public Object applyAggregatedValue(Object value, Object aggregatedValue) {
return thetaUnion;
}
+ @Override
+ public Object applyRawValueFromBuffer(Object aggregatedValue, ByteBuffer buf) {
+ // True zero-copy: Sketch.wrap returns a read-only sketch view over the underlying Memory,
+ // which itself wraps the ByteBuffer without copying. Union.union walks the wrapped sketch's
+ // entries directly out of the source bytes — no heap allocation for sketch internals.
+ Union thetaUnion = extractUnion(aggregatedValue);
+ Sketch sketch = Sketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+ thetaUnion.union(sketch);
+ _maxByteSize = Math.max(_maxByteSize, thetaUnion.getCurrentBytes());
+ return thetaUnion;
+ }
+
+ @Override
+ public Object applyAggregatedValueFromBuffer(Object value, ByteBuffer buf) {
+ Union thetaUnion = extractUnion(value);
+ Sketch sketch = Sketch.wrap(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
+ thetaUnion.union(sketch);
+ _maxByteSize = Math.max(_maxByteSize, thetaUnion.getCurrentBytes());
+ return thetaUnion;
+ }
+
@Override
public Object cloneAggregatedValue(Object value) {
return deserializeAggregatedValue(serializeAggregatedValue(value));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
index e5db940dbcdf..b88f6f8140a5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
@@ -19,11 +19,16 @@
package org.apache.pinot.segment.local.aggregator;
import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Sketches;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
@@ -113,6 +118,27 @@ public Object applyAggregatedValue(Object value, Object aggregatedValue) {
return tupleUnion;
}
+ @Override
+ public Object applyRawValueFromBuffer(Object aggregatedValue, ByteBuffer buf) {
+ // Sketches.heapifySketch materialises the tuple sketch and its IntegerSummary entries on heap;
+ // tuple sketches have no zero-copy wrap API. Saving is skipping the byte[] alloc at the call
+ // site only — summary objects are still materialised per retained entry during heapify.
+ Union tupleUnion = extractUnion(aggregatedValue);
+ Sketch sketch =
+ Sketches.heapifySketch(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
+ tupleUnion.union(sketch);
+ return tupleUnion;
+ }
+
+ @Override
+ public Object applyAggregatedValueFromBuffer(Object value, ByteBuffer buf) {
+ Union tupleUnion = extractUnion(value);
+ Sketch sketch =
+ Sketches.heapifySketch(Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN), new IntegerSummaryDeserializer());
+ tupleUnion.union(sketch);
+ return tupleUnion;
+ }
+
@Override
public Object cloneAggregatedValue(Object value) {
return deserializeAggregatedValue(serializeAggregatedValue(value));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
index 20b8c3971cd3..d29272228929 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -54,12 +55,51 @@ public interface ValueAggregator {
*/
A applyRawValue(A value, R rawValue);
+ /**
+ * Applies a raw value to the current aggregated value, reading from a {@link ByteBuffer} view of
+ * the serialized payload.
+ *
+ * The default implementation drains the buffer into a {@code byte[]} and delegates to
+ * {@link #applyRawValue}, preserving source compatibility for implementors that only handle
+ * byte-array raw values. Sketch implementations override to consume the buffer directly via
+ * {@code Memory.wrap(ByteBuffer)}, avoiding the per-call {@code byte[]} allocation.
+ *
+ *
The implementation MUST drain the buffer's remaining bytes before returning. The caller
+ * may invalidate the buffer immediately after the call (see the lifetime contract on
+ * {@link org.apache.pinot.segment.spi.index.reader.ForwardIndexReader#getBytesView}).
+ *
+ *
This method is only meaningful for aggregators whose raw values are byte payloads
+ * ({@code R = byte[]} or {@code R = Object} with {@code byte[]} dispatch). Aggregators with
+ * non-byte raw types should not be invoked through this method.
+ */
+ @SuppressWarnings("unchecked")
+ default A applyRawValueFromBuffer(A value, ByteBuffer buf) {
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ return applyRawValue(value, (R) bytes);
+ }
+
/**
* Applies an aggregated value to the current aggregated value.
*
NOTE: if value is mutable, will directly modify the value.
*/
A applyAggregatedValue(A value, A aggregatedValue);
+ /**
+ * Applies an aggregated value to the current aggregated value, reading from a {@link ByteBuffer}
+ * view of a serialized aggregated value.
+ *
+ *
The default implementation drains the buffer, deserializes via
+ * {@link #deserializeAggregatedValue(byte[])}, and delegates to {@link #applyAggregatedValue}.
+ * Sketch implementations override to consume the buffer directly. The same lifetime contract
+ * applies: the buffer must be consumed before this method returns.
+ */
+ default A applyAggregatedValueFromBuffer(A value, ByteBuffer buf) {
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ return applyAggregatedValue(value, deserializeAggregatedValue(bytes));
+ }
+
/**
* Clones an aggregated value.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index 9caacad67456..8fbc13b77a67 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -134,6 +134,18 @@ public byte[] getBytes(int docId, ReaderContext context) {
return context.getValue(docId);
}
+ @Override
+ public ByteBuffer getBytesView(int docId, ReaderContext context) {
+ return context.getValueView(docId);
+ }
+
+ @Override
+ public boolean isBufferViewStableAcrossReads() {
+ // PASS_THROUGH views slice the mmap'd chunk buffer (fresh per call); compressed views slice the
+ // per-context decompression scratch buffer, which is overwritten on the next chunk decode.
+ return getCompressionType() == ChunkCompressionType.PASS_THROUGH;
+ }
+
@Override
public Map getMap(int docId, ReaderContext context) {
return MapUtils.deserializeMap(context.getValue(docId));
@@ -284,6 +296,24 @@ public byte[] getValue(int docId) {
}
}
+ /**
+ * View variant of {@link #getValue} — returns a {@link ByteBuffer} slice into the cached
+ * decompressed chunk (or into the raw {@code PinotDataBuffer} for {@code PASS_THROUGH}). The
+ * returned slice is valid only until the next call on this context.
+ */
+ public ByteBuffer getValueView(int docId) {
+ if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+ return readSmallUncompressedValueView(docId);
+ } else {
+ try {
+ return decompressAndReadView(docId);
+ } catch (IOException e) {
+ LOGGER.error("Exception caught while decompressing data chunk", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
protected long chunkIndexFor(int docId) {
long low = 0;
long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
@@ -307,6 +337,11 @@ protected abstract byte[] processChunkAndReadFirstValue(int docId, long offset,
protected abstract byte[] readSmallUncompressedValue(int docId);
+ protected abstract ByteBuffer processChunkAndReadFirstValueView(int docId, long offset, long limit)
+ throws IOException;
+
+ protected abstract ByteBuffer readSmallUncompressedValueView(int docId);
+
private byte[] decompressAndRead(int docId)
throws IOException {
long metadataEntry = chunkIndexFor(docId);
@@ -325,6 +360,26 @@ private byte[] decompressAndRead(int docId)
return processChunkAndReadFirstValue(docId, offset, limit);
}
+ private ByteBuffer decompressAndReadView(int docId)
+ throws IOException {
+ // Mirrors decompressAndRead but dispatches to the view-returning terminal call. Kept as a
+ // duplicate metadata walk to avoid touching the hot byte[] path.
+ long metadataEntry = chunkIndexFor(docId);
+ int info = _metadata.getInt(metadataEntry);
+ _docIdOffset = info & 0x7FFFFFFF;
+ _regularChunk = _docIdOffset == info;
+ long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 0xFFFFFFFFL;
+ long limit;
+ if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+ _nextDocIdOffset = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+ limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + Integer.BYTES) & 0xFFFFFFFFL;
+ } else {
+ _nextDocIdOffset = Integer.MAX_VALUE;
+ limit = _chunks.size();
+ }
+ return processChunkAndReadFirstValueView(docId, offset, limit);
+ }
+
private void initAndRecordRangesForDocId(int docId, List ranges) {
// Due to binary search on metadata buffer, it's simple to record the entire metadata buffer byte ranges
_ranges = new ArrayList<>();
@@ -384,6 +439,30 @@ protected byte[] readSmallUncompressedValue(int docId) {
return bytes;
}
+ @Override
+ protected ByteBuffer processChunkAndReadFirstValueView(int docId, long offset, long limit) {
+ _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+ if (!_regularChunk) {
+ // Huge value: whole chunk is the value. Return a duplicated view so the caller can read it
+ // without disturbing _chunk's position cursor.
+ return _chunk.duplicate();
+ }
+ _numDocsInCurrentChunk = _chunk.getInt(0);
+ return readSmallUncompressedValueView(docId);
+ }
+
+ @Override
+ protected ByteBuffer readSmallUncompressedValueView(int docId) {
+ int index = docId - _docIdOffset;
+ int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+ int nextOffset =
+ index == _numDocsInCurrentChunk - 1 ? _chunk.limit() : _chunk.getInt((index + 2) * Integer.BYTES);
+ ByteBuffer view = _chunk.duplicate();
+ view.position(offset);
+ view.limit(nextOffset);
+ return view.slice();
+ }
+
@Override
public void close() {
}
@@ -440,6 +519,32 @@ protected byte[] readSmallUncompressedValue(int docId) {
return bytes;
}
+ @Override
+ protected ByteBuffer processChunkAndReadFirstValueView(int docId, long offset, long limit)
+ throws IOException {
+ _decompressedBuffer.clear();
+ ByteBuffer compressed = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+ if (_regularChunk) {
+ decompressChunk(compressed);
+ return readSmallUncompressedValueView(docId);
+ }
+ // Huge compressed value: no slice path available (decompression target is allocated per call).
+ // Fall back to wrapping the byte[].
+ return ByteBuffer.wrap(readHugeCompressedValue(compressed, _chunkDecompressor.decompressedLength(compressed)));
+ }
+
+ @Override
+ protected ByteBuffer readSmallUncompressedValueView(int docId) {
+ int index = docId - _docIdOffset;
+ int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
+ int nextOffset = index == _numDocsInCurrentChunk - 1 ? _decompressedBuffer.limit()
+ : _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
+ ByteBuffer view = _decompressedBuffer.duplicate();
+ view.position(offset);
+ view.limit(nextOffset);
+ return view.slice();
+ }
+
private byte[] readHugeCompressedValue(ByteBuffer compressed, int decompressedLength)
throws IOException {
// huge values don't have length prefixes; they occupy the entire chunk so are unambiguous
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 2d529ff93a82..6ad4d3ea84b1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -24,6 +24,7 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -129,6 +130,21 @@ public byte[] getBytes(int docId, ChunkReaderContext context) {
}
}
+ @Override
+ public ByteBuffer getBytesView(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ return getBytesViewCompressed(docId, context);
+ } else {
+ return getBytesViewUncompressed(docId);
+ }
+ }
+
+ @Override
+ public boolean isBufferViewStableAcrossReads() {
+ // PASS_THROUGH views slice the underlying mmap'd buffer (fresh slice per call, never reused);
+ // compressed views slice the per-context decompression scratch buffer (overwritten on next read).
+ return getCompressionType() == ChunkCompressionType.PASS_THROUGH;
+ }
@Override
public Map getMap(int docId, ChunkReaderContext context) {
@@ -170,6 +186,39 @@ private byte[] getBytesUncompressed(int docId) {
return bytes;
}
+ /**
+ * View variant of {@link #getBytesCompressed} — slices into the per-context decompression buffer.
+ * The returned slice is valid only until the next call on the same context.
+ */
+ private ByteBuffer getBytesViewCompressed(int docId, ChunkReaderContext context) {
+ int chunkRowId = docId % _numDocsPerChunk;
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+ int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+ int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+ ByteBuffer view = chunkBuffer.duplicate();
+ view.position(valueStartOffset);
+ view.limit(valueEndOffset);
+ return view.slice();
+ }
+
+ /**
+ * View variant of {@link #getBytesUncompressed} — returns a direct view into the underlying
+ * {@code PinotDataBuffer} with no per-row allocation.
+ */
+ private ByteBuffer getBytesViewUncompressed(int docId) {
+ int chunkId = docId / _numDocsPerChunk;
+ int chunkRowId = docId % _numDocsPerChunk;
+
+ long chunkStartOffset = getChunkPosition(chunkId);
+ long valueStartOffset =
+ chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE);
+ long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+ return _dataBuffer.toDirectByteBuffer(valueStartOffset, (int) (valueEndOffset - valueStartOffset));
+ }
+
/**
* Helper method to compute the end offset of the value in the chunk buffer.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index e5ac0e94f75d..39775129a979 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -95,6 +96,29 @@ public int getDictId(int docId) {
return _forwardIndexReader.getDictId(docId, _forwardIndexReaderContext);
}
+ /**
+ * Returns the BYTES single-value at the given doc id as a {@link ByteBuffer} view, avoiding the
+ * per-call {@code byte[]} allocation of {@link #getValue}. Only valid for a non-dictionary-encoded
+ * single-value BYTES column whose underlying forward index reader reports
+ * {@link #isBufferViewStableAcrossReads()} {@code true}; callers must gate on that flag.
+ *
+ * This is the build-time analog of {@link org.apache.pinot.core.common.BlockValSet#getBytesValueViewsSV()}
+ * on the query path. The lifetime contract of the returned buffer follows
+ * {@link org.apache.pinot.segment.spi.index.reader.ForwardIndexReader#getBytesView}.
+ */
+ public ByteBuffer getValueAsBuffer(int docId) {
+ return _forwardIndexReader.getBytesView(docId, _forwardIndexReaderContext);
+ }
+
+ /**
+ * Returns {@code true} if {@link #getValueAsBuffer} views remain valid across the whole batch
+ * (i.e. holding many of them in a record array across a sort is safe). Answers from the underlying
+ * forward index reader.
+ */
+ public boolean isBufferViewStableAcrossReads() {
+ return _forwardIndexReader.isBufferViewStableAcrossReads();
+ }
+
public Object getValue(int docId) {
if (_forwardIndexReader.isDictionaryEncoded()) {
// Dictionary-encoded forward index
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
index 517bcd4a2725..dcdf5acdb7b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -79,6 +80,11 @@ abstract class BaseSingleTreeBuilder implements SingleTreeBuilder {
final ValueAggregator[] _valueAggregators;
// Readers and data types for column in function-column pair
final PinotSegmentColumnReader[] _metricReaders;
+ // True for a metric whose source column is BYTES and whose forward index reader reports stable
+ // views across reads (PASS_THROUGH). When true, getSegmentRecord reads a zero-copy ByteBuffer
+ // view via getValueAsBuffer instead of allocating a byte[] per row, and mergeSegmentRecord
+ // dispatches to ValueAggregator.applyRawValueFromBuffer.
+ final boolean[] _metricUsesBufferPath;
final AggregationSpec[] _aggregationSpecs;
final int _maxLeafRecords;
@@ -138,6 +144,7 @@ static class Record {
_metrics = new String[_numMetrics];
_valueAggregators = new ValueAggregator[_numMetrics];
_metricReaders = new PinotSegmentColumnReader[_numMetrics];
+ _metricUsesBufferPath = new boolean[_numMetrics];
_aggregationSpecs = new AggregationSpec[_numMetrics];
int index = 0;
@@ -154,6 +161,13 @@ static class Record {
if (_valueAggregators[index].getAggregationType() != AggregationFunctionType.COUNT) {
String column = functionColumnPair.getColumn();
_metricReaders[index] = new PinotSegmentColumnReader(segment, column);
+ // Enable the zero-copy buffer path only when (a) the source column is BYTES (matches the
+ // ValueAggregator buffer SPI surface) and (b) the reader's views survive the read-all-then-
+ // sort batching done by sortAndAggregateSegmentRecords. The flag is reader-intrinsic
+ // (PASS_THROUGH var-byte readers report true, compressed readers report false).
+ _metricUsesBufferPath[index] =
+ _metricReaders[index].getValueType().getStoredType() == BYTES
+ && _metricReaders[index].isBufferViewStableAcrossReads();
}
index++;
@@ -244,7 +258,11 @@ Record getSegmentRecord(int docId) {
for (int i = 0; i < _numMetrics; i++) {
// Ignore the column for COUNT aggregation function
if (_metricReaders[i] != null) {
- metrics[i] = _metricReaders[i].getValue(docId);
+ // Zero-copy ByteBuffer view for stable BYTES readers (PASS_THROUGH); byte[] otherwise.
+ // The view survives the read-all-then-sort batch because PASS_THROUGH views are mmap-backed
+ // and never overwritten — see _metricUsesBufferPath at construction.
+ metrics[i] = _metricUsesBufferPath[i] ? _metricReaders[i].getValueAsBuffer(docId)
+ : _metricReaders[i].getValue(docId);
}
}
return new Record(dimensions, metrics);
@@ -265,7 +283,12 @@ Record mergeSegmentRecord(@Nullable Record aggregatedRecord, Record segmentRecor
for (int i = 0; i < _numMetrics; i++) {
Object rawValue = segmentRecord._metrics[i];
if (rawValue != null) {
- metrics[i] = _valueAggregators[i].getInitialAggregatedValue(rawValue);
+ // Buffer path (PASS_THROUGH BYTES metric): applyRawValueFromBuffer(null, buf) is
+ // semantically equivalent to getInitialAggregatedValue(rawBytes) for the sketch
+ // aggregators — extractUnion(null) creates a fresh accumulator and unions the buffer.
+ metrics[i] = (rawValue instanceof ByteBuffer)
+ ? _valueAggregators[i].applyRawValueFromBuffer(null, (ByteBuffer) rawValue)
+ : _valueAggregators[i].getInitialAggregatedValue(rawValue);
} else {
assert _valueAggregators[i].getAggregationType() == AggregationFunctionType.COUNT;
metrics[i] = 1L;
@@ -276,7 +299,9 @@ Record mergeSegmentRecord(@Nullable Record aggregatedRecord, Record segmentRecor
for (int i = 0; i < _numMetrics; i++) {
Object rawValue = segmentRecord._metrics[i];
if (rawValue != null) {
- aggregatedRecord._metrics[i] = _valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i], rawValue);
+ aggregatedRecord._metrics[i] = (rawValue instanceof ByteBuffer)
+ ? _valueAggregators[i].applyRawValueFromBuffer(aggregatedRecord._metrics[i], (ByteBuffer) rawValue)
+ : _valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i], rawValue);
} else {
assert _valueAggregators[i].getAggregationType() == AggregationFunctionType.COUNT;
aggregatedRecord._metrics[i] = ((long) aggregatedRecord._metrics[i]) + 1;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
index ee04e64fa538..c4f1edf8a137 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.stream.IntStream;
import org.apache.datasketches.cpc.CpcSketch;
@@ -102,6 +103,38 @@ public void applyRawValueShouldUnion() {
assertEquals(agg.getMaxAggregatedValueByteSize(), 2580);
}
+ @Test
+ public void applyRawValueFromBufferShouldMatchByteArray() {
+ CpcSketch input1 = new CpcSketch();
+ IntStream.range(0, 1000).forEach(input1::update);
+ CpcSketch input2 = new CpcSketch();
+ IntStream.range(0, 1000).forEach(input2::update);
+ DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ byte[] result2bytes = agg.serializeAggregatedValue(input2);
+
+ CpcSketch viaArray = toSketch(agg.applyRawValue(input1, result2bytes));
+ DistinctCountCPCSketchValueAggregator aggBuf =
+ new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ CpcSketch viaBuffer = toSketch(aggBuf.applyRawValueFromBuffer(input1, ByteBuffer.wrap(result2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
+ @Test
+ public void applyAggregatedValueFromBufferShouldMatchByteArray() {
+ CpcSketch input1 = new CpcSketch();
+ IntStream.range(0, 1000).forEach(input1::update);
+ CpcSketch input2 = new CpcSketch();
+ IntStream.range(0, 1000).forEach(input2::update);
+ DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ byte[] input2bytes = agg.serializeAggregatedValue(input2);
+
+ CpcSketch viaArray = toSketch(agg.applyAggregatedValue(input1, input2));
+ DistinctCountCPCSketchValueAggregator aggBuf =
+ new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ CpcSketch viaBuffer = toSketch(aggBuf.applyAggregatedValueFromBuffer(input1, ByteBuffer.wrap(input2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
@Test
public void applyRawValueShouldAdd() {
CpcSketch input1 = new CpcSketch();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
index fd5d24d4971a..94bbbc201001 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.stream.IntStream;
import org.apache.datasketches.theta.Sketch;
@@ -108,6 +109,42 @@ public void applyRawValueShouldUnion() {
assertEquals(agg.getMaxAggregatedValueByteSize(), union.getCurrentBytes());
}
+ @Test
+ public void applyRawValueFromBufferShouldMatchByteArray() {
+ UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input1::update);
+ Sketch result1 = input1.compact();
+ UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input2::update);
+ Sketch result2 = input2.compact();
+ DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
+ byte[] result2bytes = agg.serializeAggregatedValue(result2);
+
+ Sketch viaArray = toSketch(agg.applyRawValue(result1, result2bytes));
+ DistinctCountThetaSketchValueAggregator aggBuf =
+ new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
+ Sketch viaBuffer = toSketch(aggBuf.applyRawValueFromBuffer(result1, ByteBuffer.wrap(result2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
+ @Test
+ public void applyAggregatedValueFromBufferShouldMatchByteArray() {
+ UpdateSketch input1 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input1::update);
+ Sketch result1 = input1.compact();
+ UpdateSketch input2 = Sketches.updateSketchBuilder().build();
+ IntStream.range(0, 1000).forEach(input2::update);
+ Sketch result2 = input2.compact();
+ DistinctCountThetaSketchValueAggregator agg = new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
+ byte[] result2bytes = agg.serializeAggregatedValue(result2);
+
+ Sketch viaArray = toSketch(agg.applyAggregatedValue(result1, result2));
+ DistinctCountThetaSketchValueAggregator aggBuf =
+ new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
+ Sketch viaBuffer = toSketch(aggBuf.applyAggregatedValueFromBuffer(result1, ByteBuffer.wrap(result2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
@Test
public void applyRawValueShouldAdd() {
UpdateSketch input1 = Sketches.updateSketchBuilder().build();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
index c6dbeb6d7f45..2435f5c9409a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.Union;
@@ -76,6 +77,40 @@ public void applyRawValueShouldUnion() {
assertEquals(agg.getMaxAggregatedValueByteSize(), 196632);
}
+ @Test
+ public void applyRawValueFromBufferShouldMatchByteArray() {
+ IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ s1.update("a", 1);
+ s2.update("b", 1);
+ IntegerTupleSketchValueAggregator agg =
+ new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+ byte[] s2bytes = agg.serializeAggregatedValue(s2);
+
+ Sketch viaArray = toSketch(agg.applyRawValue(s1, s2bytes));
+ IntegerTupleSketchValueAggregator aggBuf =
+ new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+ Sketch viaBuffer = toSketch(aggBuf.applyRawValueFromBuffer(s1, ByteBuffer.wrap(s2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
+ @Test
+ public void applyAggregatedValueFromBufferShouldMatchByteArray() {
+ IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ s1.update("a", 1);
+ s2.update("b", 1);
+ IntegerTupleSketchValueAggregator agg =
+ new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+ byte[] s2bytes = agg.serializeAggregatedValue(s2);
+
+ Sketch viaArray = toSketch(agg.applyAggregatedValue(s1, s2));
+ IntegerTupleSketchValueAggregator aggBuf =
+ new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+ Sketch viaBuffer = toSketch(aggBuf.applyAggregatedValueFromBuffer(s1, ByteBuffer.wrap(s2bytes)));
+ assertEquals(viaBuffer.getEstimate(), viaArray.getEstimate());
+ }
+
@SuppressWarnings("unchecked")
private Sketch toSketch(Object value) {
if (value instanceof Union) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/ForwardIndexBytesViewTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/ForwardIndexBytesViewTest.java
new file mode 100644
index 000000000000..789bea13dfe0
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/forward/ForwardIndexBytesViewTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Verifies that {@code getBytesView} on the var-byte forward index readers returns the same payload
+ * as {@code getBytes} across every supported compression mode, for both the V3 single-value reader
+ * and the V4 reader. Also exercises the huge-value (irregular chunk) path and the default
+ * fallback on a reader that only implements {@code getBytes}.
+ */
+public class ForwardIndexBytesViewTest implements PinotBuffersAfterMethodCheckRule {
+
+ private static final int NUM_DOCS = 1024;
+ private static final int VALUE_SIZE_BYTES = 96;
+ private static final long SEED = 0xC0FFEEL;
+
+ private File _dir;
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ _dir = new File(FileUtils.getTempDirectory(), "ForwardIndexBytesViewTest-" + UUID.randomUUID());
+ FileUtils.forceMkdir(_dir);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ FileUtils.deleteQuietly(_dir);
+ }
+
+ @DataProvider
+ public Object[][] compressions() {
+ return new Object[][]{
+ {ChunkCompressionType.PASS_THROUGH},
+ {ChunkCompressionType.SNAPPY},
+ {ChunkCompressionType.LZ4},
+ {ChunkCompressionType.ZSTANDARD}
+ };
+ }
+
+ @Test(dataProvider = "compressions")
+ public void testV3BytesViewMatchesGetBytes(ChunkCompressionType compression)
+ throws IOException {
+ File file = new File(_dir, "v3-" + compression + ".fwd");
+ byte[][] expected = randomValues(NUM_DOCS, VALUE_SIZE_BYTES);
+
+ try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file, compression, NUM_DOCS,
+ /* numDocsPerChunk */ 128, VALUE_SIZE_BYTES, /* writerVersion */ 2)) {
+ for (byte[] value : expected) {
+ writer.putBytes(value);
+ }
+ }
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(file);
+ VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer, DataType.BYTES);
+ ChunkReaderContext context = reader.createContext()) {
+ assertGetBytesViewMatches(reader, context, expected);
+ }
+ }
+
+ @Test(dataProvider = "compressions")
+ public void testV4BytesViewMatchesGetBytes(ChunkCompressionType compression)
+ throws IOException {
+ File file = new File(_dir, "v4-" + compression + ".fwd");
+ byte[][] expected = randomValues(NUM_DOCS, VALUE_SIZE_BYTES);
+
+ try (VarByteChunkForwardIndexWriterV4 writer = new VarByteChunkForwardIndexWriterV4(file, compression,
+ /* chunkSize */ 4096)) {
+ for (byte[] value : expected) {
+ writer.putBytes(value);
+ }
+ }
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(file);
+ VarByteChunkForwardIndexReaderV4 reader = new VarByteChunkForwardIndexReaderV4(buffer, DataType.BYTES, true);
+ VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+ assertGetBytesViewMatches(reader, context, expected);
+ }
+ }
+
+ /**
+ * Exercises the huge-value path on V4: writes a single value larger than the chunk buffer so it
+ * lands in its own irregular chunk. Both compressed and uncompressed huge paths fall back to
+ * {@code ByteBuffer.wrap(byte[])}, so this is a correctness regression test rather than a
+ * zero-copy assertion.
+ */
+ @Test(dataProvider = "compressions")
+ public void testV4BytesViewHandlesHugeValue(ChunkCompressionType compression)
+ throws IOException {
+ File file = new File(_dir, "v4-huge-" + compression + ".fwd");
+ int chunkSize = 256;
+ int hugeValueSize = chunkSize * 8;
+ byte[][] expected = {randomBytes(VALUE_SIZE_BYTES, 1), randomBytes(hugeValueSize, 2), randomBytes(VALUE_SIZE_BYTES,
+ 3)};
+
+ try (VarByteChunkForwardIndexWriterV4 writer = new VarByteChunkForwardIndexWriterV4(file, compression, chunkSize)) {
+ for (byte[] value : expected) {
+ writer.putBytes(value);
+ }
+ }
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(file);
+ VarByteChunkForwardIndexReaderV4 reader = new VarByteChunkForwardIndexReaderV4(buffer, DataType.BYTES, true);
+ VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+ assertGetBytesViewMatches(reader, context, expected);
+ }
+ }
+
+ /**
+ * Verifies the default {@link ForwardIndexReader#getBytesView} implementation works on a reader
+ * that only implements {@code getBytes}. Guarantees backwards compatibility for any external
+ * {@code ForwardIndexReader} that does not override the new method.
+ */
+ @Test
+ public void testDefaultGetBytesViewFallback() {
+ byte[] payload = randomBytes(VALUE_SIZE_BYTES, 42);
+
+ ForwardIndexReader reader = new ForwardIndexReader() {
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return true;
+ }
+
+ @Override
+ public DataType getStoredType() {
+ return DataType.BYTES;
+ }
+
+ @Override
+ public byte[] getBytes(int docId, ForwardIndexReaderContext context) {
+ return payload;
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+
+ ByteBuffer view = reader.getBytesView(0, null);
+ assertNotNull(view);
+ byte[] copy = new byte[view.remaining()];
+ view.get(copy);
+ assertEquals(copy, payload);
+ }
+
+ private static void assertGetBytesViewMatches(ForwardIndexReader reader,
+ C context, byte[][] expected)
+ throws IOException {
+ // Walk getBytes and getBytesView in separate passes with a fresh context each, mirroring the
+ // single-row consumption contract that real callers honour. Mixing both APIs on the same docId
+ // with a shared context exposes a latent quirk where the cached chunk state for an irregular
+ // (huge) chunk can't service a second read of the same docId.
+ for (int i = 0; i < expected.length; i++) {
+ byte[] viaArray = reader.getBytes(i, context);
+ assertEquals(viaArray, expected[i], "getBytes mismatch at doc " + i);
+ }
+ C freshContext = reader.createContext();
+ try {
+ for (int i = 0; i < expected.length; i++) {
+ ByteBuffer viaView = reader.getBytesView(i, freshContext);
+ byte[] viaViewCopy = new byte[viaView.remaining()];
+ viaView.get(viaViewCopy);
+ assertEquals(viaViewCopy, expected[i], "getBytesView mismatch at doc " + i);
+ }
+ } finally {
+ if (freshContext != null) {
+ freshContext.close();
+ }
+ }
+ }
+
+ private static byte[][] randomValues(int numDocs, int valueSize) {
+ byte[][] values = new byte[numDocs][];
+ Random random = new Random(SEED);
+ for (int i = 0; i < numDocs; i++) {
+ values[i] = new byte[valueSize];
+ random.nextBytes(values[i]);
+ }
+ return values;
+ }
+
+ private static byte[] randomBytes(int size, int seed) {
+ byte[] bytes = new byte[size];
+ new Random(seed).nextBytes(bytes);
+ return bytes;
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/SketchBuildPathParityTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/SketchBuildPathParityTest.java
new file mode 100644
index 000000000000..c57aeb5ab00c
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/SketchBuildPathParityTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.startree.v2.builder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Proves the star-tree {@code BaseSingleTreeBuilder} build path is correct under the new buffer
+ * wiring. Builds two segments containing the same serialized theta sketch records and a star-tree
+ * index over them — one with the BYTES source column compressed as {@code PASS_THROUGH} (which
+ * triggers the zero-copy {@code getValueAsBuffer} path in {@code getSegmentRecord} +
+ * {@code applyRawValueFromBuffer} dispatch in {@code mergeSegmentRecord}), the other with
+ * {@code LZ4} (which falls back to {@code getValue} + {@code applyRawValue}). The pre-aggregated
+ * star-tree theta metric must produce identical cardinality estimates regardless of which path
+ * the builder took.
+ */
+public class SketchBuildPathParityTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "SketchBuildPathParityTest");
+ private static final String TABLE_NAME = "testTable";
+ private static final String DIM_COLUMN = "dim";
+ private static final String THETA_COLUMN = "thetaColumn";
+ private static final String FUNCTION_COLUMN_PAIR = "distinctCountThetaSketch__" + THETA_COLUMN;
+ private static final int NUM_RECORDS = 1000;
+ private static final int DIM_CARDINALITY = 100;
+ private static final int DISTINCTS_PER_ROW = 3;
+
+ private ImmutableSegment _passThroughSegment;
+ private ImmutableSegment _lz4Segment;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(DIM_COLUMN, DataType.INT)
+ .addMetric(THETA_COLUMN, DataType.BYTES)
+ .build();
+
+ UpdateSketchBuilder sketchBuilder = new UpdateSketchBuilder();
+ List records = new ArrayList<>(NUM_RECORDS);
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ GenericRow record = new GenericRow();
+ record.putValue(DIM_COLUMN, i % DIM_CARDINALITY);
+ UpdateSketch theta = sketchBuilder.build();
+ int base = i * DISTINCTS_PER_ROW;
+ for (int d = 0; d < DISTINCTS_PER_ROW; d++) {
+ theta.update(base + d);
+ }
+ record.putValue(THETA_COLUMN, theta.compact().toByteArray());
+ records.add(record);
+ }
+
+ _passThroughSegment = buildSegmentWithStarTree("passThrough", FieldConfig.CompressionCodec.PASS_THROUGH,
+ schema, records);
+ _lz4Segment = buildSegmentWithStarTree("lz4", FieldConfig.CompressionCodec.LZ4, schema, records);
+ }
+
+ private ImmutableSegment buildSegmentWithStarTree(String segmentName, FieldConfig.CompressionCodec codec,
+ Schema schema, List records)
+ throws Exception {
+ // Per-column compression for the BYTES source column. The star-tree builder reads this column
+ // through PinotSegmentColumnReader: for PASS_THROUGH the reader reports stable views and the
+ // builder takes the new buffer path; for LZ4 the flag is false and the builder uses the
+ // existing byte[] path.
+ List fieldConfigs = Collections.singletonList(
+ new FieldConfig(THETA_COLUMN, FieldConfig.EncodingType.RAW, Collections.emptyList(), codec, null));
+ StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(
+ Collections.singletonList(DIM_COLUMN), null, null,
+ Collections.singletonList(new StarTreeAggregationConfig(THETA_COLUMN, "DISTINCTCOUNTTHETASKETCH", null,
+ FieldConfig.CompressionCodec.PASS_THROUGH, true, null, null, null)),
+ 100);
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(Collections.singletonList(THETA_COLUMN))
+ .setFieldConfigList(fieldConfigs)
+ .setStarTreeIndexConfigs(Collections.singletonList(starTreeIndexConfig))
+ .build();
+
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(segmentName);
+ config.setOutDir(INDEX_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(records));
+ driver.build();
+
+ return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
+ }
+
+ /**
+ * Sanity check: the two segments report stability correctly. This confirms the parity assertions
+ * below are actually comparing the buffer path against the byte[] path (not two byte[] runs).
+ */
+ @Test
+ public void buildPathsAreDistinguishedByStabilityFlag() {
+ assertTrue(_passThroughSegment.getDataSource(THETA_COLUMN).getForwardIndex().isBufferViewStableAcrossReads(),
+ "PASS_THROUGH source should report stable views (build buffer path)");
+ assertFalse(_lz4Segment.getDataSource(THETA_COLUMN).getForwardIndex().isBufferViewStableAcrossReads(),
+ "LZ4 source should report unstable views (build byte[] path)");
+ }
+
+ /**
+ * Core correctness assertion: the pre-aggregated star-tree theta metric produces byte-identical
+ * cardinality estimates in both segments. Any divergence indicates the buffer dispatch is
+ * producing different aggregated sketches than the byte[] path.
+ */
+ @Test
+ public void starTreeBuildProducesIdenticalAggregatesForBothPaths() {
+ List passThroughTrees = _passThroughSegment.getStarTrees();
+ List lz4Trees = _lz4Segment.getStarTrees();
+ assertNotNull(passThroughTrees, "PASS_THROUGH segment must have a star-tree");
+ assertNotNull(lz4Trees, "LZ4 segment must have a star-tree");
+ assertEquals(passThroughTrees.size(), 1);
+ assertEquals(lz4Trees.size(), 1);
+
+ StarTreeV2 passThroughTree = passThroughTrees.get(0);
+ StarTreeV2 lz4Tree = lz4Trees.get(0);
+ int passThroughDocs = passThroughTree.getMetadata().getNumDocs();
+ int lz4Docs = lz4Tree.getMetadata().getNumDocs();
+ assertEquals(passThroughDocs, lz4Docs, "star-trees must have the same number of aggregated docs");
+
+ // Read each pre-aggregated theta sketch from both star-trees and compare.
+ var passReader = passThroughTree.getDataSource(FUNCTION_COLUMN_PAIR).getForwardIndex();
+ var lz4Reader = lz4Tree.getDataSource(FUNCTION_COLUMN_PAIR).getForwardIndex();
+ try (var passContext = passReader.createContext(); var lz4Context = lz4Reader.createContext()) {
+ for (int docId = 0; docId < passThroughDocs; docId++) {
+ @SuppressWarnings("unchecked")
+ byte[] passBytes = ((org.apache.pinot.segment.spi.index.reader.ForwardIndexReader) passReader)
+ .getBytes(docId, passContext);
+ @SuppressWarnings("unchecked")
+ byte[] lz4Bytes = ((org.apache.pinot.segment.spi.index.reader.ForwardIndexReader) lz4Reader)
+ .getBytes(docId, lz4Context);
+ Sketch passSketch = Sketch.wrap(org.apache.datasketches.memory.Memory.wrap(passBytes));
+ Sketch lz4Sketch = Sketch.wrap(org.apache.datasketches.memory.Memory.wrap(lz4Bytes));
+ assertEquals(passSketch.getEstimate(), lz4Sketch.getEstimate(),
+ "aggregated theta sketch estimate diverged at star-tree docId " + docId);
+ }
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ if (_passThroughSegment != null) {
+ _passThroughSegment.destroy();
+ }
+ if (_lz4Segment != null) {
+ _lz4Segment.destroy();
+ }
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index d05078494cc7..3d56b36358af 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.spi.index.reader;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -509,6 +510,46 @@ default byte[] getBytes(int docId, T context) {
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the BYTES type single-value at the given document id as a {@link ByteBuffer} view,
+ * avoiding the per-call {@code byte[]} allocation performed by {@link #getBytes(int, ForwardIndexReaderContext)}.
+ *
+ * Lifetime contract: the returned buffer is valid only until the next call on this
+ * {@code ForwardIndexReader} that uses the same {@code context}. Reader implementations that
+ * decompress chunks into a per-context scratch buffer return a slice that becomes invalid the
+ * moment the next value is decoded. Callers MUST fully consume the returned buffer before making
+ * another reader call with the same context, and MUST NOT retain the buffer across calls.
+ *
+ *
The default implementation falls back to {@link #getBytes(int, ForwardIndexReaderContext)}
+ * and wraps the resulting array, so it is safe to call on any reader.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return a {@link ByteBuffer} positioned at the start of the value with limit at its end
+ */
+ default ByteBuffer getBytesView(int docId, T context) {
+ return ByteBuffer.wrap(getBytes(docId, context));
+ }
+
+ /**
+ * Returns {@code true} if the {@link ByteBuffer} returned by {@link #getBytesView} remains valid
+ * after subsequent reads on the same context — i.e. it is safe for a batched caller to materialize
+ * a block of views into an array and consume them later.
+ *
+ *
This holds for readers whose views point at storage that is not reused between reads (e.g. an
+ * uncompressed, memory-mapped forward index, where each call returns a fresh slice of the mapped
+ * buffer). It does NOT hold for readers that decode into a per-context scratch buffer (e.g. a
+ * compressed chunk reader), where the next read overwrites the previous view.
+ *
+ *
Default is {@code false} (conservative): the default {@link #getBytesView} wraps a fresh
+ * {@code byte[]} which is itself stable, but unknown implementations are assumed unsafe to batch.
+ * Callers that hold views across a block read MUST gate on this flag and fall back to
+ * {@link #getBytes} when it returns {@code false}.
+ */
+ default boolean isBufferViewStableAcrossReads() {
+ return false;
+ }
+
/**
* Reads the MAP type single-value at the given document id.
*