Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
Expand Down Expand Up @@ -113,6 +114,9 @@ public class TopNBenchmark
@Param({"all", "hour"})
private String queryGranularity;

@Param({"false", "force"})
private String vectorize;

private static final Logger log = new Logger(TopNBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
Expand Down Expand Up @@ -161,6 +165,7 @@ private void setupQueries()
.dimension("dimSequential")
.metric("sumFloatNormal")
.intervals(intervalSpec)
.context(ImmutableMap.of("vectorize", vectorize))
.aggregators(queryAggs);

basicQueries.put("A", queryBuilderA);
Expand All @@ -177,6 +182,7 @@ private void setupQueries()
.dimension("dimUniform")
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.intervals(intervalSpec)
.context(ImmutableMap.of("vectorize", vectorize))
.aggregators(queryAggs);

basicQueries.put("numericSort", queryBuilderA);
Expand All @@ -193,6 +199,7 @@ private void setupQueries()
.dimension("dimUniform")
.metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC))
.intervals(intervalSpec)
.context(ImmutableMap.of("vectorize", vectorize))
.aggregators(queryAggs);

basicQueries.put("alphanumericSort", queryBuilderA);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby.epinephelinae;

import it.unimi.dsi.fastutil.Hash;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Iterator;

/**
* On-heap {@link VectorGrouper} that grows aggregator state on demand, up to maximum limit of 2GB.
*
* Vectorized analogue of {@link org.apache.druid.query.topn.BaseTopNAlgorithm}'s
* {@code runWithCardinalityUnknown} path: used when dimension cardinality is unknown (numeric columns,
* non-dict-encoded string virtual columns) or when a dict-encoded string column's cardinality exceeds
* the processing buffer. Memory footprint is on-heap and grows with the distinct-key count — matching
* the non-vectorized path's memory profile for the same queries.
*/
public class HeapVectorGrouper implements VectorGrouper
{
private static final Hash.Strategy<byte[]> BYTE_ARRAY_HASH_STRATEGY = new Hash.Strategy<byte[]>()
{
@Override
public int hashCode(byte[] o)
{
return Arrays.hashCode(o);
}

@Override
public boolean equals(byte[] a, byte[] b)
{
return Arrays.equals(a, b);
}
};

private static final int MIN_INITIAL_STATE_BUFFER_SIZE = 4096;

private final AggregatorAdapters aggregators;
private final int keySize;
private final int aggStateSize;
private final Object2IntOpenCustomHashMap<byte[]> keyToOffset;

private boolean initialized;
private ByteBuffer aggStateBuffer;
private int aggStateEnd;

private int[] vAggregationPositions;
private int[] vAggregationRows;
private byte[] keyScratch;

public HeapVectorGrouper(final AggregatorAdapters aggregators, final int keySize)
{
this.aggregators = aggregators;
this.keySize = keySize;
this.aggStateSize = aggregators.spaceNeeded();
this.keyToOffset = new Object2IntOpenCustomHashMap<>(BYTE_ARRAY_HASH_STRATEGY);
this.keyToOffset.defaultReturnValue(-1);
}

@Override
public void initVectorized(final int maxVectorSize)
{
if (initialized) {
if (vAggregationPositions.length != maxVectorSize) {
throw new ISE(
"initVectorized called with different maxVectorSize (existing=%d, new=%d)",
vAggregationPositions.length,
maxVectorSize
);
}
return;
}
this.aggStateBuffer = ByteBuffer.allocate(MIN_INITIAL_STATE_BUFFER_SIZE);
this.vAggregationPositions = new int[maxVectorSize];
this.vAggregationRows = new int[maxVectorSize];
this.keyScratch = new byte[keySize];
this.aggStateEnd = 0;
this.initialized = true;
}

/**
* Contract: keys for rows [startRow, endRow) must be packed contiguously at {@code keySpace[0 ..
* numRows * keySize)}; {@code startRow}/{@code endRow} are source-vector indices used to look up aggregator
* input values.
*/
@Override
public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow)
{
final int numRows = endRow - startRow;

for (int i = 0; i < numRows; i++) {
keySpace.getByteArray((long) i * keySize, keyScratch, 0, keySize);
int offset = keyToOffset.getInt(keyScratch);
if (offset == -1) {
if ((long) aggStateEnd + aggStateSize > aggStateBuffer.capacity()) {
growBuffer((long) aggStateEnd + aggStateSize);
}
offset = aggStateEnd;
final byte[] keyCopy = Arrays.copyOf(keyScratch, keySize);
keyToOffset.put(keyCopy, offset);
aggregators.init(aggStateBuffer, offset);
aggStateEnd += aggStateSize;
}
vAggregationPositions[i] = offset;
}

aggregators.aggregateVector(
aggStateBuffer,
numRows,
vAggregationPositions,
Groupers.writeAggregationRows(vAggregationRows, startRow, endRow)
);

return AggregateResult.ok();
}

