From 0b6d2d6967975072750ff1c43c546815ec6f23c9 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 17:23:10 -0700 Subject: [PATCH 1/6] optimizes SpillingGrouper --- .../epinephelinae/SpillingGrouper.java | 221 +++++++++++++++++- 1 file changed, 215 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 904a7ef88646..5926b1dc4bed 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -41,6 +41,8 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.segment.ColumnSelectorFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -72,6 +74,26 @@ public class SpillingGrouper implements Grouper "Maximum number of spill files reached for this query. Try raising druid.query.groupBy.maxSpillFileCount." ); + /** + * Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are + * flushed as a single file to disk. Aggregators like ThetaSketch pre-allocate a large fixed buffer per row + * (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to flush frequently. However, when + * each key has been seen only a few times, the sketch serializes to just a handful of bytes in compact form. + * Without batching, this produces thousands of tiny spill files. By accumulating runs in heap memory first + * and writing to disk only once this threshold is reached, we avoid that explosion in file count without any + * extra disk I/O for small spills. + */ + private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB + + /** + * Maximum number of spill files to open simultaneously during the merge in {@link #iterator}. When the number + * of spill files exceeds this value, cascading merge passes are performed — each pass opens at most + * MERGE_FAN_IN files at once, merges them into a single intermediate file, and closes all readers before + * opening the next batch. This bounds the number of simultaneously live file-handle objects + * (LZ4BlockInputStream, SmileParser, MappingIterator, etc.) regardless of total spill file count. + */ + private static final int MERGE_FAN_IN = 32; + private final AbstractBufferHashGrouper grouper; private final KeySerde keySerde; private final LimitedTemporaryStorage temporaryStorage; @@ -85,6 +107,14 @@ public class SpillingGrouper implements Grouper private final List dictionaryFiles = new ArrayList<>(); private final boolean sortHasNonGroupingFields; + // Pending spill runs not yet written to disk. Each entry is one buffer flush serialized as a + // LZ4-compressed JSON byte array — the same format as an on-disk spill file, so it can be + // re-read with the same read() path. Runs are held in heap memory and merged into a single + // sorted file only when pendingSpillBytes reaches MIN_SPILL_FILE_BYTES. + private final List pendingSpillRuns = new ArrayList<>(); + private final Set pendingDictionaryEntries = new HashSet<>(); + private long pendingSpillBytes = 0; + private boolean diskFull = false; private boolean maxFileCount = false; private boolean spillingAllowed; @@ -225,6 +255,9 @@ public AggregateResult aggregate(KeyType key, int keyHash) public void reset() { grouper.reset(); + pendingSpillRuns.clear(); + pendingSpillBytes = 0; + pendingDictionaryEntries.clear(); deleteFiles(); } @@ -235,6 +268,9 @@ public void close() perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes()); grouper.close(); keySerde.reset(); + pendingSpillRuns.clear(); + pendingSpillBytes = 0; + pendingDictionaryEntries.clear(); deleteFiles(); } @@ -293,12 +329,45 @@ public void setSpillingAllowed(final boolean spillingAllowed) @Override public CloseableIterator> iterator(final boolean sorted) { - final List>> iterators = new ArrayList<>(1 + files.size()); + final Closer closer = Closer.create(); + + // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase. + try { + flushPendingRunsToDisk(); + } + catch (IOException e) { + try { + closer.close(); + } + catch (IOException ce) { + e.addSuppressed(ce); + } + throw new RuntimeException(e); + } + + // If there are more spill files than MERGE_FAN_IN, reduce them to at most MERGE_FAN_IN via cascading + // merge passes before opening the final set of iterators. Each pass opens at most MERGE_FAN_IN files + // simultaneously, merges them into one intermediate file, then closes all readers before moving on. + // Intermediate files are registered with closer for deletion when the caller finishes with the iterator. + final List filesToMerge; + try { + filesToMerge = reduceToFanIn(files, sorted, closer); + } + catch (IOException e) { + try { + closer.close(); + } + catch (IOException ce) { + e.addSuppressed(ce); + } + throw new RuntimeException(e); + } + + final List>> iterators = new ArrayList<>(1 + filesToMerge.size()); iterators.add(grouper.iterator(sorted)); - final Closer closer = Closer.create(); - for (final File file : files) { + for (final File file : filesToMerge) { final MappingIterator> fileIterator = read(file, keySerde.keyClazz()); iterators.add( @@ -343,14 +412,154 @@ public Entry apply(Entry entry) return CloseableIterators.wrap(baseIterator, closer); } + /** + * Reduces {@code inputFiles} to at most {@link #MERGE_FAN_IN} files by performing as many cascading merge + * passes as needed. Each pass groups the current file list into batches of MERGE_FAN_IN, merges each batch + * into a single file (opening at most MERGE_FAN_IN file handles at once), and replaces the batch with that + * one file. Intermediate files are registered with {@code closer} for deletion when the outer iterator closes. + */ + private List reduceToFanIn(final List inputFiles, final boolean sorted, final Closer closer) + throws IOException + { + if (inputFiles.size() <= MERGE_FAN_IN) { + return inputFiles; + } + + final Comparator> comparator = + sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator; + + List current = new ArrayList<>(inputFiles); + + while (current.size() > MERGE_FAN_IN) { + final List nextRound = new ArrayList<>(); + + for (int i = 0; i < current.size(); i += MERGE_FAN_IN) { + final List batch = current.subList(i, Math.min(i + MERGE_FAN_IN, current.size())); + + if (batch.size() == 1) { + nextRound.add(batch.get(0)); + continue; + } + + final List>> readers = new ArrayList<>(batch.size()); + final File mergedFile; + try { + for (final File file : batch) { + readers.add(read(file, keySerde.keyClazz())); + } + final List>> batchIterators = new ArrayList<>(readers.size()); + for (final MappingIterator> reader : readers) { + batchIterators.add(CloseableIterators.withEmptyBaggage(reader)); + } + final Iterator> merged = sorted || sortHasNonGroupingFields + ? CloseableIterators.mergeSorted(batchIterators, comparator) + : CloseableIterators.concat(batchIterators); + mergedFile = spill(merged); + } + finally { + for (final MappingIterator> reader : readers) { + try { + reader.close(); + } + catch (IOException e) { + log.warn(e, "Failed to close reader during fan-in reduction"); + } + } + } + + nextRound.add(mergedFile); + closer.register(() -> temporaryStorage.delete(mergedFile)); + } + + current = nextRound; + } + + return current; + } + private void spill() throws IOException { + // Serialize the buffer flush to heap memory rather than directly to disk. When the pre-allocated + // in-memory size per row (e.g. ~131KB for ThetaSketch) is much larger than the compact serialized + // size (e.g. ~24 bytes when each key has been seen only a few times), each flush produces a tiny + // byte array. Batching these in memory until MIN_SPILL_FILE_BYTES is reached avoids creating + // thousands of tiny disk files, which would each require a live file handle during the merge phase. + final byte[] runBytes; try (CloseableIterator> iterator = grouper.iterator(true)) { - files.add(spill(iterator)); - dictionaryFiles.add(spill(keySerde.getDictionary().iterator())); + runBytes = serializeToBytes(iterator); + } + pendingSpillRuns.add(runBytes); + pendingSpillBytes += runBytes.length; + pendingDictionaryEntries.addAll(keySerde.getDictionary()); + grouper.reset(); + + if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) { + flushPendingRunsToDisk(); + } + } + + /** + * Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk. + * Reading from heap memory (ByteArrayInputStream) is orders of magnitude faster than reading + * from disk, so the merge cost here is negligible even for many accumulated runs. + */ + private void flushPendingRunsToDisk() throws IOException + { + if (pendingSpillRuns.isEmpty()) { + return; + } + + final Comparator> comparator = + sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator; + + final List>> readers = new ArrayList<>(pendingSpillRuns.size()); + try { + for (final byte[] runBytes : pendingSpillRuns) { + readers.add(spillMapper.readValues( + spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))), + spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz()) + )); + } + final List>> iterators = new ArrayList<>(readers.size()); + for (final MappingIterator> reader : readers) { + iterators.add(CloseableIterators.withEmptyBaggage(reader)); + } + files.add(spill(CloseableIterators.mergeSorted(iterators, comparator))); + dictionaryFiles.add(spill(pendingDictionaryEntries.iterator())); + } + finally { + for (final MappingIterator> reader : readers) { + try { + reader.close(); + } + catch (IOException e) { + log.warn(e, "Failed to close reader while flushing pending spill runs"); + } + } + pendingSpillRuns.clear(); + pendingSpillBytes = 0; + pendingDictionaryEntries.clear(); + } + } - grouper.reset(); + /** + * Serializes the iterator's entries to a compressed in-memory byte array using the same LZ4+JSON + * format as on-disk spill files, so that the bytes can later be re-read via the same read() path. + */ + private byte[] serializeToBytes(final Iterator> iterator) throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try ( + final LZ4BlockOutputStream compressedOut = new LZ4BlockOutputStream(baos); + final JsonGenerator jsonGenerator = spillMapper.getFactory().createGenerator(compressedOut) + ) { + final SerializerProvider serializers = spillMapper.getSerializerProviderInstance(); + while (iterator.hasNext()) { + BaseQuery.checkInterrupted(); + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, iterator.next()); + } } + return baos.toByteArray(); } private File spill(Iterator iterator) throws IOException From ce5f94ff76ef3849e8c7ae08e064eb0d43ea94be Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 17:29:06 -0700 Subject: [PATCH 2/6] optimizes SpillingGrouper --- .../epinephelinae/SpillingGrouper.java | 105 +----------------- 1 file changed, 3 insertions(+), 102 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 5926b1dc4bed..1b5ab61c43b8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -85,15 +85,6 @@ public class SpillingGrouper implements Grouper */ private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB - /** - * Maximum number of spill files to open simultaneously during the merge in {@link #iterator}. When the number - * of spill files exceeds this value, cascading merge passes are performed — each pass opens at most - * MERGE_FAN_IN files at once, merges them into a single intermediate file, and closes all readers before - * opening the next batch. This bounds the number of simultaneously live file-handle objects - * (LZ4BlockInputStream, SmileParser, MappingIterator, etc.) regardless of total spill file count. - */ - private static final int MERGE_FAN_IN = 32; - private final AbstractBufferHashGrouper grouper; private final KeySerde keySerde; private final LimitedTemporaryStorage temporaryStorage; @@ -329,45 +320,20 @@ public void setSpillingAllowed(final boolean spillingAllowed) @Override public CloseableIterator> iterator(final boolean sorted) { - final Closer closer = Closer.create(); - // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase. try { flushPendingRunsToDisk(); } catch (IOException e) { - try { - closer.close(); - } - catch (IOException ce) { - e.addSuppressed(ce); - } throw new RuntimeException(e); } - // If there are more spill files than MERGE_FAN_IN, reduce them to at most MERGE_FAN_IN via cascading - // merge passes before opening the final set of iterators. Each pass opens at most MERGE_FAN_IN files - // simultaneously, merges them into one intermediate file, then closes all readers before moving on. - // Intermediate files are registered with closer for deletion when the caller finishes with the iterator. - final List filesToMerge; - try { - filesToMerge = reduceToFanIn(files, sorted, closer); - } - catch (IOException e) { - try { - closer.close(); - } - catch (IOException ce) { - e.addSuppressed(ce); - } - throw new RuntimeException(e); - } - - final List>> iterators = new ArrayList<>(1 + filesToMerge.size()); + final List>> iterators = new ArrayList<>(1 + files.size()); iterators.add(grouper.iterator(sorted)); - for (final File file : filesToMerge) { + final Closer closer = Closer.create(); + for (final File file : files) { final MappingIterator> fileIterator = read(file, keySerde.keyClazz()); iterators.add( @@ -412,71 +378,6 @@ public Entry apply(Entry entry) return CloseableIterators.wrap(baseIterator, closer); } - /** - * Reduces {@code inputFiles} to at most {@link #MERGE_FAN_IN} files by performing as many cascading merge - * passes as needed. Each pass groups the current file list into batches of MERGE_FAN_IN, merges each batch - * into a single file (opening at most MERGE_FAN_IN file handles at once), and replaces the batch with that - * one file. Intermediate files are registered with {@code closer} for deletion when the outer iterator closes. - */ - private List reduceToFanIn(final List inputFiles, final boolean sorted, final Closer closer) - throws IOException - { - if (inputFiles.size() <= MERGE_FAN_IN) { - return inputFiles; - } - - final Comparator> comparator = - sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator; - - List current = new ArrayList<>(inputFiles); - - while (current.size() > MERGE_FAN_IN) { - final List nextRound = new ArrayList<>(); - - for (int i = 0; i < current.size(); i += MERGE_FAN_IN) { - final List batch = current.subList(i, Math.min(i + MERGE_FAN_IN, current.size())); - - if (batch.size() == 1) { - nextRound.add(batch.get(0)); - continue; - } - - final List>> readers = new ArrayList<>(batch.size()); - final File mergedFile; - try { - for (final File file : batch) { - readers.add(read(file, keySerde.keyClazz())); - } - final List>> batchIterators = new ArrayList<>(readers.size()); - for (final MappingIterator> reader : readers) { - batchIterators.add(CloseableIterators.withEmptyBaggage(reader)); - } - final Iterator> merged = sorted || sortHasNonGroupingFields - ? CloseableIterators.mergeSorted(batchIterators, comparator) - : CloseableIterators.concat(batchIterators); - mergedFile = spill(merged); - } - finally { - for (final MappingIterator> reader : readers) { - try { - reader.close(); - } - catch (IOException e) { - log.warn(e, "Failed to close reader during fan-in reduction"); - } - } - } - - nextRound.add(mergedFile); - closer.register(() -> temporaryStorage.delete(mergedFile)); - } - - current = nextRound; - } - - return current; - } - private void spill() throws IOException { // Serialize the buffer flush to heap memory rather than directly to disk. When the pre-allocated From d9142574f7e340edef2df08c17e1c1dd3ee7030a Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 17:51:40 -0700 Subject: [PATCH 3/6] optimizes SpillingGrouper --- .../epinephelinae/SpillingGrouper.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 1b5ab61c43b8..23b5723dec39 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -322,7 +322,7 @@ public CloseableIterator> iterator(final boolean sorted) { // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase. try { - flushPendingRunsToDisk(); + flushPendingRunsToDisk(sorted); } catch (IOException e) { throw new RuntimeException(e); @@ -395,7 +395,7 @@ private void spill() throws IOException grouper.reset(); if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) { - flushPendingRunsToDisk(); + flushPendingRunsToDisk(true); } } @@ -404,15 +404,12 @@ private void spill() throws IOException * Reading from heap memory (ByteArrayInputStream) is orders of magnitude faster than reading * from disk, so the merge cost here is negligible even for many accumulated runs. */ - private void flushPendingRunsToDisk() throws IOException + private void flushPendingRunsToDisk(final boolean sorted) throws IOException { if (pendingSpillRuns.isEmpty()) { return; } - final Comparator> comparator = - sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator; - final List>> readers = new ArrayList<>(pendingSpillRuns.size()); try { for (final byte[] runBytes : pendingSpillRuns) { @@ -421,11 +418,49 @@ private void flushPendingRunsToDisk() throws IOException spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz()) )); } + final boolean needsMergeSort = sorted || sortHasNonGroupingFields; final List>> iterators = new ArrayList<>(readers.size()); for (final MappingIterator> reader : readers) { - iterators.add(CloseableIterators.withEmptyBaggage(reader)); + if (needsMergeSort) { + iterators.add( + CloseableIterators.withEmptyBaggage( + Iterators.transform( + reader, + new Function<>() + { + final ReusableEntry reusableEntry = + ReusableEntry.create(keySerde, aggregatorFactories.length); + + @Override + public Entry apply(Entry entry) + { + final Object[] deserializedValues = reusableEntry.getValues(); + for (int i = 0; i < deserializedValues.length; i++) { + deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]); + if (deserializedValues[i] instanceof Integer) { + deserializedValues[i] = ((Integer) deserializedValues[i]).longValue(); + } + } + reusableEntry.setKey(entry.getKey()); + return reusableEntry; + } + } + ) + ) + ); + } else { + iterators.add(CloseableIterators.withEmptyBaggage(reader)); + } + } + final Iterator> merged; + if (sortHasNonGroupingFields) { + merged = CloseableIterators.mergeSorted(iterators, defaultOrderKeyObjComparator); + } else if (sorted) { + merged = CloseableIterators.mergeSorted(iterators, keyObjComparator); + } else { + merged = CloseableIterators.concat(iterators); } - files.add(spill(CloseableIterators.mergeSorted(iterators, comparator))); + files.add(spill(merged)); dictionaryFiles.add(spill(pendingDictionaryEntries.iterator())); } finally { From cbe362921571a1b4737496e8cee516197422c851 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 22:48:13 -0700 Subject: [PATCH 4/6] optimizes SpillingGrouper --- .../epinephelinae/SpillingGrouper.java | 114 ++++++++++++------ 1 file changed, 76 insertions(+), 38 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 23b5723dec39..729e6bf192f7 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializerProvider; +import com.google.common.collect.AbstractIterator; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -322,7 +323,7 @@ public CloseableIterator> iterator(final boolean sorted) { // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase. try { - flushPendingRunsToDisk(sorted); + flushPendingRunsToDisk(); } catch (IOException e) { throw new RuntimeException(e); @@ -395,7 +396,7 @@ private void spill() throws IOException grouper.reset(); if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) { - flushPendingRunsToDisk(true); + flushPendingRunsToDisk(); } } @@ -404,12 +405,15 @@ private void spill() throws IOException * Reading from heap memory (ByteArrayInputStream) is orders of magnitude faster than reading * from disk, so the merge cost here is negligible even for many accumulated runs. */ - private void flushPendingRunsToDisk(final boolean sorted) throws IOException + private void flushPendingRunsToDisk() throws IOException { if (pendingSpillRuns.isEmpty()) { return; } + final Comparator> sortComparator = + sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator; + final List>> readers = new ArrayList<>(pendingSpillRuns.size()); try { for (final byte[] runBytes : pendingSpillRuns) { @@ -418,48 +422,39 @@ private void flushPendingRunsToDisk(final boolean sorted) throws IOException spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz()) )); } - final boolean needsMergeSort = sorted || sortHasNonGroupingFields; final List>> iterators = new ArrayList<>(readers.size()); for (final MappingIterator> reader : readers) { - if (needsMergeSort) { - iterators.add( - CloseableIterators.withEmptyBaggage( - Iterators.transform( - reader, - new Function<>() + iterators.add( + CloseableIterators.withEmptyBaggage( + Iterators.transform( + reader, + new Function<>() + { + final ReusableEntry reusableEntry = + ReusableEntry.create(keySerde, aggregatorFactories.length); + + @Override + public Entry apply(Entry entry) { - final ReusableEntry reusableEntry = - ReusableEntry.create(keySerde, aggregatorFactories.length); - - @Override - public Entry apply(Entry entry) - { - final Object[] deserializedValues = reusableEntry.getValues(); - for (int i = 0; i < deserializedValues.length; i++) { - deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]); - if (deserializedValues[i] instanceof Integer) { - deserializedValues[i] = ((Integer) deserializedValues[i]).longValue(); - } + final Object[] deserializedValues = reusableEntry.getValues(); + for (int i = 0; i < deserializedValues.length; i++) { + deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]); + if (deserializedValues[i] instanceof Integer) { + deserializedValues[i] = ((Integer) deserializedValues[i]).longValue(); } - reusableEntry.setKey(entry.getKey()); - return reusableEntry; } + reusableEntry.setKey(entry.getKey()); + return reusableEntry; } - ) - ) - ); - } else { - iterators.add(CloseableIterators.withEmptyBaggage(reader)); - } - } - final Iterator> merged; - if (sortHasNonGroupingFields) { - merged = CloseableIterators.mergeSorted(iterators, defaultOrderKeyObjComparator); - } else if (sorted) { - merged = CloseableIterators.mergeSorted(iterators, keyObjComparator); - } else { - merged = CloseableIterators.concat(iterators); + } + ) + ) + ); } + final Iterator> merged = combineByKey( + CloseableIterators.mergeSorted(iterators, sortComparator), + defaultOrderKeyObjComparator + ); files.add(spill(merged)); dictionaryFiles.add(spill(pendingDictionaryEntries.iterator())); } @@ -478,6 +473,49 @@ public Entry apply(Entry entry) } } + /** + * Wraps a sorted iterator, combining consecutive entries with the same key by folding their + * aggregator values via {@link AggregatorFactory#combine}. Only one combined entry is held in + * memory at a time, so this is safe regardless of group count. + */ + private Iterator> combineByKey( + final Iterator> sortedIterator, + final Comparator> comparator + ) + { + return new AbstractIterator<>() + { + private Entry current = sortedIterator.hasNext() ? sortedIterator.next() : null; + + @Override + protected Entry computeNext() + { + if (current == null) { + return endOfData(); + } + + final ReusableEntry combined = ReusableEntry.create(keySerde, aggregatorFactories.length); + combined.setKey(current.getKey()); + final Object[] combinedValues = combined.getValues(); + System.arraycopy(current.getValues(), 0, combinedValues, 0, combinedValues.length); + + while (sortedIterator.hasNext()) { + final Entry next = sortedIterator.next(); + if (comparator.compare(current, next) != 0) { + current = next; + return combined; + } + for (int i = 0; i < combinedValues.length; i++) { + combinedValues[i] = aggregatorFactories[i].combine(combinedValues[i], next.getValues()[i]); + } + } + + current = null; + return combined; + } + }; + } + /** * Serializes the iterator's entries to a compressed in-memory byte array using the same LZ4+JSON * format as on-disk spill files, so that the bytes can later be re-read via the same read() path. From ebc8fad0838f52bdc29066b4dc8a6cca8d37484c Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 22:57:25 -0700 Subject: [PATCH 5/6] optimizes SpillingGrouper --- .../epinephelinae/SpillingGrouper.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 729e6bf192f7..c8695771da35 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -485,12 +485,20 @@ private Iterator> combineByKey( { return new AbstractIterator<>() { - private Entry current = sortedIterator.hasNext() ? sortedIterator.next() : null; + private final ReusableEntry current = ReusableEntry.create(keySerde, aggregatorFactories.length); + private boolean hasCurrent = false; + + { + if (sortedIterator.hasNext()) { + copyEntry(sortedIterator.next(), current); + hasCurrent = true; + } + } @Override protected Entry computeNext() { - if (current == null) { + if (!hasCurrent) { return endOfData(); } @@ -501,8 +509,8 @@ protected Entry computeNext() while (sortedIterator.hasNext()) { final Entry next = sortedIterator.next(); - if (comparator.compare(current, next) != 0) { - current = next; + if (comparator.compare(combined, next) != 0) { + copyEntry(next, current); return combined; } for (int i = 0; i < combinedValues.length; i++) { @@ -510,9 +518,15 @@ protected Entry computeNext() } } - current = null; + hasCurrent = false; return combined; } + + private void copyEntry(Entry from, ReusableEntry to) + { + to.setKey(from.getKey()); + System.arraycopy(from.getValues(), 0, to.getValues(), 0, aggregatorFactories.length); + } }; } From 35586f00b10d8a6799b126313de4e289c92be3da Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 20 Apr 2026 22:59:54 -0700 Subject: [PATCH 6/6] optimizes SpillingGrouper --- .../druid/query/groupby/epinephelinae/SpillingGrouper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index c8695771da35..67665c5a5a6d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -23,10 +23,10 @@ import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializerProvider; -import com.google.common.collect.AbstractIterator; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream;