Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
Expand All @@ -41,6 +42,8 @@
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.segment.ColumnSelectorFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -72,6 +75,17 @@
"Maximum number of spill files reached for this query. Try raising druid.query.groupBy.maxSpillFileCount."
);

/**
* Minimum number of serialized bytes that must accumulate across pending in-memory spill runs before they are
* flushed as a single file to disk. Aggregators like ThetaSketch pre-allocate a large fixed buffer per row
* (e.g. ~131KB for ThetaSketch(K=16384)), causing the in-memory grouper to flush frequently. However, when
* each key has been seen only a few times, the sketch serializes to just a handful of bytes in compact form.
* Without batching, this produces thousands of tiny spill files. By accumulating runs in heap memory first
* and writing to disk only once this threshold is reached, we avoid that explosion in file count without any
* extra disk I/O for small spills.
*/
private static final long MIN_SPILL_FILE_BYTES = 1024 * 1024L; // 1MB

private final AbstractBufferHashGrouper<KeyType> grouper;
private final KeySerde<KeyType> keySerde;
private final LimitedTemporaryStorage temporaryStorage;
Expand All @@ -85,6 +99,14 @@
private final List<File> dictionaryFiles = new ArrayList<>();
private final boolean sortHasNonGroupingFields;

// Pending spill runs not yet written to disk. Each entry is one buffer flush serialized as a
// LZ4-compressed JSON byte array — the same format as an on-disk spill file, so it can be
// re-read with the same read() path. Runs are held in heap memory and merged into a single
// sorted file only when pendingSpillBytes reaches MIN_SPILL_FILE_BYTES.
private final List<byte[]> pendingSpillRuns = new ArrayList<>();
private final Set<String> pendingDictionaryEntries = new HashSet<>();
private long pendingSpillBytes = 0;

private boolean diskFull = false;
private boolean maxFileCount = false;
private boolean spillingAllowed;
Expand Down Expand Up @@ -225,6 +247,9 @@
public void reset()
{
  // Drop all accumulated state: the in-memory hash table, any spill runs still
  // buffered on heap, and every spill/dictionary file already written to disk.
  grouper.reset();
  pendingSpillBytes = 0;
  pendingSpillRuns.clear();
  pendingDictionaryEntries.clear();
  deleteFiles();
}

Expand All @@ -235,6 +260,9 @@
perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes());
grouper.close();
keySerde.reset();
pendingSpillRuns.clear();
pendingSpillBytes = 0;
pendingDictionaryEntries.clear();
deleteFiles();
}

Expand Down Expand Up @@ -293,6 +321,14 @@
@Override
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
// Flush any runs that did not reach MIN_SPILL_FILE_BYTES during the spill phase.
try {
flushPendingRunsToDisk();
}
catch (IOException e) {
throw new RuntimeException(e);
}

final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());

iterators.add(grouper.iterator(sorted));
Expand Down Expand Up @@ -345,12 +381,173 @@

private void spill() throws IOException
{
  // Serialize this buffer flush to heap memory rather than directly to disk. When the
  // pre-allocated in-memory size per row (e.g. ~131KB for ThetaSketch) is much larger than the
  // compact serialized size (e.g. ~24 bytes when each key has been seen only a few times), each
  // flush produces a tiny byte array. Batching these in memory until MIN_SPILL_FILE_BYTES is
  // reached avoids creating thousands of tiny disk files, each of which would otherwise require
  // a live file handle during the merge phase.
  final byte[] runBytes;
  try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
    runBytes = serializeToBytes(iterator);
  }
  pendingSpillRuns.add(runBytes);
  pendingSpillBytes += runBytes.length;
  // Remember every dictionary entry seen so far; a dictionary file is written alongside the
  // merged run file when the pending runs are flushed.
  pendingDictionaryEntries.addAll(keySerde.getDictionary());
  grouper.reset();

  if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) {
    flushPendingRunsToDisk();
  }
}

