diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 904a7ef88646..67665c5a5a6d 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -26,6 +26,7 @@
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
@@ -41,6 +42,8 @@
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -72,6 +75,17 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
       "Maximum number of spill files reached for this query. Try raising druid.query.groupBy.maxSpillFileCount."
   );
 
+  /**
+   * Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are
+   * flushed as a single file to disk. Aggregators like ThetaSketch pre-allocate a large fixed buffer per row
+   * (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to flush frequently. However, when
+   * each key has been seen only a few times, the sketch serializes to just a handful of bytes in compact form.
+   * Without batching, this produces thousands of tiny spill files. By accumulating runs in heap memory first
+   * and writing to disk only once this threshold is reached, we avoid that explosion in file count without any
+   * extra disk I/O for small spills.
+   */
+  private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB
+
   private final AbstractBufferHashGrouper<KeyType> grouper;
   private final KeySerde<KeyType> keySerde;
   private final LimitedTemporaryStorage temporaryStorage;
@@ -85,6 +99,15 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
   private final List<File> dictionaryFiles = new ArrayList<>();
   private final boolean sortHasNonGroupingFields;
 
+  // Pending spill runs not yet written to disk. Each entry is one buffer flush serialized as an
+  // LZ4-compressed JSON byte array, in the same format as an on-disk spill file, so it can be
+  // re-read through the same read() path. Runs are held in heap memory and merged into a single
+  // sorted file only when pendingSpillBytes reaches MIN_SPILL_FILE_BYTES.
+  private final List<byte[]> pendingSpillRuns = new ArrayList<>();
+  private final Set<String> pendingDictionaryEntries = new HashSet<>();
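+  // Total serialized size in bytes across pendingSpillRuns; triggers a disk flush at MIN_SPILL_FILE_BYTES.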
+  private long pendingSpillBytes = 0;
+
   private boolean diskFull = false;
   private boolean maxFileCount = false;
   private boolean spillingAllowed;
@@ -225,6 +248,9 @@ public AggregateResult aggregate(KeyType key, int keyHash)
   public void reset()
   {
     grouper.reset();
+    pendingSpillRuns.clear();
+    pendingSpillBytes = 0;
+    pendingDictionaryEntries.clear();
     deleteFiles();
   }
 
@@ -235,6 +261,9 @@ public void close()
     perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes());
     grouper.close();
     keySerde.reset();
+    pendingSpillRuns.clear();
+    pendingSpillBytes = 0;
+    pendingDictionaryEntries.clear();
     deleteFiles();
   }
 
@@ -293,6 +322,14 @@ public void setSpillingAllowed(final boolean spillingAllowed)
   @Override
   public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
   {
+    // Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase.
+    try {
+      flushPendingRunsToDisk();
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
     final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());
 
     iterators.add(grouper.iterator(sorted));
@@ -345,12 +382,176 @@ public Entry<KeyType> apply(Entry<KeyType> entry)
 
   private void spill() throws IOException
   {
+    // Serialize the buffer flush to heap memory rather than directly to disk. When the pre-allocated
+    // in-memory size per row (e.g. ~131KB for ThetaSketch) is much larger than the compact serialized
+    // size (e.g. ~24 bytes when each key has been seen only a few times), each flush produces a tiny
+    // byte array. Batching these in memory until MIN_SPILL_FILE_BYTES is reached avoids creating
+    // thousands of tiny disk files, each of which would need a live file handle during the merge phase.
+    final byte[] runBytes;
     try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
-      files.add(spill(iterator));
-      dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+      runBytes = serializeToBytes(iterator);
+    }
+    pendingSpillRuns.add(runBytes);
+    pendingSpillBytes += runBytes.length;
+    pendingDictionaryEntries.addAll(keySerde.getDictionary());
+    grouper.reset();
+
+    if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) {
+      flushPendingRunsToDisk();
+    }
+  }
+
+  /**
+   * Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk.
+   * Reading from heap memory (ByteArrayInputStream) is far faster than reading from disk, so the
+   * merge cost here is negligible even when many runs have accumulated.
+   */
+  private void flushPendingRunsToDisk() throws IOException
+  {
+    if (pendingSpillRuns.isEmpty()) {
+      return;
+    }
+
+    final Comparator<Entry<KeyType>> sortComparator =
+        sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator;
+
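+    // Parse each pending run back with the same LZ4-over-JSON format that read() uses for on-disk spill files.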
+    final List<MappingIterator<ReusableEntry<KeyType>>> readers = new ArrayList<>(pendingSpillRuns.size());
+    try {
+      for (final byte[] runBytes : pendingSpillRuns) {
+        readers.add(spillMapper.readValues(
+            spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
+            spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz())
+        ));
+      }
+      final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(readers.size());
+      for (final MappingIterator<ReusableEntry<KeyType>> reader : readers) {
+        iterators.add(
+            CloseableIterators.withEmptyBaggage(
+                Iterators.transform(
+                    reader,
+                    new Function<>()
+                    {
+                      final ReusableEntry<KeyType> reusableEntry =
+                          ReusableEntry.create(keySerde, aggregatorFactories.length);
+
+                      @Override
+                      public Entry<KeyType> apply(Entry<KeyType> entry)
+                      {
+                        final Object[] deserializedValues = reusableEntry.getValues();
+                        for (int i = 0; i < deserializedValues.length; i++) {
+                          deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
+                          if (deserializedValues[i] instanceof Integer) {
+                            deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
+                          }
+                        }
+                        reusableEntry.setKey(entry.getKey());
+                        return reusableEntry;
+                      }
+                    }
+                )
+            )
+        );
+      }
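+      // Merge the sorted runs, combining consecutive entries that share a key before spilling. Equality is
+      // checked with the default-order comparator so that only fully identical keys are combined.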
+      final Iterator<Entry<KeyType>> merged = combineByKey(
+          CloseableIterators.mergeSorted(iterators, sortComparator),
+          defaultOrderKeyObjComparator
+      );
+      files.add(spill(merged));
+      dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
+    }
+    finally {
+      for (final MappingIterator<ReusableEntry<KeyType>> reader : readers) {
+        try {
+          reader.close();
+        }
+        catch (IOException e) {
+          log.warn(e, "Failed to close reader while flushing pending spill runs");
+        }
+      }
+      pendingSpillRuns.clear();
+      pendingSpillBytes = 0;
+      pendingDictionaryEntries.clear();
+    }
+  }
+
+  /**
+   * Wraps a sorted iterator, combining consecutive entries with the same key by folding their
+   * aggregator values via {@link AggregatorFactory#combine}. Only one combined entry is held in
+   * memory at a time, so this is safe regardless of group count.
+   */
+  private Iterator<Entry<KeyType>> combineByKey(
+      final Iterator<Entry<KeyType>> sortedIterator,
+      final Comparator<Entry<KeyType>> comparator
+  )
+  {
+    return new AbstractIterator<>()
+    {
+      private final ReusableEntry<KeyType> current = ReusableEntry.create(keySerde, aggregatorFactories.length);
+      private boolean hasCurrent = false;
+
+      {
+        if (sortedIterator.hasNext()) {
+          copyEntry(sortedIterator.next(), current);
+          hasCurrent = true;
+        }
+      }
+
+      @Override
+      protected Entry<KeyType> computeNext()
+      {
+        if (!hasCurrent) {
+          return endOfData();
+        }
+
+        final ReusableEntry<KeyType> combined = ReusableEntry.create(keySerde, aggregatorFactories.length);
+        combined.setKey(current.getKey());
+        final Object[] combinedValues = combined.getValues();
+        System.arraycopy(current.getValues(), 0, combinedValues, 0, combinedValues.length);
+
+        while (sortedIterator.hasNext()) {
+          final Entry<KeyType> next = sortedIterator.next();
+          if (comparator.compare(combined, next) != 0) {
+            copyEntry(next, current);
+            return combined;
+          }
+          for (int i = 0; i < combinedValues.length; i++) {
+            combinedValues[i] = aggregatorFactories[i].combine(combinedValues[i], next.getValues()[i]);
+          }
+        }
 
-      grouper.reset();
+        hasCurrent = false;
+        return combined;
+      }
+
+      private void copyEntry(Entry<KeyType> from, ReusableEntry<KeyType> to)
+      {
+        to.setKey(from.getKey());
+        System.arraycopy(from.getValues(), 0, to.getValues(), 0, aggregatorFactories.length);
+      }
+    };
+  }
+
+  /**
+   * Serializes the iterator's entries to a compressed in-memory byte array using the same LZ4+JSON
+   * format as on-disk spill files, so that the bytes can later be re-read via the same read() path.
+   */
+  private byte[] serializeToBytes(final Iterator<Entry<KeyType>> iterator) throws IOException
+  {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (
+        final LZ4BlockOutputStream compressedOut = new LZ4BlockOutputStream(baos);
+        final JsonGenerator jsonGenerator = spillMapper.getFactory().createGenerator(compressedOut)
+    ) {
+      final SerializerProvider serializers = spillMapper.getSerializerProviderInstance();
+      while (iterator.hasNext()) {
+        BaseQuery.checkInterrupted();
+        JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, iterator.next());
+      }
     }
+    return baos.toByteArray();
   }
 
   private <T> File spill(Iterator<T> iterator) throws IOException