From 3ddac399a7f1d3c8b0861870f4fbb738475681de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Fri, 17 Apr 2026 20:19:48 -0700 Subject: [PATCH] perf: vectorize topN native engine --- .../druid/benchmark/query/TopNBenchmark.java | 7 + .../epinephelinae/HeapVectorGrouper.java | 218 +++++++ .../druid/query/topn/TopNQueryEngine.java | 27 +- ...leValueStringTopNVectorColumnSelector.java | 92 +++ .../DoubleTopNVectorColumnSelector.java | 76 +++ .../vector/FloatTopNVectorColumnSelector.java | 76 +++ .../vector/LongTopNVectorColumnSelector.java | 76 +++ ...ullableDoubleTopNVectorColumnSelector.java | 88 +++ ...NullableFloatTopNVectorColumnSelector.java | 88 +++ .../NullableLongTopNVectorColumnSelector.java | 88 +++ ...leValueStringTopNVectorColumnSelector.java | 85 +++ .../TopNVectorColumnProcessorFactory.java | 156 +++++ .../topn/vector/TopNVectorColumnSelector.java | 65 +++ .../query/topn/vector/VectorTopNEngine.java | 373 ++++++++++++ .../epinephelinae/HeapVectorGrouperTest.java | 218 +++++++ .../TopNVectorColumnProcessorFactoryTest.java | 103 ++++ .../vector/TopNVectorColumnSelectorTest.java | 550 ++++++++++++++++++ .../sql/calcite/CalciteArraysQueryTest.java | 4 + .../sql/calcite/CalciteJoinQueryTest.java | 9 + 19 files changed, 2397 insertions(+), 2 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouper.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/DictionaryBuildingSingleValueStringTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/DoubleTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/FloatTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/LongTopNVectorColumnSelector.java create mode 100644 
processing/src/main/java/org/apache/druid/query/topn/vector/NullableDoubleTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/NullableFloatTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/NullableLongTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/SingleValueStringTopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnProcessorFactory.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelector.java create mode 100644 processing/src/main/java/org/apache/druid/query/topn/vector/VectorTopNEngine.java create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouperTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/topn/vector/TopNVectorColumnProcessorFactoryTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelectorTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java index 438143e311ca..adaedad02dc0 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.StupidPool; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.FileUtils; @@ -113,6 +114,9 @@ public class TopNBenchmark @Param({"all", "hour"}) private String queryGranularity; + @Param({"false", "force"}) + private 
String vectorize; + private static final Logger log = new Logger(TopNBenchmark.class); private static final int RNG_SEED = 9999; private static final IndexMergerV9 INDEX_MERGER_V9; @@ -161,6 +165,7 @@ private void setupQueries() .dimension("dimSequential") .metric("sumFloatNormal") .intervals(intervalSpec) + .context(ImmutableMap.of("vectorize", vectorize)) .aggregators(queryAggs); basicQueries.put("A", queryBuilderA); @@ -177,6 +182,7 @@ private void setupQueries() .dimension("dimUniform") .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) .intervals(intervalSpec) + .context(ImmutableMap.of("vectorize", vectorize)) .aggregators(queryAggs); basicQueries.put("numericSort", queryBuilderA); @@ -193,6 +199,7 @@ private void setupQueries() .dimension("dimUniform") .metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC)) .intervals(intervalSpec) + .context(ImmutableMap.of("vectorize", vectorize)) .aggregators(queryAggs); basicQueries.put("alphanumericSort", queryBuilderA); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouper.java new file mode 100644 index 000000000000..32f67fabe724 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouper.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae; + +import it.unimi.dsi.fastutil.Hash; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap; +import org.apache.datasketches.memory.Memory; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.aggregation.AggregatorAdapters; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Iterator; + +/** + * On-heap {@link VectorGrouper} that grows aggregator state on demand, up to maximum limit of 2GB. + * + * Vectorized analogue of {@link org.apache.druid.query.topn.BaseTopNAlgorithm}'s + * {@code runWithCardinalityUnknown} path: used when dimension cardinality is unknown (numeric columns, + * non-dict-encoded string virtual columns) or when a dict-encoded string column's cardinality exceeds + * the processing buffer. Memory footprint is on-heap and grows with the distinct-key count — matching + * the non-vectorized path's memory profile for the same queries. 
+ */ +public class HeapVectorGrouper implements VectorGrouper +{ + private static final Hash.Strategy<byte[]> BYTE_ARRAY_HASH_STRATEGY = new Hash.Strategy<byte[]>() + { + @Override + public int hashCode(byte[] o) + { + return Arrays.hashCode(o); + } + + @Override + public boolean equals(byte[] a, byte[] b) + { + return Arrays.equals(a, b); + } + }; + + private static final int MIN_INITIAL_STATE_BUFFER_SIZE = 4096; + + private final AggregatorAdapters aggregators; + private final int keySize; + private final int aggStateSize; + private final Object2IntOpenCustomHashMap<byte[]> keyToOffset; + + private boolean initialized; + private ByteBuffer aggStateBuffer; + private int aggStateEnd; + + private int[] vAggregationPositions; + private int[] vAggregationRows; + private byte[] keyScratch; + + public HeapVectorGrouper(final AggregatorAdapters aggregators, final int keySize) + { + this.aggregators = aggregators; + this.keySize = keySize; + this.aggStateSize = aggregators.spaceNeeded(); + this.keyToOffset = new Object2IntOpenCustomHashMap<>(BYTE_ARRAY_HASH_STRATEGY); + this.keyToOffset.defaultReturnValue(-1); + } + + @Override + public void initVectorized(final int maxVectorSize) + { + if (initialized) { + if (vAggregationPositions.length != maxVectorSize) { + throw new ISE( + "initVectorized called with different maxVectorSize (existing=%d, new=%d)", + vAggregationPositions.length, + maxVectorSize + ); + } + return; + } + this.aggStateBuffer = ByteBuffer.allocate(MIN_INITIAL_STATE_BUFFER_SIZE); + this.vAggregationPositions = new int[maxVectorSize]; + this.vAggregationRows = new int[maxVectorSize]; + this.keyScratch = new byte[keySize]; + this.aggStateEnd = 0; + this.initialized = true; + } + + /** + * Contract: keys for rows [startRow, endRow) must be packed contiguously at {@code keySpace[0 .. + * numRows * keySize)}; {@code startRow}/{@code endRow} are source-vector indices used to look up aggregator + * input values. 
+ */ + @Override + public AggregateResult aggregateVector(final Memory keySpace, final int startRow, final int endRow) + { + final int numRows = endRow - startRow; + + for (int i = 0; i < numRows; i++) { + keySpace.getByteArray((long) i * keySize, keyScratch, 0, keySize); + int offset = keyToOffset.getInt(keyScratch); + if (offset == -1) { + if ((long) aggStateEnd + aggStateSize > aggStateBuffer.capacity()) { + growBuffer((long) aggStateEnd + aggStateSize); + } + offset = aggStateEnd; + final byte[] keyCopy = Arrays.copyOf(keyScratch, keySize); + keyToOffset.put(keyCopy, offset); + aggregators.init(aggStateBuffer, offset); + aggStateEnd += aggStateSize; + } + vAggregationPositions[i] = offset; + } + + aggregators.aggregateVector( + aggStateBuffer, + numRows, + vAggregationPositions, + Groupers.writeAggregationRows(vAggregationRows, startRow, endRow) + ); + + return AggregateResult.ok(); + } + + private void growBuffer(final long neededCapacity) + { + if (neededCapacity > Integer.MAX_VALUE) { + throw new ISE("Aggregator state exceeds 2 GB; cardinality too high for HeapVectorGrouper"); + } + int newCapacity = aggStateBuffer.capacity(); + while (newCapacity < neededCapacity) { + final long doubled = (long) newCapacity * 2; + newCapacity = doubled > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) doubled; + } + + final ByteBuffer oldBuffer = aggStateBuffer; + final ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity); + oldBuffer.position(0); + oldBuffer.limit(aggStateEnd); + newBuffer.put(oldBuffer); + + for (int pos = 0; pos < aggStateEnd; pos += aggStateSize) { + aggregators.relocate(pos, pos, oldBuffer, newBuffer); + } + + this.aggStateBuffer = newBuffer; + } + + @Override + public void reset() + { + aggregators.reset(); + keyToOffset.clear(); + aggStateEnd = 0; + } + + @Override + public void close() + { + reset(); + } + + @Override + public CloseableIterator<Grouper.Entry<MemoryPointer>> iterator() + { + final Iterator<Object2IntMap.Entry<byte[]>> mapIter = + keyToOffset.object2IntEntrySet().fastIterator(); + + return new CloseableIterator<>() + { + final ReusableEntry<MemoryPointer> reusableEntry = + new ReusableEntry<>(new MemoryPointer(), new Object[aggregators.size()]); + + @Override + public boolean hasNext() + { + return mapIter.hasNext(); + } + + @Override + public Grouper.Entry<MemoryPointer> next() + { + final Object2IntMap.Entry<byte[]> mapEntry = mapIter.next(); + reusableEntry.getKey().set(Memory.wrap(mapEntry.getKey(), ByteOrder.nativeOrder()), 0); + + final int position = mapEntry.getIntValue(); + for (int i = 0; i < aggregators.size(); i++) { + reusableEntry.getValues()[i] = aggregators.get(aggStateBuffer, position, i); + } + return reusableEntry; + } + + @Override + public void close() + { + // Nothing to close. 
+ } + }; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index f8c7762c004a..bd0fc5bc5fb1 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.CursorGranularizer; import org.apache.druid.query.QueryContexts; @@ -35,6 +36,7 @@ import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor; import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory; +import org.apache.druid.query.topn.vector.VectorTopNEngine; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; @@ -96,13 +98,34 @@ public Sequence<Result<TopNResultValue>> query( if (cursorHolder.isPreAggregated()) { query = query.withAggregatorSpecs(Preconditions.checkNotNull(cursorHolder.getAggregatorsForPreAggregated())); } + + final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class); + + final boolean canVectorize = cursorHolder.canVectorize() + && VectorTopNEngine.canVectorize(query, cursorFactory); + final boolean shouldVectorize = query.context().getVectorize().shouldVectorize(canVectorize); + + if (shouldVectorize) { + final ResourceHolder<ByteBuffer> bufHolder = bufferPool.take(); + try { + final Closer resourceCloser = Closer.create(); + resourceCloser.register(bufHolder); + resourceCloser.register(cursorHolder); + return Sequences.filter( + VectorTopNEngine.process(query, 
timeBoundaryInspector, cursorHolder, bufHolder.get()), + Predicates.notNull() + ).withBaggage(resourceCloser); + } + catch (Throwable t) { + throw CloseableUtils.closeAndWrapInCatch(t, bufHolder); + } + } + final Cursor cursor = cursorHolder.asCursor(); if (cursor == null) { return Sequences.withBaggage(Sequences.empty(), cursorHolder); } - final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class); - final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus = diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/DictionaryBuildingSingleValueStringTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/DictionaryBuildingSingleValueStringTopNVectorColumnSelector.java new file mode 100644 index 000000000000..4a9f44e74d81 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/DictionaryBuildingSingleValueStringTopNVectorColumnSelector.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.topn.vector; + +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +/** + * {@link TopNVectorColumnSelector} for single-valued STRING columns that are not natively dictionary-encoded, + * such as expression virtual columns. Builds a local int dictionary on-the-fly and encodes keys as 4-byte + * dictionary IDs, matching the key format of {@link SingleValueStringTopNVectorColumnSelector}. + */ +public class DictionaryBuildingSingleValueStringTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorObjectSelector selector; + private final List<String> dictionary = new ArrayList<>(); + private final Object2IntMap<String> reverseDictionary = new Object2IntOpenHashMap<>(); + + DictionaryBuildingSingleValueStringTopNVectorColumnSelector(final VectorObjectSelector selector) + { + this.selector = selector; + reverseDictionary.defaultReturnValue(-1); + } + + @Override + public int getGroupingKeySize() + { + return Integer.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + final String value = DimensionHandlerUtils.convertObjectToString(vector[i]); + int dictId = reverseDictionary.getInt(value); + if (dictId < 0) { + dictId = dictionary.size(); + dictionary.add(value); + reverseDictionary.put(value, dictId); + } + keySpace.putInt(j, dictId); + } + } + + @Override + @Nullable 
+ public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + return dictionary.get(keyMemory.memory().getInt(keyMemory.position() + keyOffset)); + } + + @Override + public void reset() + { + dictionary.clear(); + reverseDictionary.clear(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/DoubleTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/DoubleTopNVectorColumnSelector.java new file mode 100644 index 000000000000..c6475f44ae3b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/DoubleTopNVectorColumnSelector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.vector.VectorValueSelector; + +/** + * {@link TopNVectorColumnSelector} for non-nullable DOUBLE columns. + * Keys are encoded as 8-byte doubles in the shared key space. 
+ */ +public class DoubleTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + DoubleTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Double.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final double[] vector = selector.getDoubleVector(); + + if (keySize == Double.BYTES) { + keySpace.putDoubleArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putDouble(j, vector[i]); + } + } + } + + @Override + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + return keyMemory.memory().getDouble(keyMemory.position() + keyOffset); + } + + @Override + public void reset() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/FloatTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/FloatTopNVectorColumnSelector.java new file mode 100644 index 000000000000..b86840704e20 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/FloatTopNVectorColumnSelector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.vector.VectorValueSelector; + +/** + * {@link TopNVectorColumnSelector} for non-nullable FLOAT columns. + * Keys are encoded as 4-byte floats in the shared key space. + */ +public class FloatTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + FloatTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Float.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final float[] vector = selector.getFloatVector(); + + if (keySize == Float.BYTES) { + keySpace.putFloatArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putFloat(j, vector[i]); + } + } + } + + @Override + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + return keyMemory.memory().getFloat(keyMemory.position() + keyOffset); + } + + @Override + public void reset() + { + // Nothing to do. 
+ } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/LongTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/LongTopNVectorColumnSelector.java new file mode 100644 index 000000000000..2be4ec093232 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/LongTopNVectorColumnSelector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.vector.VectorValueSelector; + +/** + * {@link TopNVectorColumnSelector} for non-nullable LONG columns. + * Keys are encoded as 8-byte longs in the shared key space. 
+ */ +public class LongTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + LongTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Long.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final long[] vector = selector.getLongVector(); + + if (keySize == Long.BYTES) { + keySpace.putLongArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putLong(j, vector[i]); + } + } + } + + @Override + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + return keyMemory.memory().getLong(keyMemory.position() + keyOffset); + } + + @Override + public void reset() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/NullableDoubleTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableDoubleTopNVectorColumnSelector.java new file mode 100644 index 000000000000..b29660e13442 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableDoubleTopNVectorColumnSelector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.column.TypeStrategies; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; + +/** + * {@link TopNVectorColumnSelector} for nullable DOUBLE columns. + * Keys are encoded as a 1-byte null flag followed by an 8-byte double value. + */ +public class NullableDoubleTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + NullableDoubleTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Byte.BYTES + Double.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final double[] vector = selector.getDoubleVector(); + final boolean[] nulls = selector.getNullVector(); + + if (nulls != null) { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, nulls[i] ? 
TypeStrategies.IS_NULL_BYTE : TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putDouble(j + 1, vector[i]); + } + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putDouble(j + 1, vector[i]); + } + } + } + + @Override + @Nullable + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == TypeStrategies.IS_NULL_BYTE) { + return null; + } + return keyMemory.memory().getDouble(keyMemory.position() + keyOffset + 1); + } + + @Override + public void reset() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/NullableFloatTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableFloatTopNVectorColumnSelector.java new file mode 100644 index 000000000000..0a9e1eb39fb3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableFloatTopNVectorColumnSelector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.column.TypeStrategies; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; + +/** + * {@link TopNVectorColumnSelector} for nullable FLOAT columns. + * Keys are encoded as a 1-byte null flag followed by a 4-byte float value. + */ +public class NullableFloatTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + NullableFloatTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Byte.BYTES + Float.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final float[] vector = selector.getFloatVector(); + final boolean[] nulls = selector.getNullVector(); + + if (nulls != null) { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, nulls[i] ? TypeStrategies.IS_NULL_BYTE : TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putFloat(j + 1, vector[i]); + } + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putFloat(j + 1, vector[i]); + } + } + } + + @Override + @Nullable + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == TypeStrategies.IS_NULL_BYTE) { + return null; + } + return keyMemory.memory().getFloat(keyMemory.position() + keyOffset + 1); + } + + @Override + public void reset() + { + // Nothing to do. 
+ } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/NullableLongTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableLongTopNVectorColumnSelector.java new file mode 100644 index 000000000000..ff786e05426f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/NullableLongTopNVectorColumnSelector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.column.TypeStrategies; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; + +/** + * {@link TopNVectorColumnSelector} for nullable LONG columns. + * Keys are encoded as a 1-byte null flag followed by an 8-byte long value. 
+ */ +public class NullableLongTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final VectorValueSelector selector; + + NullableLongTopNVectorColumnSelector(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Byte.BYTES + Long.BYTES; + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final long[] vector = selector.getLongVector(); + final boolean[] nulls = selector.getNullVector(); + + if (nulls != null) { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, nulls[i] ? TypeStrategies.IS_NULL_BYTE : TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putLong(j + 1, vector[i]); + } + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putByte(j, TypeStrategies.IS_NOT_NULL_BYTE); + keySpace.putLong(j + 1, vector[i]); + } + } + } + + @Override + @Nullable + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + if (keyMemory.memory().getByte(keyMemory.position() + keyOffset) == TypeStrategies.IS_NULL_BYTE) { + return null; + } + return keyMemory.memory().getLong(keyMemory.position() + keyOffset + 1); + } + + @Override + public void reset() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/SingleValueStringTopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/SingleValueStringTopNVectorColumnSelector.java new file mode 100644 index 000000000000..c9544bee5a6d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/SingleValueStringTopNVectorColumnSelector.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; + +/** + * {@link TopNVectorColumnSelector} for dictionary-encoded single-value STRING columns. + * Keys are encoded as dictionary IDs (4-byte ints) in the shared key space. 
+ */ +public class SingleValueStringTopNVectorColumnSelector implements TopNVectorColumnSelector +{ + private final SingleValueDimensionVectorSelector selector; + + SingleValueStringTopNVectorColumnSelector(final SingleValueDimensionVectorSelector selector) + { + this.selector = selector; + } + + @Override + public int getGroupingKeySize() + { + return Integer.BYTES; + } + + @Override + public int getValueCardinality() + { + return selector.getValueCardinality(); + } + + @Override + public void writeKeys( + final WritableMemory keySpace, + final int keySize, + final int keyOffset, + final int startRow, + final int endRow + ) + { + final int[] vector = selector.getRowVector(); + + if (keySize == Integer.BYTES) { + keySpace.putIntArray(keyOffset, vector, startRow, endRow - startRow); + } else { + for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { + keySpace.putInt(j, vector[i]); + } + } + } + + @Override + @Nullable + public Object getDimensionValue(final MemoryPointer keyMemory, final int keyOffset) + { + return selector.lookupName(keyMemory.memory().getInt(keyMemory.position() + keyOffset)); + } + + @Override + public void reset() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnProcessorFactory.java new file mode 100644 index 000000000000..be15268854d2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnProcessorFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import com.google.common.base.Preconditions; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +/** + * Creates {@link TopNVectorColumnSelector} instances appropriate for each column type. + * + * @see org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnProcessorFactory the groupBy equivalent + */ +public class TopNVectorColumnProcessorFactory implements VectorColumnProcessorFactory +{ + private static final TopNVectorColumnProcessorFactory INSTANCE = new TopNVectorColumnProcessorFactory(); + + private TopNVectorColumnProcessorFactory() + { + // Singleton. 
+ } + + public static TopNVectorColumnProcessorFactory instance() + { + return INSTANCE; + } + + @Override + public TopNVectorColumnSelector makeSingleValueDimensionProcessor( + final ColumnCapabilities capabilities, + final SingleValueDimensionVectorSelector selector + ) + { + Preconditions.checkArgument( + capabilities.is(ValueType.STRING), + "topN dimension processors must be STRING typed" + ); + return new SingleValueStringTopNVectorColumnSelector(selector); + } + + @Override + public TopNVectorColumnSelector makeMultiValueDimensionProcessor( + final ColumnCapabilities capabilities, + final MultiValueDimensionVectorSelector selector + ) + { + throw new UnsupportedOperationException( + "Vectorized topN on multi-value dictionary-encoded dimensions is not supported" + ); + } + + @Override + public TopNVectorColumnSelector makeFloatProcessor( + final ColumnCapabilities capabilities, + final VectorValueSelector selector + ) + { + if (capabilities.hasNulls().isFalse()) { + return new FloatTopNVectorColumnSelector(selector); + } + return new NullableFloatTopNVectorColumnSelector(selector); + } + + @Override + public TopNVectorColumnSelector makeDoubleProcessor( + final ColumnCapabilities capabilities, + final VectorValueSelector selector + ) + { + if (capabilities.hasNulls().isFalse()) { + return new DoubleTopNVectorColumnSelector(selector); + } + return new NullableDoubleTopNVectorColumnSelector(selector); + } + + @Override + public TopNVectorColumnSelector makeLongProcessor( + final ColumnCapabilities capabilities, + final VectorValueSelector selector + ) + { + if (capabilities.hasNulls().isFalse()) { + return new LongTopNVectorColumnSelector(selector); + } + return new NullableLongTopNVectorColumnSelector(selector); + } + + @Override + public TopNVectorColumnSelector makeArrayProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + throw new UnsupportedOperationException( + "Vectorized topN on ARRAY columns is not 
supported" + ); + } + + @Override + public TopNVectorColumnSelector makeObjectProcessor( + final ColumnCapabilities capabilities, + final VectorObjectSelector selector + ) + { + if (capabilities.is(ValueType.STRING)) { + if (capabilities.hasMultipleValues().isTrue()) { + throw new UnsupportedOperationException( + "Vectorized topN on multi-value dimensions is not supported" + ); + } + return new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(selector); + } + + throw new UnsupportedOperationException( + "Vectorized topN is not supported for column type: " + capabilities.asTypeString() + ); + } + + /** + * Prefer dictionary-encoded selectors over object selectors when a valid, unique dictionary exists — + * avoids the cost of resolving the string and building a local dictionary. Non-unique dictionaries + * (two dict IDs mapping to the same string) route through the object path so + * {@link DictionaryBuildingSingleValueStringTopNVectorColumnSelector} can collapse aliased IDs by + * string value, matching {@link org.apache.druid.query.topn.types.StringTopNColumnAggregatesProcessor}'s + * behavior on the non-vectorized path. 
+ */ + @Override + public boolean useDictionaryEncodedSelector(final ColumnCapabilities capabilities) + { + Preconditions.checkArgument(capabilities != null, "Capabilities must not be null"); + Preconditions.checkArgument(capabilities.is(ValueType.STRING), "Must only be called on a STRING column"); + return capabilities.isDictionaryEncoded().isTrue() && capabilities.areDictionaryValuesUnique().isTrue(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelector.java new file mode 100644 index 000000000000..7f93986e8916 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.query.groupby.epinephelinae.GroupingSelector; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; + +import javax.annotation.Nullable; + +/** + * Column processor for topN dimensions in the vectorized execution path. 
+ * + * Writes dimension keys into shared memory in the same format as + * {@link org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector}, so that a + * {@link org.apache.druid.query.groupby.epinephelinae.VectorGrouper} can be used for aggregation. After all rows in a + * bucket have been aggregated, {@link #getDimensionValue(MemoryPointer, int)} decodes a key back to the raw dimension + * value for insertion into a {@link org.apache.druid.query.topn.TopNResultBuilder}. + * + * @see TopNVectorColumnProcessorFactory + * @see org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor the non-vectorized equivalent + */ +public interface TopNVectorColumnSelector extends GroupingSelector +{ + /** + * Size in bytes of the key written for each row by this selector. + */ + int getGroupingKeySize(); + + /** + * Write dimension keys for rows [startRow, endRow) from the current vector into keySpace at keyOffset. + */ + void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow); + + /** + * Decode the key at keyMemory[keyOffset] back to a dimension value. + * + * @return the raw dimension value, or null for null dimension keys + */ + @Nullable + Object getDimensionValue(MemoryPointer keyMemory, int keyOffset); + + /** + * Reset any internal state, such as a locally-built dictionary. Must be called between processing buckets when + * a dictionary-building selector is in use. + */ + void reset(); +} diff --git a/processing/src/main/java/org/apache/druid/query/topn/vector/VectorTopNEngine.java b/processing/src/main/java/org/apache/druid/query/topn/vector/VectorTopNEngine.java new file mode 100644 index 000000000000..df325e39a0fb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/topn/vector/VectorTopNEngine.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn.vector; + +import com.google.common.base.Suppliers; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.java.util.common.guava.BaseSequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.Order; +import org.apache.druid.query.Result; +import org.apache.druid.query.aggregation.AggregatorAdapters; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper; +import org.apache.druid.query.groupby.epinephelinae.Grouper; +import org.apache.druid.query.groupby.epinephelinae.HeapVectorGrouper; +import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; +import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNResultBuilder; +import org.apache.druid.query.topn.TopNResultValue; +import org.apache.druid.query.vector.VectorCursorGranularizer; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; +import 
org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.TimeBoundaryInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorCursor; +import org.apache.druid.segment.virtual.VirtualizedColumnInspector; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Vectorized execution engine for {@link TopNQuery}, analogous to + * {@link org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine} for groupBy. + * + * Uses a {@link VectorGrouper} for batch aggregation (with vectorized null handling via + * {@link org.apache.druid.segment.vector.VectorValueSelector#getNullVector()}) and then applies top-N + * ordering via {@link TopNResultBuilder} after each time-bucket is fully aggregated. + * + * @see org.apache.druid.query.topn.TopNQueryEngine for the entry point that selects this path + */ +public class VectorTopNEngine +{ + private VectorTopNEngine() + { + // No instantiation. 
+ } + + public static Sequence> process( + final TopNQuery query, + @Nullable final TimeBoundaryInspector timeBoundaryInspector, + final CursorHolder cursorHolder, + final ByteBuffer processingBuffer + ) + { + return new BaseSequence<>( + new BaseSequence.IteratorMaker, CloseableIterator>>() + { + @Override + public CloseableIterator> make() + { + final VectorCursor cursor = cursorHolder.asVectorCursor(); + + if (cursor == null) { + return new CloseableIterator<>() + { + @Override + public boolean hasNext() + { + return false; + } + + @Override + public Result next() + { + throw new NoSuchElementException(); + } + + @Override + public void close() + { + // Nothing to do. + } + }; + } + + final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); + final TopNVectorColumnSelector selector = ColumnProcessors.makeVectorProcessor( + query.getDimensionSpec(), + TopNVectorColumnProcessorFactory.instance(), + columnSelectorFactory + ); + + return new VectorTopNEngineIterator( + query, + timeBoundaryInspector, + cursor, + cursorHolder.getTimeOrder(), + selector, + processingBuffer + ); + } + + @Override + public void cleanup(final CloseableIterator> iterFromMake) + { + try { + iterFromMake.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + ); + } + + /** + * Returns true if the given query is eligible for the vectorized topN path. + */ + public static boolean canVectorize(final TopNQuery query, final ColumnInspector inspector) + { + final DimensionSpec dimensionSpec = query.getDimensionSpec(); + + if (!dimensionSpec.canVectorize()) { + return false; + } + + // Decorated specs (e.g. extraction functions that are not one-to-one) change value semantics in ways that + // are incompatible with the vectorized grouper key approach. 
+ if (dimensionSpec.mustDecorate()) { + return false; + } + + if (dimensionSpec.getOutputType().isArray()) { + return false; + } + + // Wrap with virtual columns so capabilities lookups for virtual column dimensions work correctly. + final ColumnInspector effectiveInspector = + new VirtualizedColumnInspector(inspector, query.getVirtualColumns()); + + final ColumnCapabilities capabilities = effectiveInspector.getColumnCapabilities(dimensionSpec.getDimension()); + // null means column does not exist; nil columns can be vectorized + if (capabilities != null && capabilities.hasMultipleValues().isMaybeTrue()) { + return false; + } + + // COMPLEX columns route to makeObjectProcessor which only handles STRING; reject them here so the + // capability check matches what the factory actually supports. + if (capabilities != null && capabilities.is(ValueType.COMPLEX)) { + return false; + } + + // TODO(vectorized-topn): the non-vectorized path coerces raw values to the dimension's output type before + // grouping (see TopNColumnAggregatesProcessorFactory). This path groups on the raw column type, so mixed-type + // queries (e.g. DOUBLE column with LONG output) would produce distinct groups that coerce to the same output + // value. Falling back for now; a future change could coerce at writeKeys time to match non-vec semantics. 
+ if (capabilities != null && dimensionSpec.getOutputType().getType() != capabilities.getType()) { + return false; + } + + for (final AggregatorFactory agg : query.getAggregatorSpecs()) { + if (!agg.canVectorize(effectiveInspector)) { + return false; + } + } + + return true; + } + + static class VectorTopNEngineIterator implements CloseableIterator> + { + private final TopNQuery query; + private final VectorCursor cursor; + private final TopNVectorColumnSelector selector; + private final ByteBuffer processingBuffer; + private final int keySize; + private final WritableMemory keySpace; + private final Comparator comparator; + private final VectorGrouper grouper; + + @Nullable + private final VectorCursorGranularizer granularizer; + + private final Iterator bucketIterator; + + @Nullable + private Interval bucketInterval; + + VectorTopNEngineIterator( + final TopNQuery query, + @Nullable final TimeBoundaryInspector timeBoundaryInspector, + final VectorCursor cursor, + final Order timeOrder, + final TopNVectorColumnSelector selector, + final ByteBuffer processingBuffer + ) + { + this.query = query; + this.cursor = cursor; + this.selector = selector; + this.processingBuffer = processingBuffer; + this.keySize = selector.getGroupingKeySize(); + this.keySpace = WritableMemory.allocate(keySize * cursor.getMaxVectorSize()); + this.comparator = query.getTopNMetricSpec().getComparator( + query.getAggregatorSpecs(), + query.getPostAggregatorSpecs() + ); + this.granularizer = VectorCursorGranularizer.create( + cursor, + timeBoundaryInspector, + timeOrder, + query.getGranularity(), + query.getSingleInterval() + ); + + if (granularizer != null) { + this.bucketIterator = granularizer.getBucketIterable().iterator(); + } else { + this.bucketIterator = Collections.emptyIterator(); + } + + this.bucketInterval = bucketIterator.hasNext() ? 
bucketIterator.next() : null; + this.grouper = makeGrouper(); + } + + @Override + public boolean hasNext() + { + // Return true for any remaining bucket, even if cursor is done — we still emit empty results. + return bucketInterval != null; + } + + @Override + public Result next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + // setCurrentOffsets must be called at the start of each vector batch to keep offsets current. + while (!cursor.isDone()) { + granularizer.setCurrentOffsets(bucketInterval); + final int startOffset = granularizer.getStartOffset(); + final int endOffset = granularizer.getEndOffset(); + + if (endOffset > startOffset) { + selector.writeKeys(keySpace, keySize, 0, startOffset, endOffset); + grouper.aggregateVector(keySpace, startOffset, endOffset); + } + + if (!granularizer.advanceCursorWithinBucket()) { + break; + } + } + + final DateTime bucketTimestamp = query.getGranularity().toDateTime(bucketInterval.getStartMillis()); + final TopNResultBuilder resultBuilder = query.getTopNMetricSpec().getResultBuilder( + bucketTimestamp, + query.getDimensionSpec(), + query.getThreshold(), + comparator, + query.getAggregatorSpecs(), + query.getPostAggregatorSpecs() + ); + + try (final CloseableIterator> iter = grouper.iterator()) { + while (iter.hasNext()) { + final Grouper.Entry entry = iter.next(); + final Object dimValue = selector.getDimensionValue(entry.getKey(), 0); + resultBuilder.addEntry(dimValue, dimValue, entry.getValues()); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + + bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null; + selector.reset(); + grouper.reset(); + return resultBuilder.build(); + } + + @Override + public void close() throws IOException + { + // Cursor is closed by the CursorHolder, which is closed by the calling sequence's baggage. 
+ grouper.close(); + } + + private VectorGrouper makeGrouper() + { + final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); + final List aggFactories = query.getAggregatorSpecs(); + final AggregatorAdapters aggregators = AggregatorAdapters.factorizeVector(columnSelectorFactory, aggFactories); + + // Use BufferArrayGrouper when the dimension is a dictionary-encoded string with small enough known cardinality + // to fit in the processing buffer — avoids hash-table overhead for the most common topN case. + final int cardinalityForArray = getCardinalityForArrayAggregation(aggFactories); + final VectorGrouper grouper; + if (cardinalityForArray >= 0) { + grouper = new BufferArrayGrouper(Suppliers.ofInstance(processingBuffer), aggregators, cardinalityForArray); + } else { + // Heap-backed grouper grows on demand; topN cannot accept partial aggregation, unlike groupBy. + grouper = new HeapVectorGrouper(aggregators, keySize); + } + + grouper.initVectorized(cursor.getMaxVectorSize()); + return grouper; + } + + private int getCardinalityForArrayAggregation(final List aggFactories) + { + final ColumnCapabilities capabilities = cursor.getColumnSelectorFactory() + .getColumnCapabilities(query.getDimensionSpec().getDimension()); + if (!Types.is(capabilities, ValueType.STRING)) { + return -1; + } + + // Virtual columns shadow real columns; a virtual column cannot report its own cardinality. 
+ if (query.getVirtualColumns().exists(query.getDimensionSpec().getDimension())) { + return -1; + } + + final int cardinality = selector.getValueCardinality(); + if (cardinality <= 0) { + return -1; + } + + final long requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( + cardinality, + aggFactories.toArray(new AggregatorFactory[0]) + ); + if (requiredBufferCapacity < 0 || requiredBufferCapacity > processingBuffer.capacity()) { + return -1; + } + + return cardinality; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouperTest.java new file mode 100644 index 000000000000..753cefa4d1b1 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HeapVectorGrouperTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.druid.query.groupby.epinephelinae;

import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.HashSet;
import java.util.Set;

/**
 * Unit tests for {@link HeapVectorGrouper}: aggregation always succeeding, distinct-group counting,
 * key round-tripping through the iterator, heap-buffer growth, reset/close behavior, and the
 * initVectorized contract.
 *
 * <p>Keys are fixed-width ints ({@link #KEY_SIZE}) written at stride {@code keySize} into a
 * {@link WritableMemory} key space; aggregator space is mocked via {@link AggregatorAdapters#spaceNeeded()}.
 */
public class HeapVectorGrouperTest
{
  private static final int KEY_SIZE = Integer.BYTES;
  private static final int AGG_SIZE = Long.BYTES;
  private static final int MAX_VECTOR_SIZE = 512;

  // -- structural / always-ok tests --

  @Test
  public void testAggregateVectorAlwaysReturnsOk()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    final WritableMemory keySpace = makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, 10);

    final AggregateResult result = grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);
    Assertions.assertTrue(result.isOk());
  }

  @Test
  public void testCorrectNumberOfDistinctGroups()
  {
    final int distinctKeys = 7;
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    final WritableMemory keySpace = makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, distinctKeys);

    grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);

    Assertions.assertEquals(distinctKeys, countGroups(grouper));
  }

  @Test
  public void testSameKeyProducesSameGroupAcrossBatches()
  {
    final int distinctKeys = 3;
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    final WritableMemory keySpace = makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, distinctKeys);

    grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);
    grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);

    // Same keys across two batches → same group count
    Assertions.assertEquals(distinctKeys, countGroups(grouper));
  }

  @Test
  public void testGroupKeyRoundTrip()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    final WritableMemory keySpace = WritableMemory.allocate(KEY_SIZE * MAX_VECTOR_SIZE);
    // Single batch with keys 0..4
    for (int i = 0; i < MAX_VECTOR_SIZE; i++) {
      keySpace.putInt((long) i * KEY_SIZE, i % 5);
    }
    grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);

    // Iterator must produce exactly the 5 keys we inserted
    final Set<Integer> seen = new HashSet<>();
    try (CloseableIterator<Grouper.Entry<MemoryPointer>> iter = grouper.iterator()) {
      while (iter.hasNext()) {
        final Grouper.Entry<MemoryPointer> entry = iter.next();
        final int key = entry.getKey().memory().getInt(entry.getKey().position());
        seen.add(key);
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    Assertions.assertEquals(Set.of(0, 1, 2, 3, 4), seen);
  }

  // -- buffer growth --

  @Test
  public void testBufferGrowsToAccommodateManyGroups()
  {
    // Use a large aggSize to force growth sooner
    final int aggSize = 1024;
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, aggSize);

    // Add enough distinct keys to require several doublings from the 4KB initial size
    int key = 0;
    final WritableMemory keySpace = WritableMemory.allocate(KEY_SIZE * MAX_VECTOR_SIZE);
    for (int batch = 0; batch < 20; batch++) {
      for (int i = 0; i < MAX_VECTOR_SIZE; i++) {
        keySpace.putInt((long) i * KEY_SIZE, key++);
      }
      final AggregateResult result = grouper.aggregateVector(keySpace, 0, MAX_VECTOR_SIZE);
      Assertions.assertTrue(result.isOk(), "batch " + batch + " must be ok");
    }
    // All 20 * MAX_VECTOR_SIZE distinct keys must be present
    Assertions.assertEquals(20 * MAX_VECTOR_SIZE, countGroups(grouper));
  }

  // -- reset / close --

  @Test
  public void testResetClearsAllGroups()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    grouper.aggregateVector(makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, 10), 0, MAX_VECTOR_SIZE);

    grouper.reset();

    Assertions.assertEquals(0, countGroups(grouper));
  }

  @Test
  public void testAfterResetNewGroupsCanBeAdded()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    grouper.aggregateVector(makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, 10), 0, MAX_VECTOR_SIZE);
    grouper.reset();

    grouper.aggregateVector(makeKeySpace(MAX_VECTOR_SIZE, KEY_SIZE, 3), 0, MAX_VECTOR_SIZE);
    Assertions.assertEquals(3, countGroups(grouper));
  }

  @Test
  public void testCloseCallsAggregatorsReset()
  {
    final AggregatorAdapters aggregators = Mockito.mock(AggregatorAdapters.class);
    Mockito.when(aggregators.spaceNeeded()).thenReturn(AGG_SIZE);

    final HeapVectorGrouper grouper = new HeapVectorGrouper(aggregators, KEY_SIZE);
    grouper.initVectorized(MAX_VECTOR_SIZE);
    grouper.close();

    Mockito.verify(aggregators, Mockito.times(1)).reset();
  }

  // -- initVectorized contracts --

  @Test
  public void testInitTwiceWithSameSizeIsNoOp()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    grouper.initVectorized(MAX_VECTOR_SIZE);
    // Must not throw — second call with same size is idempotent
    grouper.initVectorized(MAX_VECTOR_SIZE);
  }

  @Test
  public void testInitTwiceWithDifferentSizeThrows()
  {
    final HeapVectorGrouper grouper = makeGrouper(KEY_SIZE, AGG_SIZE);
    grouper.initVectorized(512);
    Assertions.assertThrows(ISE.class, () -> grouper.initVectorized(256));
  }

  // -- helpers --

  /** Creates an initialized grouper backed by a mocked {@link AggregatorAdapters} reserving {@code aggSize} bytes. */
  private static HeapVectorGrouper makeGrouper(final int keySize, final int aggSize)
  {
    final AggregatorAdapters aggregators = Mockito.mock(AggregatorAdapters.class);
    Mockito.when(aggregators.spaceNeeded()).thenReturn(aggSize);

    final HeapVectorGrouper grouper = new HeapVectorGrouper(aggregators, keySize);
    grouper.initVectorized(MAX_VECTOR_SIZE);
    return grouper;
  }

  /** Fills a key space with {@code numRows} int keys cycling over {@code distinctKeys} distinct values. */
  private static WritableMemory makeKeySpace(final int numRows, final int keySize, final int distinctKeys)
  {
    final WritableMemory keySpace = WritableMemory.allocate(keySize * numRows);
    for (int i = 0; i < numRows; i++) {
      keySpace.putInt((long) i * keySize, i % distinctKeys);
    }
    return keySpace;
  }

  /** Counts entries produced by the grouper's iterator, closing it afterwards. */
  private static int countGroups(final HeapVectorGrouper grouper)
  {
    int count = 0;
    try (CloseableIterator<Grouper.Entry<MemoryPointer>> iter = grouper.iterator()) {
      while (iter.hasNext()) {
        iter.next();
        count++;
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    return count;
  }
}
+ */ + +package org.apache.druid.query.topn.vector; + +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TopNVectorColumnProcessorFactoryTest +{ + private final TopNVectorColumnProcessorFactory factory = TopNVectorColumnProcessorFactory.instance(); + + @Test + public void testUseDictionaryEncodedSelectorTrueForUniqueDictEncodedString() + { + final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl() + .setType(ColumnType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(true) + .setHasMultipleValues(false); + + Assertions.assertTrue(factory.useDictionaryEncodedSelector(caps)); + } + + @Test + public void testUseDictionaryEncodedSelectorFalseForNonUniqueDict() + { + // Non-unique dict (two dict IDs can map to the same string) must route through the object path so the + // DictionaryBuildingSingleValueStringTopNVectorColumnSelector collapses aliased IDs by string value. 
+ final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl() + .setType(ColumnType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(false) + .setHasMultipleValues(false); + + Assertions.assertFalse(factory.useDictionaryEncodedSelector(caps)); + } + + @Test + public void testUseDictionaryEncodedSelectorFalseForNonDictEncodedString() + { + final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl() + .setType(ColumnType.STRING) + .setDictionaryEncoded(false) + .setDictionaryValuesUnique(false) + .setHasMultipleValues(false); + + Assertions.assertFalse(factory.useDictionaryEncodedSelector(caps)); + } + + @Test + public void testDictEncodedUniqueStringProducesDictIdSelector() + { + final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl() + .setType(ColumnType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(true) + .setHasMultipleValues(false); + final SingleValueDimensionVectorSelector selector = Mockito.mock(SingleValueDimensionVectorSelector.class); + + final TopNVectorColumnSelector processor = factory.makeSingleValueDimensionProcessor(caps, selector); + + Assertions.assertTrue(processor instanceof SingleValueStringTopNVectorColumnSelector); + } + + @Test + public void testNonUniqueDictRoutesObjectPathToDictionaryBuildingSelector() + { + // When a non-unique dict routes through makeObjectProcessor, it must produce the dictionary-building selector + // so aliased dict IDs are collapsed by string value — matching the non-vectorized path's semantics. 
+ final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl() + .setType(ColumnType.STRING) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(false) + .setHasMultipleValues(false); + final VectorObjectSelector selector = Mockito.mock(VectorObjectSelector.class); + + final TopNVectorColumnSelector processor = factory.makeObjectProcessor(caps, selector); + + Assertions.assertTrue(processor instanceof DictionaryBuildingSingleValueStringTopNVectorColumnSelector); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelectorTest.java b/processing/src/test/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelectorTest.java new file mode 100644 index 000000000000..cc04b3c08220 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/topn/vector/TopNVectorColumnSelectorTest.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.druid.query.topn.vector;

import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.query.groupby.epinephelinae.collection.MemoryPointer;
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Unit tests for the per-type TopN vector column selectors. For each implementation they verify:
 * the fixed grouping-key size, writeKeys in the bulk path (keySize equals the value width) and the
 * strided path (keySize larger than the value width), and reading a dimension value back from a
 * written key via a {@link MemoryPointer}.
 *
 * <p>Nullable variants use a one-byte null indicator followed by the value
 * ({@link TypeStrategies#IS_NULL_BYTE} / {@link TypeStrategies#IS_NOT_NULL_BYTE}).
 */
public class TopNVectorColumnSelectorTest
{
  private static final int NUM_ROWS = 4;

  // -- LongTopNVectorColumnSelector --

  @Test
  public void testLongKeySizeIs8()
  {
    final LongTopNVectorColumnSelector sel = new LongTopNVectorColumnSelector(mockLongSelector(new long[]{1, 2, 3, 4}));
    Assertions.assertEquals(Long.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testLongWriteKeysBulkPath()
  {
    final long[] longs = {10L, 20L, 30L, 40L};
    final LongTopNVectorColumnSelector sel = new LongTopNVectorColumnSelector(mockLongSelector(longs));
    final WritableMemory keySpace = WritableMemory.allocate(Long.BYTES * NUM_ROWS);

    sel.writeKeys(keySpace, Long.BYTES, 0, 0, NUM_ROWS);

    for (int i = 0; i < NUM_ROWS; i++) {
      Assertions.assertEquals(longs[i], keySpace.getLong((long) i * Long.BYTES));
    }
  }

  @Test
  public void testLongWriteKeysStridedPath()
  {
    final long[] longs = {10L, 20L, 30L, 40L};
    final LongTopNVectorColumnSelector sel = new LongTopNVectorColumnSelector(mockLongSelector(longs));
    // keySize > Long.BYTES forces the strided path
    final int keySize = Long.BYTES + 1;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * NUM_ROWS);

    sel.writeKeys(keySpace, keySize, 0, 0, NUM_ROWS);

    for (int i = 0; i < NUM_ROWS; i++) {
      Assertions.assertEquals(longs[i], keySpace.getLong((long) i * keySize));
    }
  }

  @Test
  public void testLongGetDimensionValue()
  {
    final LongTopNVectorColumnSelector sel = new LongTopNVectorColumnSelector(mockLongSelector(new long[]{42L}));
    final WritableMemory mem = WritableMemory.allocate(Long.BYTES);
    mem.putLong(0, 42L);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals(42L, sel.getDimensionValue(ptr, 0));
  }

  // -- NullableLongTopNVectorColumnSelector --

  @Test
  public void testNullableLongKeySizeIs9()
  {
    final NullableLongTopNVectorColumnSelector sel =
        new NullableLongTopNVectorColumnSelector(mockLongSelector(new long[]{1}));
    Assertions.assertEquals(Byte.BYTES + Long.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testNullableLongWriteKeysWithNullVector()
  {
    // Key layout per row: [null byte][long value]; null rows get IS_NULL_BYTE and the value is unspecified.
    final long[] longs = {0L, 99L};
    final boolean[] nulls = {true, false};
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getLongVector()).thenReturn(longs);
    Mockito.when(mock.getNullVector()).thenReturn(nulls);

    final NullableLongTopNVectorColumnSelector sel = new NullableLongTopNVectorColumnSelector(mock);
    final int keySize = Byte.BYTES + Long.BYTES;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * 2);

    sel.writeKeys(keySpace, keySize, 0, 0, 2);

    Assertions.assertEquals(TypeStrategies.IS_NULL_BYTE, keySpace.getByte(0));
    Assertions.assertEquals(TypeStrategies.IS_NOT_NULL_BYTE, keySpace.getByte(keySize));
    Assertions.assertEquals(99L, keySpace.getLong(keySize + 1));
  }

  @Test
  public void testNullableLongWriteKeysWithoutNullVector()
  {
    // A null "null vector" means no row is null: every key gets IS_NOT_NULL_BYTE.
    final long[] longs = {7L, 8L};
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getLongVector()).thenReturn(longs);
    Mockito.when(mock.getNullVector()).thenReturn(null);

    final NullableLongTopNVectorColumnSelector sel = new NullableLongTopNVectorColumnSelector(mock);
    final int keySize = Byte.BYTES + Long.BYTES;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * 2);

    sel.writeKeys(keySpace, keySize, 0, 0, 2);

    Assertions.assertEquals(TypeStrategies.IS_NOT_NULL_BYTE, keySpace.getByte(0));
    Assertions.assertEquals(7L, keySpace.getLong(1));
    Assertions.assertEquals(TypeStrategies.IS_NOT_NULL_BYTE, keySpace.getByte(keySize));
    Assertions.assertEquals(8L, keySpace.getLong(keySize + 1));
  }

  @Test
  public void testNullableLongGetDimensionValueNull()
  {
    final NullableLongTopNVectorColumnSelector sel =
        new NullableLongTopNVectorColumnSelector(mockLongSelector(new long[]{0}));
    final int keySize = Byte.BYTES + Long.BYTES;
    final WritableMemory mem = WritableMemory.allocate(keySize);
    mem.putByte(0, TypeStrategies.IS_NULL_BYTE);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertNull(sel.getDimensionValue(ptr, 0));
  }

  @Test
  public void testNullableLongGetDimensionValueNonNull()
  {
    final NullableLongTopNVectorColumnSelector sel =
        new NullableLongTopNVectorColumnSelector(mockLongSelector(new long[]{0}));
    final int keySize = Byte.BYTES + Long.BYTES;
    final WritableMemory mem = WritableMemory.allocate(keySize);
    mem.putByte(0, TypeStrategies.IS_NOT_NULL_BYTE);
    mem.putLong(1, 55L);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals(55L, sel.getDimensionValue(ptr, 0));
  }

  // -- DoubleTopNVectorColumnSelector --

  @Test
  public void testDoubleKeySizeIs8()
  {
    final DoubleTopNVectorColumnSelector sel =
        new DoubleTopNVectorColumnSelector(mockDoubleSelector(new double[]{1.0}));
    Assertions.assertEquals(Double.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testDoubleWriteKeysBulkPath()
  {
    final double[] doubles = {1.1, 2.2, 3.3};
    final DoubleTopNVectorColumnSelector sel = new DoubleTopNVectorColumnSelector(mockDoubleSelector(doubles));
    final WritableMemory keySpace = WritableMemory.allocate(Double.BYTES * 3);

    sel.writeKeys(keySpace, Double.BYTES, 0, 0, 3);

    for (int i = 0; i < 3; i++) {
      Assertions.assertEquals(doubles[i], keySpace.getDouble((long) i * Double.BYTES), 0.0);
    }
  }

  @Test
  public void testDoubleGetDimensionValue()
  {
    final DoubleTopNVectorColumnSelector sel =
        new DoubleTopNVectorColumnSelector(mockDoubleSelector(new double[]{0}));
    final WritableMemory mem = WritableMemory.allocate(Double.BYTES);
    mem.putDouble(0, 3.14);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals(3.14, (double) sel.getDimensionValue(ptr, 0), 0.0);
  }

  // -- NullableDoubleTopNVectorColumnSelector --

  @Test
  public void testNullableDoubleKeySizeIs9()
  {
    final NullableDoubleTopNVectorColumnSelector sel =
        new NullableDoubleTopNVectorColumnSelector(mockDoubleSelector(new double[]{1.0}));
    Assertions.assertEquals(Byte.BYTES + Double.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testNullableDoubleWriteKeysWithNullVector()
  {
    final double[] doubles = {0.0, 2.5};
    final boolean[] nulls = {true, false};
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getDoubleVector()).thenReturn(doubles);
    Mockito.when(mock.getNullVector()).thenReturn(nulls);

    final NullableDoubleTopNVectorColumnSelector sel = new NullableDoubleTopNVectorColumnSelector(mock);
    final int keySize = Byte.BYTES + Double.BYTES;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * 2);

    sel.writeKeys(keySpace, keySize, 0, 0, 2);

    Assertions.assertEquals(TypeStrategies.IS_NULL_BYTE, keySpace.getByte(0));
    Assertions.assertEquals(TypeStrategies.IS_NOT_NULL_BYTE, keySpace.getByte(keySize));
    Assertions.assertEquals(2.5, keySpace.getDouble(keySize + 1), 0.0);
  }

  @Test
  public void testNullableDoubleGetDimensionValueNull()
  {
    final NullableDoubleTopNVectorColumnSelector sel =
        new NullableDoubleTopNVectorColumnSelector(mockDoubleSelector(new double[]{0}));
    final int keySize = Byte.BYTES + Double.BYTES;
    final WritableMemory mem = WritableMemory.allocate(keySize);
    mem.putByte(0, TypeStrategies.IS_NULL_BYTE);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertNull(sel.getDimensionValue(ptr, 0));
  }

  // -- FloatTopNVectorColumnSelector --

  @Test
  public void testFloatKeySizeIs4()
  {
    final FloatTopNVectorColumnSelector sel =
        new FloatTopNVectorColumnSelector(mockFloatSelector(new float[]{1.0f}));
    Assertions.assertEquals(Float.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testFloatWriteKeysBulkPath()
  {
    final float[] floats = {1.1f, 2.2f, 3.3f};
    final FloatTopNVectorColumnSelector sel = new FloatTopNVectorColumnSelector(mockFloatSelector(floats));
    final WritableMemory keySpace = WritableMemory.allocate(Float.BYTES * 3);

    sel.writeKeys(keySpace, Float.BYTES, 0, 0, 3);

    for (int i = 0; i < 3; i++) {
      Assertions.assertEquals(floats[i], keySpace.getFloat((long) i * Float.BYTES), 0.0f);
    }
  }

  @Test
  public void testFloatGetDimensionValue()
  {
    final FloatTopNVectorColumnSelector sel =
        new FloatTopNVectorColumnSelector(mockFloatSelector(new float[]{0}));
    final WritableMemory mem = WritableMemory.allocate(Float.BYTES);
    mem.putFloat(0, 1.5f);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals(1.5f, (float) sel.getDimensionValue(ptr, 0), 0.0f);
  }

  // -- NullableFloatTopNVectorColumnSelector --

  @Test
  public void testNullableFloatKeySizeIs5()
  {
    final NullableFloatTopNVectorColumnSelector sel =
        new NullableFloatTopNVectorColumnSelector(mockFloatSelector(new float[]{1.0f}));
    Assertions.assertEquals(Byte.BYTES + Float.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testNullableFloatWriteKeysWithNullVector()
  {
    final float[] floats = {0.0f, 9.9f};
    final boolean[] nulls = {true, false};
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getFloatVector()).thenReturn(floats);
    Mockito.when(mock.getNullVector()).thenReturn(nulls);

    final NullableFloatTopNVectorColumnSelector sel = new NullableFloatTopNVectorColumnSelector(mock);
    final int keySize = Byte.BYTES + Float.BYTES;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * 2);

    sel.writeKeys(keySpace, keySize, 0, 0, 2);

    Assertions.assertEquals(TypeStrategies.IS_NULL_BYTE, keySpace.getByte(0));
    Assertions.assertEquals(TypeStrategies.IS_NOT_NULL_BYTE, keySpace.getByte(keySize));
    Assertions.assertEquals(9.9f, keySpace.getFloat(keySize + 1), 0.0f);
  }

  @Test
  public void testNullableFloatGetDimensionValueNull()
  {
    final NullableFloatTopNVectorColumnSelector sel =
        new NullableFloatTopNVectorColumnSelector(mockFloatSelector(new float[]{0}));
    final int keySize = Byte.BYTES + Float.BYTES;
    final WritableMemory mem = WritableMemory.allocate(keySize);
    mem.putByte(0, TypeStrategies.IS_NULL_BYTE);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertNull(sel.getDimensionValue(ptr, 0));
  }

  @Test
  public void testNullableFloatGetDimensionValueNonNull()
  {
    final NullableFloatTopNVectorColumnSelector sel =
        new NullableFloatTopNVectorColumnSelector(mockFloatSelector(new float[]{0}));
    final int keySize = Byte.BYTES + Float.BYTES;
    final WritableMemory mem = WritableMemory.allocate(keySize);
    mem.putByte(0, TypeStrategies.IS_NOT_NULL_BYTE);
    mem.putFloat(1, 7.7f);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals(7.7f, (float) sel.getDimensionValue(ptr, 0), 0.0f);
  }

  // -- SingleValueStringTopNVectorColumnSelector --

  @Test
  public void testStringKeySizeIs4()
  {
    final SingleValueDimensionVectorSelector mock = Mockito.mock(SingleValueDimensionVectorSelector.class);
    final SingleValueStringTopNVectorColumnSelector sel = new SingleValueStringTopNVectorColumnSelector(mock);
    Assertions.assertEquals(Integer.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testStringWriteKeysBulkPath()
  {
    final int[] dictIds = {2, 0, 1, 3};
    final SingleValueDimensionVectorSelector mock = Mockito.mock(SingleValueDimensionVectorSelector.class);
    Mockito.when(mock.getRowVector()).thenReturn(dictIds);

    final SingleValueStringTopNVectorColumnSelector sel = new SingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory keySpace = WritableMemory.allocate(Integer.BYTES * NUM_ROWS);

    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, NUM_ROWS);

    for (int i = 0; i < NUM_ROWS; i++) {
      Assertions.assertEquals(dictIds[i], keySpace.getInt((long) i * Integer.BYTES));
    }
  }

  @Test
  public void testStringWriteKeysStridedPath()
  {
    final int[] dictIds = {5, 3};
    final SingleValueDimensionVectorSelector mock = Mockito.mock(SingleValueDimensionVectorSelector.class);
    Mockito.when(mock.getRowVector()).thenReturn(dictIds);

    final SingleValueStringTopNVectorColumnSelector sel = new SingleValueStringTopNVectorColumnSelector(mock);
    // keySize > Integer.BYTES forces the strided path
    final int keySize = Integer.BYTES + 1;
    final WritableMemory keySpace = WritableMemory.allocate(keySize * 2);

    sel.writeKeys(keySpace, keySize, 0, 0, 2);

    Assertions.assertEquals(5, keySpace.getInt(0));
    Assertions.assertEquals(3, keySpace.getInt(keySize));
  }

  @Test
  public void testStringGetDimensionValue()
  {
    final SingleValueDimensionVectorSelector mock = Mockito.mock(SingleValueDimensionVectorSelector.class);
    Mockito.when(mock.lookupName(2)).thenReturn("hello");

    final SingleValueStringTopNVectorColumnSelector sel = new SingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory mem = WritableMemory.allocate(Integer.BYTES);
    mem.putInt(0, 2);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals("hello", sel.getDimensionValue(ptr, 0));
  }

  @Test
  public void testStringGetValueCardinality()
  {
    final SingleValueDimensionVectorSelector mock = Mockito.mock(SingleValueDimensionVectorSelector.class);
    Mockito.when(mock.getValueCardinality()).thenReturn(42);

    final SingleValueStringTopNVectorColumnSelector sel = new SingleValueStringTopNVectorColumnSelector(mock);
    Assertions.assertEquals(42, sel.getValueCardinality());
  }

  // -- DictionaryBuildingSingleValueStringTopNVectorColumnSelector --

  @Test
  public void testDictBuildingKeySizeIs4()
  {
    final DictionaryBuildingSingleValueStringTopNVectorColumnSelector sel =
        new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(
            Mockito.mock(org.apache.druid.segment.vector.VectorObjectSelector.class)
        );
    Assertions.assertEquals(Integer.BYTES, sel.getGroupingKeySize());
  }

  @Test
  public void testDictBuildingWriteKeysBuildsDictionary()
  {
    final Object[] objects = {"foo", "bar", "foo", "baz"};
    final org.apache.druid.segment.vector.VectorObjectSelector mock =
        Mockito.mock(org.apache.druid.segment.vector.VectorObjectSelector.class);
    Mockito.when(mock.getObjectVector()).thenReturn(objects);

    final DictionaryBuildingSingleValueStringTopNVectorColumnSelector sel =
        new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory keySpace = WritableMemory.allocate(Integer.BYTES * 4);

    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, 4);

    // Ids are presumably assigned in first-seen order ("foo"=0, "bar"=1, "baz"=2), but the assertions below
    // deliberately check only what the contract guarantees: distinct values get distinct ids, and a repeated
    // value reuses its earlier id.
    final int fooId = keySpace.getInt(0);
    final int barId = keySpace.getInt(Integer.BYTES);
    final int baz = keySpace.getInt(Integer.BYTES * 3);

    Assertions.assertEquals(fooId, keySpace.getInt(Integer.BYTES * 2)); // second "foo" same id
    Assertions.assertNotEquals(fooId, barId);
    Assertions.assertNotEquals(fooId, baz);
    Assertions.assertNotEquals(barId, baz);
  }

  @Test
  public void testDictBuildingGetDimensionValue()
  {
    final Object[] objects = {"alpha", "beta"};
    final org.apache.druid.segment.vector.VectorObjectSelector mock =
        Mockito.mock(org.apache.druid.segment.vector.VectorObjectSelector.class);
    Mockito.when(mock.getObjectVector()).thenReturn(objects);

    final DictionaryBuildingSingleValueStringTopNVectorColumnSelector sel =
        new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory keySpace = WritableMemory.allocate(Integer.BYTES * 2);
    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, 2);

    // Look up the id that was assigned to "alpha"
    final int alphaId = keySpace.getInt(0);
    final WritableMemory mem = WritableMemory.allocate(Integer.BYTES);
    mem.putInt(0, alphaId);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertEquals("alpha", sel.getDimensionValue(ptr, 0));
  }

  @Test
  public void testDictBuildingResetClearsDictionary()
  {
    final Object[] objects = {"x"};
    final org.apache.druid.segment.vector.VectorObjectSelector mock =
        Mockito.mock(org.apache.druid.segment.vector.VectorObjectSelector.class);
    Mockito.when(mock.getObjectVector()).thenReturn(objects);

    final DictionaryBuildingSingleValueStringTopNVectorColumnSelector sel =
        new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory keySpace = WritableMemory.allocate(Integer.BYTES);
    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, 1);

    sel.reset();

    // After reset, writing the same value again should assign id 0 again (dictionary was cleared).
    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, 1);
    Assertions.assertEquals(0, keySpace.getInt(0));
  }

  @Test
  public void testDictBuildingNullValueRoundTrip()
  {
    // A null value must get its own dictionary entry and round-trip back to null.
    final Object[] objects = {null, "y"};
    final org.apache.druid.segment.vector.VectorObjectSelector mock =
        Mockito.mock(org.apache.druid.segment.vector.VectorObjectSelector.class);
    Mockito.when(mock.getObjectVector()).thenReturn(objects);

    final DictionaryBuildingSingleValueStringTopNVectorColumnSelector sel =
        new DictionaryBuildingSingleValueStringTopNVectorColumnSelector(mock);
    final WritableMemory keySpace = WritableMemory.allocate(Integer.BYTES * 2);
    sel.writeKeys(keySpace, Integer.BYTES, 0, 0, 2);

    final int nullId = keySpace.getInt(0);
    final WritableMemory mem = WritableMemory.allocate(Integer.BYTES);
    mem.putInt(0, nullId);

    final MemoryPointer ptr = new MemoryPointer();
    ptr.set(mem, 0);

    Assertions.assertNull(sel.getDimensionValue(ptr, 0));
  }

  // -- helpers --

  // Mock selector exposing a long vector with no nulls.
  private VectorValueSelector mockLongSelector(final long[] values)
  {
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getLongVector()).thenReturn(values);
    Mockito.when(mock.getNullVector()).thenReturn(null);
    return mock;
  }

  // Mock selector exposing a double vector with no nulls.
  private VectorValueSelector mockDoubleSelector(final double[] values)
  {
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getDoubleVector()).thenReturn(values);
    Mockito.when(mock.getNullVector()).thenReturn(null);
    return mock;
  }

  // Mock selector exposing a float vector with no nulls.
  private VectorValueSelector mockFloatSelector(final float[] values)
  {
    final VectorValueSelector mock = Mockito.mock(VectorValueSelector.class);
    Mockito.when(mock.getFloatVector()).thenReturn(values);
    Mockito.when(mock.getNullVector()).thenReturn(null);
    return mock;
  }
}
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index 2ca95310e1cc..1beea5684d95 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -3495,6 +3495,8 @@ public void testArrayAggAsArrayFromJoin() @Test public void testArrayAggGroupByArrayAggFromSubquery() { + cannotVectorize(); + final Map context = QueryContexts.override(QUERY_CONTEXT_NO_STRINGIFY_ARRAY, PlannerConfig.CTX_KEY_USE_LEXICOGRAPHIC_TOPN, true); testQuery( @@ -5002,6 +5004,8 @@ public void testUnnestWithGroupByOrderBy() @Test public void testUnnestWithGroupByOrderByWithLimit() { + cannotVectorize(); + final Map context = QueryContexts.override(QUERY_CONTEXT_UNNEST, PlannerConfig.CTX_KEY_USE_LEXICOGRAPHIC_TOPN, true); testQuery( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index f5ace8e84ebb..35187b4bc7c5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -5128,6 +5128,9 @@ public void testCountOnSemiJoinSingleColumn(Map queryContext) @ParameterizedTest(name = "{0}") public void testJoinOnRestrictedBroadcast(Map queryContext) { + // Join cursor does not support vectorization. + cannotVectorize(); + String sql = "SELECT druid.restrictedBroadcastDatasource_m1_is_6.dim4, COUNT(*)\n" + "FROM druid.numfoo\n" + "INNER JOIN druid.restrictedBroadcastDatasource_m1_is_6 ON numfoo.dim4 = restrictedBroadcastDatasource_m1_is_6.dim4\n" @@ -5188,6 +5191,9 @@ public void testJoinOnRestrictedBroadcast(Map queryContext) @DecoupledTestConfig(quidemReason = QuidemTestCaseReason.EQUIV_PLAN) public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map queryContext) { + // Join cursor does not support vectorization. 
+ cannotVectorize(); + testQuery( "SELECT druid.broadcast.dim4, COUNT(*)\n" + "FROM druid.numfoo\n" @@ -5230,6 +5236,9 @@ public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map queryContext) { + // Join cursor does not support vectorization. + cannotVectorize(); + final Map contextWithLexicographicTopN = QueryContexts.override(queryContext, PlannerConfig.CTX_KEY_USE_LEXICOGRAPHIC_TOPN, true); testQuery(