private void growBuffer(final long neededCapacity)
{

Check failure

Code scanning / CodeQL

Comparison of narrow type with wide type in loop condition High

Comparison between
expression
of type int and
expression
of wider type long.
if (neededCapacity > Integer.MAX_VALUE) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to make this limit configurable

throw new ISE("Aggregator state exceeds 2 GB; cardinality too high for HeapVectorGrouper");
}
int newCapacity = aggStateBuffer.capacity();
while (newCapacity < neededCapacity) {
final long doubled = (long) newCapacity * 2;
newCapacity = doubled > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) doubled;
}

final ByteBuffer oldBuffer = aggStateBuffer;
final ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);
oldBuffer.position(0);
oldBuffer.limit(aggStateEnd);
newBuffer.put(oldBuffer);

for (int pos = 0; pos < aggStateEnd; pos += aggStateSize) {
aggregators.relocate(pos, pos, oldBuffer, newBuffer);
}

this.aggStateBuffer = newBuffer;
}

@Override
public void reset()
{
aggregators.reset();
keyToOffset.clear();
aggStateEnd = 0;
}

@Override
public void close()
{
reset();
}

@Override
public CloseableIterator<Grouper.Entry<MemoryPointer>> iterator()
{
final Iterator<Object2IntMap.Entry<byte[]>> mapIter =
keyToOffset.object2IntEntrySet().fastIterator();

return new CloseableIterator<>()
{
final ReusableEntry<MemoryPointer> reusableEntry =
new ReusableEntry<>(new MemoryPointer(), new Object[aggregators.size()]);

@Override
public boolean hasNext()
{
return mapIter.hasNext();
}

@Override
public Grouper.Entry<MemoryPointer> next()
{
final Object2IntMap.Entry<byte[]> mapEntry = mapIter.next();
reusableEntry.getKey().set(Memory.wrap(mapEntry.getKey(), ByteOrder.nativeOrder()), 0);

final int position = mapEntry.getIntValue();
for (int i = 0; i < aggregators.size(); i++) {
reusableEntry.getValues()[i] = aggregators.get(aggStateBuffer, position, i);
}
return reusableEntry;
}

@Override
public void close()
{
// Nothing to close.
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.CursorGranularizer;
import org.apache.druid.query.QueryContexts;
Expand All @@ -35,6 +36,7 @@
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
import org.apache.druid.query.topn.vector.VectorTopNEngine;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
Expand Down Expand Up @@ -96,13 +98,34 @@ public Sequence<Result<TopNResultValue>> query(
if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated()));
}

final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);

final boolean canVectorize = cursorHolder.canVectorize()
&& VectorTopNEngine.canVectorize(query, cursorFactory);
final boolean shouldVectorize = query.context().getVectorize().shouldVectorize(canVectorize);

if (shouldVectorize) {
final ResourceHolder<ByteBuffer> bufHolder = bufferPool.take();
try {
final Closer resourceCloser = Closer.create();
resourceCloser.register(bufHolder);
resourceCloser.register(cursorHolder);
return Sequences.filter(
VectorTopNEngine.process(query, timeBoundaryInspector, cursorHolder, bufHolder.get()),
Predicates.notNull()
).withBaggage(resourceCloser);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Vectorized TopN bypasses existing query metrics reporting.

The new early return into VectorTopNEngine.process skips the row-path bookkeeping that reports TopN metrics today. In the non-vector path this method records queryMetrics.cursor(...), then getMapFn records dimensionCardinality(...) and algorithm selection, and TopNMapFn records selector and pass-size metrics. None of that runs when shouldVectorize is true, so enabling vectorization changes emitted TopN metrics and removes operational visibility into algorithm choice and cardinality. If that loss is intended it should be wired back explicitly; otherwise this is a regression.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something we should discuss is what algorithm/cursor metrics we want to expose in the vectorized path?

}
catch (Throwable t) {
throw CloseableUtils.closeAndWrapInCatch(t, bufHolder);
}
}

final Cursor cursor = cursorHolder.asCursor();
if (cursor == null) {
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
}

final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class);

final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();

final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
Expand Down
Loading
Loading