/**
* Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk.
* Reading from heap memory (ByteArrayInputStream) is orders of magnitude faster than reading
* from disk, so the merge cost here is negligible even for many accumulated runs.
*/
private void flushPendingRunsToDisk() throws IOException
{
if (pendingSpillRuns.isEmpty()) {
return;
}

final Comparator<Entry<KeyType>> sortComparator =
sortHasNonGroupingFields ? defaultOrderKeyObjComparator : keyObjComparator;

final List<MappingIterator<Entry<KeyType>>> readers = new ArrayList<>(pendingSpillRuns.size());

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
LZ4BlockInputStream.LZ4BlockInputStream
should be avoided because it has been deprecated.
try {
for (final byte[] runBytes : pendingSpillRuns) {
readers.add(spillMapper.readValues(
spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),
spillMapper.getTypeFactory().constructParametricType(ReusableEntry.class, keySerde.keyClazz())
));
}
final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(readers.size());
for (final MappingIterator<Entry<KeyType>> reader : readers) {
iterators.add(
CloseableIterators.withEmptyBaggage(
Iterators.transform(
reader,
new Function<>()
{
final ReusableEntry<KeyType> reusableEntry =
ReusableEntry.create(keySerde, aggregatorFactories.length);

@Override
public Entry<KeyType> apply(Entry<KeyType> entry)
{
final Object[] deserializedValues = reusableEntry.getValues();
for (int i = 0; i < deserializedValues.length; i++) {
deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
if (deserializedValues[i] instanceof Integer) {
deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
}
}
reusableEntry.setKey(entry.getKey());
return reusableEntry;
}
}
)
)
);
}
final Iterator<Entry<KeyType>> merged = combineByKey(
CloseableIterators.mergeSorted(iterators, sortComparator),
defaultOrderKeyObjComparator
);
files.add(spill(merged));
dictionaryFiles.add(spill(pendingDictionaryEntries.iterator()));
}
finally {
for (final MappingIterator<Entry<KeyType>> reader : readers) {
try {
reader.close();
}
catch (IOException e) {
log.warn(e, "Failed to close reader while flushing pending spill runs");
}
}
pendingSpillRuns.clear();
pendingSpillBytes = 0;
pendingDictionaryEntries.clear();
}
}

/**
 * Wraps a sorted iterator, combining consecutive entries with the same key by folding their
 * aggregator values via {@link AggregatorFactory#combine}. Only one combined entry is held in
 * memory at a time, so this is safe regardless of group count.
 *
 * @param sortedIterator iterator whose entries are sorted so that equal keys are adjacent
 * @param comparator     comparator defining key equality; must match the iterator's sort order
 */
private Iterator<Entry<KeyType>> combineByKey(
    final Iterator<Entry<KeyType>> sortedIterator,
    final Comparator<Entry<KeyType>> comparator
)
{
  return new AbstractIterator<>()
  {
    // Holds the first entry of the next group, copied out of the (possibly reused) upstream entry.
    private final ReusableEntry<KeyType> current = ReusableEntry.create(keySerde, aggregatorFactories.length);
    private boolean hasCurrent = false;

    {
      if (sortedIterator.hasNext()) {
        copyEntry(sortedIterator.next(), current);
        hasCurrent = true;
      }
    }

    @Override
    protected Entry<KeyType> computeNext()
    {
      if (!hasCurrent) {
        return endOfData();
      }

      final ReusableEntry<KeyType> combined = ReusableEntry.create(keySerde, aggregatorFactories.length);
      combined.setKey(current.getKey());
      final Object[] combinedValues = combined.getValues();
      System.arraycopy(current.getValues(), 0, combinedValues, 0, combinedValues.length);

      while (sortedIterator.hasNext()) {
        final Entry<KeyType> next = sortedIterator.next();
        if (comparator.compare(combined, next) != 0) {
          copyEntry(next, current);
          return combined;
        }
        for (int i = 0; i < combinedValues.length; i++) {
          combinedValues[i] = aggregatorFactories[i].combine(combinedValues[i], next.getValues()[i]);
        }
      }

      // BUG FIX: the original called grouper.reset() here when the upstream iterator was
      // exhausted. iterator() invokes flushPendingRunsToDisk() — which fully consumes this
      // iterator — BEFORE adding grouper.iterator(sorted), so that reset silently discarded
      // every group still held in the in-memory grouper. A combining iterator must not have
      // side effects on the grouper; the spill()/reset() callers manage that lifecycle.
      hasCurrent = false;
      return combined;
    }

    private void copyEntry(Entry<KeyType> from, ReusableEntry<KeyType> to)
    {
      // NOTE(review): the key is copied by reference only; this assumes the upstream iterator
      // does not mutate a key object after handing it out — confirm against ReusableEntry's
      // deserialization/reuse behavior.
      to.setKey(from.getKey());
      System.arraycopy(from.getValues(), 0, to.getValues(), 0, aggregatorFactories.length);
    }
  };
}

/**
 * Serializes the iterator's entries into an LZ4-compressed JSON byte array — the same format as
 * an on-disk spill file — so the bytes can later be re-read through the same read() path.
 *
 * @param iterator entries to serialize; fully consumed by this call
 * @return the compressed serialized bytes
 * @throws IOException if serialization fails
 */
private byte[] serializeToBytes(final Iterator<Entry<KeyType>> iterator) throws IOException
{
  final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
  try (
      final LZ4BlockOutputStream compressedStream = new LZ4BlockOutputStream(byteStream);
      final JsonGenerator generator = spillMapper.getFactory().createGenerator(compressedStream)
  ) {
    final SerializerProvider serializerProvider = spillMapper.getSerializerProviderInstance();
    while (iterator.hasNext()) {
      // Respect query cancellation while serializing a potentially large run.
      BaseQuery.checkInterrupted();
      JacksonUtils.writeObjectUsingSerializerProvider(generator, serializerProvider, iterator.next());
    }
  }
  return byteStream.toByteArray();
}

private <T> File spill(Iterator<T> iterator) throws IOException
Expand Down
Loading