From 36c0a6393426cf74d1a5bd92210978ce7a82fa5f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Apr 2026 12:47:09 -0700 Subject: [PATCH 1/2] feat: rework SegmentFileBuilderV10 to organize internal containers by projection to be friendly to partial loading changes: * adds new method `SegmentFileBuilder.startFileGroup(@Nullable String)` which is a hint to the file builder about how to group the files a caller is adding * `SegmentFileBuilderV10` replaced the internal `FileSmoosher` with a purpose-built container writer that rolls on group changes or when a file won't fit, organizing containers by the new file group concept * `IndexMergerV10`/`IndexMergerBase` use the new file group method to organize v10 segments by projection * removed stuff from `FileSmoosher` that was only needed by v10 builder delegating to it * `AggregateProjectionSpec` validates that the projection cannot be named `__base` --- .../input/impl/AggregateProjectionSpec.java | 4 + .../util/common/io/smoosh/FileSmoosher.java | 97 +--- .../apache/druid/segment/IndexMergerBase.java | 1 + .../apache/druid/segment/IndexMergerV10.java | 1 + .../segment/file/SegmentFileBuilder.java | 18 + .../segment/file/SegmentFileBuilderV10.java | 458 +++++++++++++++++- .../file/SegmentFileBuilderV10Test.java | 317 ++++++++++++ 7 files changed, 796 insertions(+), 100 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java index 778a6ae734f2..60031a6fb259 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java @@ -39,6 +39,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ValueType; 
import org.apache.druid.segment.projections.AggregateProjectionSchema; +import org.apache.druid.segment.projections.Projections; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTimeZone; @@ -102,6 +103,9 @@ public AggregateProjectionSpec( if (name == null || name.isEmpty()) { throw InvalidInput.exception("projection name cannot be null or empty"); } + if (Projections.BASE_TABLE_PROJECTION_NAME.equals(name)) { + throw InvalidInput.exception("projection cannot use reserved name[%s]", Projections.BASE_TABLE_PROJECTION_NAME); + } this.name = name; if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) { throw InvalidInput.exception( diff --git a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java index dd3842f10116..f6166e65648c 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -35,11 +35,8 @@ import org.apache.druid.segment.column.ColumnDescriptor; import org.apache.druid.segment.file.SegmentFileBuilder; import org.apache.druid.segment.file.SegmentFileChannel; -import org.apache.druid.segment.file.SegmentFileContainerMetadata; -import org.apache.druid.segment.file.SegmentInternalFileMetadata; import org.apache.druid.utils.CloseableUtils; -import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; @@ -99,35 +96,20 @@ public class FileSmoosher implements SegmentFileBuilder private Outer currOut = null; private boolean writerCurrentlyInUse = false; - // helper for SegmentFileBuilderV10 to have control over naming of smoosh output files; if this is non-null - // meta.smoosh is not written - @Nullable - private final String outputFileName; - public FileSmoosher( File 
baseDir ) { - this(baseDir, Integer.MAX_VALUE, null); + this(baseDir, Integer.MAX_VALUE); } public FileSmoosher( File baseDir, int maxChunkSize ) - { - this(baseDir, maxChunkSize, null); - } - - public FileSmoosher( - File baseDir, - int maxChunkSize, - @Nullable String outputFileName - ) { this.baseDir = baseDir; this.maxChunkSize = maxChunkSize; - this.outputFileName = outputFileName; this.delegateFileNameMap = new HashMap<>(); Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); @@ -143,43 +125,6 @@ static File makeChunkFile(File baseDir, int i) return new File(baseDir, StringUtils.format("%05d.%s", i, FILE_EXTENSION)); } - static File makeChunkFile(File baseDir, String prefix, int i) - { - return new File(baseDir, StringUtils.format("%s-%05d.%s", prefix, i, FILE_EXTENSION)); - } - - public List getOutFiles() - { - return outFiles; - } - - public List getContainers() - { - List smooshContainers = new ArrayList<>(); - long offset = 0; - for (File f : outFiles) { - smooshContainers.add(new SegmentFileContainerMetadata(offset, f.length())); - offset += f.length(); - } - return smooshContainers; - } - - public Map getInternalFiles() - { - Map smooshFileMetadata = new TreeMap<>(); - for (Map.Entry entry : internalFiles.entrySet()) { - smooshFileMetadata.put( - entry.getKey(), - new SegmentInternalFileMetadata( - entry.getValue().getFileNum(), - entry.getValue().getStartOffset(), - entry.getValue().getEndOffset() - entry.getValue().getStartOffset() - ) - ); - } - return smooshFileMetadata; - } - @Override public void addColumn(String name, ColumnDescriptor columnDescriptor) { @@ -456,26 +401,24 @@ public void close() throws IOException currOut.close(); } - if (outputFileName == null) { - File metaFile = metaFile(baseDir); - - try (Writer out = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), StandardCharsets.UTF_8))) { - out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size())); + File 
metaFile = metaFile(baseDir); + + try (Writer out = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), StandardCharsets.UTF_8))) { + out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size())); + out.write("\n"); + + for (Map.Entry entry : internalFiles.entrySet()) { + final Metadata metadata = entry.getValue(); + out.write( + JOINER.join( + entry.getKey(), + metadata.getFileNum(), + metadata.getStartOffset(), + metadata.getEndOffset() + ) + ); out.write("\n"); - - for (Map.Entry entry : internalFiles.entrySet()) { - final Metadata metadata = entry.getValue(); - out.write( - JOINER.join( - entry.getKey(), - metadata.getFileNum(), - metadata.getStartOffset(), - metadata.getEndOffset() - ) - ); - out.write("\n"); - } } } } @@ -483,9 +426,7 @@ public void close() throws IOException private Outer getNewCurrOut() throws FileNotFoundException { final int fileNum = outFiles.size(); - File outFile = outputFileName != null - ? makeChunkFile(baseDir, outputFileName, fileNum) - : makeChunkFile(baseDir, fileNum); + File outFile = makeChunkFile(baseDir, fileNum); outFiles.add(outFile); return new Outer(fileNum, outFile, maxChunkSize); } diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java index 2f3110153595..4059ed38c4df 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java @@ -630,6 +630,7 @@ protected Metadata makeProjections( final String section2 = "build projection[" + projectionSchema.getName() + "] inverted index and columns"; progress.startSection(section2); + segmentFileBuilder.startFileGroup(projectionSchema.getName()); if (projectionSchema.getTimeColumnName() != null) { makeTimeColumn( segmentFileBuilder, diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java index 7d3661886a06..8c8a6d862d5e 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java @@ -218,6 +218,7 @@ protected File makeIndexFiles( /************ Create Inverted Indexes and Finalize Build Columns *************/ final String section = "build inverted index and columns"; progress.startSection(section); + v10Smoosher.startFileGroup(Projections.BASE_TABLE_PROJECTION_NAME); makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix + ColumnHolder.TIME_COLUMN_NAME); makeMetricsColumns( v10Smoosher, diff --git a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java index 5d5ac10e1d47..6d5aea47374c 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java @@ -21,6 +21,7 @@ import org.apache.druid.segment.column.ColumnDescriptor; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -45,6 +46,23 @@ public interface SegmentFileBuilder extends Closeable */ void addColumn(String name, ColumnDescriptor columnDescriptor); + /** + * Declare that subsequent writes belong to a named group of files that should be stored together. This is a hint + * about physical layout, it does not constrain the names of files subsequently added, and implementations are free + * to ignore it entirely (the default is a no-op for formats that don't organize data into coarse-grained + * groupings). Projections are the primary caller today, but the mechanism is generic, it's equally applicable to + * grouping internal metadata, data shared across columns, etc. + *

+ * Callers should invoke this before writing each group's files; passing {@code null} clears the current group. + * Callers should not invoke this while a writer returned by {@link #addWithChannel} is still open (implementations + * may reject such calls). + * + * @see SegmentFileBuilderV10#startFileGroup(String) for the V10 semantics + */ + default void startFileGroup(@Nullable String groupName) + { + } + /** * Add a {@link File} to the segment file as the specified name */ diff --git a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java index 6986a2759fe3..439988dc8758 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java +++ b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java @@ -23,13 +23,19 @@ import com.google.common.primitives.Ints; import org.apache.druid.error.DruidException; import org.apache.druid.io.Channels; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnDescriptor; import org.apache.druid.segment.data.BitmapSerdeFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.projections.ProjectionMetadata; +import org.apache.druid.utils.CloseableUtils; import javax.annotation.Nullable; import java.io.File; @@ -39,20 +45,42 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import 
java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TreeMap; /** - * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a {@link FileSmoosher} underneath to build - * V9 smoosh files and collect the metadata about the offsets in those containers, and then appends them into the V10 - * consolidated segment file after the header and {@link SegmentFileMetadata} is written. + * {@link SegmentFileBuilder} for V10 format segments. Files are written into 'container' chunk files in {@link #baseDir} + * and are concatenated after the header and {@link SegmentFileMetadata} on {@link #close()} to produce the final + * consolidated segment file. *

* V10 file format: * | version (byte) | meta compression (byte) | meta length (int) | meta json | container 0 | ... | container n | + *

+ * Containers are scoped to at most one declared file group. Callers declare which group they are writing via + * {@link #startFileGroup(String)} before writing its files; a new container is started when the declared group + * changes or the current container would exceed {@link #maxContainerSize}. A group whose total size exceeds the max + * container size spans multiple containers, all tagged with the same group. This gives readers a clean 1:1 (or 1:N) + * mapping between groups and containers, which supports per-group partial loading without any read-side reorganization. + * Projections are the primary caller today, but the mechanism is equally usable for other organizational needs + * (shared data across columns, internal metadata, etc.). + *

+ * Callers that never invoke {@link #startFileGroup(String)} are mapped to a null-group container. + *

+ * Much of the logic here was ported from {@link org.apache.druid.java.util.common.io.smoosh.FileSmoosher} of the V9 + * format and there is a fair bit of overlap. In fact, the initial implementation of this class wrapped a V9 smoosher + * to build the files before combining them into the V10 format. The main difference is that V9 fills each container to + * the max while here we organize with file groups. */ public class SegmentFileBuilderV10 implements SegmentFileBuilder { + private static final Logger LOG = new Logger(SegmentFileBuilderV10.class); + public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File baseDir) { return create(jsonMapper, baseDir, CompressionStrategy.NONE); @@ -72,12 +100,31 @@ public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File baseDir private final ObjectMapper jsonMapper; private final String outputFileName; private final File baseDir; - private final long maxChunkSize; + private final long maxContainerSize; private final CompressionStrategy metadataCompression; - private final FileSmoosher smoosher; private final Map externalSegmentFileBuilders; private final Map columns = new TreeMap<>(); + private final List containers = new ArrayList<>(); + private final Map internalFiles = new TreeMap<>(); + + // Nested addWithChannel calls (for example a serializer that, while being written, emits sub-files for its own + // columnar parts) can't write into the current container concurrently with the outer writer. These nested writes are + // redirected to temporary files and merged back into container(s) once the outer writer completes. 
+ private final List completedDelegateFiles = new ArrayList<>(); + private final List inProgressDelegateFiles = new ArrayList<>(); + private final Map delegateFileNameMap = new HashMap<>(); + private long delegateFileCounter = 0; + + @Nullable + private ContainerWriter currentContainer = null; + private boolean writerCurrentlyInUse = false; + // The file group declared by the most recent {@link #startFileGroup} call. Writes are routed into containers + // tagged with this group. Remains {@code null} if the caller never declares one, in which case all writes share + // a single null-group container. + @Nullable + private String currentFileGroup = null; + @Nullable private String interval = null; @Nullable @@ -89,35 +136,141 @@ private SegmentFileBuilderV10( ObjectMapper jsonMapper, String outputFileName, File baseDir, - long maxChunkSize, + long maxContainerSize, CompressionStrategy metadataCompression ) { this.jsonMapper = jsonMapper; this.outputFileName = outputFileName; this.baseDir = baseDir; - this.maxChunkSize = maxChunkSize; + this.maxContainerSize = maxContainerSize; this.metadataCompression = metadataCompression; - this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize), outputFileName); this.externalSegmentFileBuilders = new TreeMap<>(); } @Override public void add(String name, File fileToAdd) throws IOException { - smoosher.add(name, fileToAdd); + try (FileInputStream fis = new FileInputStream(fileToAdd); + FileChannel src = fis.getChannel()) { + final long size = src.size(); + try (SegmentFileChannel out = addWithChannel(name, size)) { + long position = 0; + while (position < size) { + final long transferred = src.transferTo(position, size - position, out); + if (transferred <= 0) { + throw new IOE("Unable to transfer bytes from file[%s] at position[%,d]", fileToAdd, position); + } + position += transferred; + } + } + } } @Override public void add(String name, ByteBuffer bufferToAdd) throws IOException { - smoosher.add(name, 
bufferToAdd); + try (SegmentFileChannel out = addWithChannel(name, bufferToAdd.remaining())) { + out.write(bufferToAdd); + } } @Override - public SegmentFileChannel addWithChannel(String name, long size) throws IOException + public SegmentFileChannel addWithChannel(final String name, final long size) throws IOException { - return smoosher.addWithChannel(name, size); + if (name.contains(",")) { + throw new IAE("Cannot have a comma in the name of a file, got[%s].", name); + } + if (internalFiles.containsKey(name)) { + throw new IAE("Cannot add files of the same name, already have [%s]", name); + } + if (size > maxContainerSize) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + "Serialized buffer size[%,d] for column[%s] exceeds the maximum[%,d]. " + + "Consider adjusting the tuningConfig - for example, reduce maxRowsPerSegment, " + + "or partition your data further.", + size, name, maxContainerSize + ); + } + + // If an outer writer is mid-write we can't append to the current container concurrently, route through a temp + // file that will be merged back into a container once the outer writer releases. 
+ if (writerCurrentlyInUse) { + return delegateChannel(name, size); + } + + ensureContainer(currentFileGroup, size); + final ContainerWriter target = currentContainer; + final long startOffset = target.currOffset; + writerCurrentlyInUse = true; + + return new SegmentFileChannel() + { + private boolean open = true; + private long bytesWritten = 0; + + @Override + public int write(ByteBuffer src) throws IOException + { + return Ints.checkedCast(verifySize(target.write(src))); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return verifySize(target.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return verifySize(target.write(srcs)); + } + + private long verifySize(long bytesWrittenInChunk) + { + bytesWritten += bytesWrittenInChunk; + + if (bytesWritten != target.currOffset - startOffset) { + throw new ISE("Perhaps there is some concurrent modification going on?"); + } + if (bytesWritten > size) { + throw new ISE("Wrote[%,d] bytes for something of size[%,d]. 
Liar!!!", bytesWritten, size); + } + + return bytesWrittenInChunk; + } + + @Override + public boolean isOpen() + { + return open; + } + + @Override + public void close() throws IOException + { + if (!open) { + return; + } + open = false; + writerCurrentlyInUse = false; + + if (bytesWritten != target.currOffset - startOffset) { + throw new ISE("Perhaps there is some concurrent modification going on?"); + } + if (bytesWritten != size) { + throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten); + } + internalFiles.put( + name, + new SegmentInternalFileMetadata(target.fileNum, startOffset, target.currOffset - startOffset) + ); + mergeDelegatedFiles(); + } + }; } @Override @@ -125,7 +278,7 @@ public SegmentFileBuilder getExternalBuilder(String externalFile) { return externalSegmentFileBuilders.computeIfAbsent( externalFile, - (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir, maxChunkSize, metadataCompression) + (k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir, maxContainerSize, metadataCompression) ); } @@ -135,6 +288,31 @@ public void addColumn(String name, ColumnDescriptor columnDescriptor) this.columns.put(name, columnDescriptor); } + /** + * Declare the file group that subsequent writes belong to. Writes are routed into a container tagged with the + * declared group; a new container is rolled when the group changes or the incoming file won't fit. A group whose + * total size exceeds {@link #maxContainerSize} is split across multiple consecutive containers, all tagged with + * the same group. Passing {@code null} clears the current group; subsequent writes are then routed into a + * null-group container until the next call. + *

+ * Current V10-specific limitations worth knowing: + *

    + *
+ * <ul>
+ *   <li>Groups cannot be re-entered. Once a different group (or {@code null}) has been declared, the previous
+ *       group's container is closed, and you cannot go back and append more files to it; any such writes would
+ *       open a fresh container for the re-declared group, so the group's files would end up in non-contiguous
+ *       containers. If all of a group's files must land in the same container(s), write them contiguously.</li>
+ *   <li>Throws if called while a writer returned by {@link #addWithChannel} is still open.</li>
+ * </ul>
+ *
+ */ + @Override + public void startFileGroup(@Nullable String groupName) + { + if (writerCurrentlyInUse) { + throw DruidException.defensive("Cannot start file group[%s] while a writer is in progress", groupName); + } + this.currentFileGroup = groupName; + } + public void addInterval(String interval) { this.interval = interval; @@ -153,7 +331,9 @@ public void addProjections(List projections) @Override public void abort() { - smoosher.abort(); + if (currentContainer != null) { + CloseableUtils.closeAndWrapExceptions(currentContainer); + } } @Override @@ -163,11 +343,21 @@ public void close() throws IOException externalBuilder.close(); } - smoosher.close(); + if (!completedDelegateFiles.isEmpty() || !inProgressDelegateFiles.isEmpty()) { + abort(); + throw new ISE( + "[%d] writers in progress and [%d] completed writers needs to be closed before closing builder.", + inProgressDelegateFiles.size(), completedDelegateFiles.size() + ); + } + + if (currentContainer != null) { + currentContainer.close(); + } - SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata( - smoosher.getContainers(), - smoosher.getInternalFiles(), + final SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata( + buildContainerMetadata(), + internalFiles, interval, columns.isEmpty() ? 
null : columns, projections, @@ -222,7 +412,8 @@ public void close() throws IOException Channels.writeFully(channel, compressed); } - for (File f : smoosher.getOutFiles()) { + for (ContainerWriter container : containers) { + final File f = container.file; try (FileInputStream fis = new FileInputStream(f)) { byte[] buffer = new byte[4096]; int bytesRead; @@ -230,13 +421,236 @@ public void close() throws IOException outputStream.write(buffer, 0, bytesRead); } } - // delete all the old 00000.smoosh + // delete all the old container files DruidException.conditionalDefensive( f.delete(), - "Failed to delete temporary file[%s]", + "Failed to delete temporary container file[%s]", f ); } } } + + private List buildContainerMetadata() + { + final List result = new ArrayList<>(containers.size()); + long offset = 0; + for (ContainerWriter container : containers) { + final long length = container.file.length(); + result.add(new SegmentFileContainerMetadata(offset, length)); + offset += length; + } + return result; + } + + /** + * Ensure that {@link #currentContainer} is ready to accept {@code size} bytes of a file belonging to {@code group}. + * Rolls the current container and starts a new one when: + *
    + *
+ * <ul>
+ *   <li>there is no current container, or</li>
+ *   <li>the current container is for a different group, or</li>
+ *   <li>the current container cannot fit the incoming bytes within {@link #maxContainerSize}.</li>
+ * </ul>
+ *
+ */ + private void ensureContainer(@Nullable String group, long size) throws IOException + { + if (currentContainer == null + || !Objects.equals(currentContainer.group, group) + || !currentContainer.canFit(size)) { + if (currentContainer != null) { + currentContainer.close(); + } + currentContainer = openNewContainer(group); + containers.add(currentContainer); + } + } + + private ContainerWriter openNewContainer(@Nullable String group) throws IOException + { + FileUtils.mkdirp(baseDir); + final int fileNum = containers.size(); + final File containerFile = new File( + baseDir, + StringUtils.format("%s-%05d.container", outputFileName, fileNum) + ); + return new ContainerWriter(fileNum, containerFile, group, maxContainerSize); + } + + private SegmentFileChannel delegateChannel(final String name, final long size) throws IOException + { + final String delegateName = nextDelegateFileName(name); + final File tmpFile = new File(baseDir, delegateName); + inProgressDelegateFiles.add(tmpFile); + + return new SegmentFileChannel() + { + private final FileChannel channel = FileChannel.open( + tmpFile.toPath(), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING + ); + + private long bytesWritten = 0; + + @Override + public int write(ByteBuffer src) throws IOException + { + return Ints.checkedCast(addBytes(channel.write(src))); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return addBytes(channel.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return addBytes(channel.write(srcs)); + } + + private long addBytes(long n) + { + if (n > size - bytesWritten) { + throw new ISE( + "Wrote more bytes[%,d] than expected[%,d] for delegated file[%s]", + bytesWritten + n, size, name + ); + } + bytesWritten += n; + return n; + } + + @Override + public boolean isOpen() + { + return channel.isOpen(); + } + + @Override + public 
void close() throws IOException + { + channel.close(); + completedDelegateFiles.add(tmpFile); + inProgressDelegateFiles.remove(tmpFile); + if (!writerCurrentlyInUse) { + mergeDelegatedFiles(); + } + } + }; + } + + /** + * Move completed delegate temp files into containers by replaying them as regular {@link #add} calls. Only called + * when no outer writer is currently holding the builder. + */ + private void mergeDelegatedFiles() throws IOException + { + if (completedDelegateFiles.isEmpty()) { + return; + } + final List toProcess = new ArrayList<>(completedDelegateFiles); + final Map nameMap = new HashMap<>(delegateFileNameMap); + completedDelegateFiles.clear(); + delegateFileNameMap.clear(); + for (File file : toProcess) { + final String name = nameMap.get(file.getName()); + add(name, file); + if (!file.delete()) { + LOG.warn("Unable to delete delegate file[%s]", file); + } + } + } + + /** + * Generate a unique temp file name for a delegated nested write. Prefixed with {@link #outputFileName} so that + * delegate files from a main builder and its externals (which share {@link #baseDir}) cannot collide as main and + * external always have distinct output file names. + */ + private String nextDelegateFileName(String name) + { + final String delegateName = StringUtils.format("%s-delegate-%d", outputFileName, delegateFileCounter++); + delegateFileNameMap.put(delegateName, name); + return delegateName; + } + + /** + * Low-level writer for a single container chunk file. One container holds internal files from at most one group. 
+ */ + private static class ContainerWriter implements GatheringByteChannel + { + private final int fileNum; + private final File file; + @Nullable + private final String group; + private final long maxSize; + private final Closer closer = Closer.create(); + private final GatheringByteChannel channel; + private long currOffset = 0; + + ContainerWriter(int fileNum, File file, @Nullable String group, long maxSize) throws IOException + { + this.fileNum = fileNum; + this.file = file; + this.group = group; + this.maxSize = maxSize; + final FileOutputStream outStream = closer.register(new FileOutputStream(file)); + this.channel = closer.register(outStream.getChannel()); + } + + boolean canFit(long size) + { + // overflow-safe form of currOffset + size <= maxSize for non-negative currOffset/size/maxSize + return size <= maxSize - currOffset; + } + + @Override + public int write(ByteBuffer src) throws IOException + { + return Ints.checkedCast(recordWrite(channel.write(src))); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException + { + return recordWrite(channel.write(srcs, offset, length)); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException + { + return recordWrite(channel.write(srcs)); + } + + private long recordWrite(long n) + { + if (n > maxSize - currOffset) { + throw new ISE("Wrote more bytes[%,d] than available[%,d]", n, maxSize - currOffset); + } + currOffset += n; + return n; + } + + @Override + public boolean isOpen() + { + return channel.isOpen(); + } + + @Override + public void close() throws IOException + { + closer.close(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Created container file[%s] for group[%s] of size[%,d] bytes.", + file.getAbsolutePath(), + group, + file.length() + ); + } + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java new 
file mode 100644 index 000000000000..265593b7a3c0 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.file; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.Files; +import com.google.common.primitives.Ints; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.TestHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +class SegmentFileBuilderV10Test +{ + private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); + + @TempDir + File tempDir; + + @Test + void testOneContainerPerProjection() throws IOException + { + final File baseDir = new File(tempDir, "base_" + 
ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + // matches the production usage pattern in IndexMergerV10: call startFileGroup then write that projection's + // columns, then move on to the next projection. + final String[] projections = {"__base", "projA", "projB"}; + final int colCount = 3; + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + for (String projection : projections) { + builder.startFileGroup(projection); + for (int col = 0; col < colCount; col++) { + final String name = projection + "/col" + col; + final File tmpFile = new File(tempDir, StringUtils.format("%s-%s.bin", projection, col)); + Files.write(Ints.toByteArray(name.hashCode()), tmpFile); + builder.add(name, tmpFile); + } + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) { + final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata(); + + Assertions.assertEquals(projections.length, metadata.getContainers().size()); + Assertions.assertEquals(projections.length * 3, metadata.getFiles().size()); + assertNoContainerMixesProjections(metadata); + + assertColumns(projections, colCount, mapper); + } + } + + @Test + void testProjectionNameWithSlashRoutesCorrectly() throws IOException + { + // regression: projection names may legitimately contain '/', so container routing must use the declared + // projection rather than a parsed prefix of internal file names. 
+ final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + final String slashyProjection = "nested/projection"; + final int colCount = 3; + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + builder.startFileGroup("__base"); + for (int col = 0; col < colCount; col++) { + final String name = "__base/col" + col; + final File tmpFile = new File(tempDir, StringUtils.format("base-%s.bin", col)); + Files.write(Ints.toByteArray(name.hashCode()), tmpFile); + builder.add(name, tmpFile); + } + builder.startFileGroup(slashyProjection); + for (int col = 0; col < colCount; col++) { + final String name = slashyProjection + "/col" + col; + final File tmpFile = new File(tempDir, StringUtils.format("slashy-%s.bin", col)); + Files.write(Ints.toByteArray(name.hashCode()), tmpFile); + builder.add(name, tmpFile); + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) { + final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata(); + // 2 projections, 2 containers, even though the slashy name's first '/' would have parsed as projection "nested" + Assertions.assertEquals(2, metadata.getContainers().size()); + Assertions.assertEquals(2 * colCount, metadata.getFiles().size()); + + // round-trip both sets of files + for (int col = 0; col < colCount; col++) { + final String baseName = "__base/col" + col; + final ByteBuffer baseBuf = mapper.mapFile(baseName); + Assertions.assertNotNull(baseBuf, baseName); + Assertions.assertEquals(baseName.hashCode(), baseBuf.getInt(), baseName); + + final String slashyName = slashyProjection + "/col" + col; + final ByteBuffer slashyBuf = mapper.mapFile(slashyName); + Assertions.assertNotNull(slashyBuf, slashyName); + Assertions.assertEquals(slashyName.hashCode(), slashyBuf.getInt(), slashyName); + } + } + } + + @Test + void 
testStartFileGroupWhileWriterInUseThrows() throws IOException + { + final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + builder.startFileGroup("__base"); + try (SegmentFileChannel outer = builder.addWithChannel("__base/col0", 4)) { + Assertions.assertThrows(RuntimeException.class, () -> builder.startFileGroup("projA")); + outer.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + } + } + } + + @Test + void testExternalBuilderAlsoSplitsContainersByProjection() throws IOException + { + final String externalName = "external.segment"; + final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + final String[] mainProjections = {"__base", "projA", "projB"}; + final String[] externalProjections = {"extProjX", "extProjY"}; + final int colCount = 3; + + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + for (String projection : mainProjections) { + builder.startFileGroup(projection); + for (int col = 0; col < colCount; col++) { + final String name = projection + "/col" + col; + final File tmpFile = new File(tempDir, StringUtils.format("main-%s-%s.bin", projection, col)); + Files.write(Ints.toByteArray(name.hashCode()), tmpFile); + builder.add(name, tmpFile); + } + } + + // getExternalBuilder returns the SegmentFileBuilder interface but under the hood produces an independent V10 + // sub-file with its own header + containers. Projection-per-container splitting must apply there too. 
+ final SegmentFileBuilder external = builder.getExternalBuilder(externalName); + for (String projection : externalProjections) { + external.startFileGroup(projection); + for (int col = 0; col < colCount; col++) { + final String name = projection + "/col" + (col + 1000); + final File tmpFile = new File(tempDir, StringUtils.format("ext-%s-%s.bin", projection, col)); + Files.write(Ints.toByteArray(name.hashCode()), tmpFile); + external.add(name, tmpFile); + } + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + final File externalFile = new File(baseDir, externalName); + Assertions.assertTrue(segmentFile.exists(), "main v10 file missing"); + Assertions.assertTrue(externalFile.exists(), "external v10 file missing"); + + // the external file on its own is a well-formed V10 sub-segment, load it directly to check its container layout. + try (SegmentFileMapperV10 externalOnly = SegmentFileMapperV10.create(externalFile, JSON_MAPPER)) { + final SegmentFileMetadata externalMetadata = externalOnly.getSegmentFileMetadata(); + Assertions.assertEquals(externalProjections.length, externalMetadata.getContainers().size()); + Assertions.assertEquals(externalProjections.length * colCount, externalMetadata.getFiles().size()); + assertNoContainerMixesProjections(externalMetadata); + } + + // loaded together: main file checks its own containers and the external is attached for mapExternalFile(). 
+ try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) { + final SegmentFileMetadata mainMetadata = mapper.getSegmentFileMetadata(); + Assertions.assertEquals(mainProjections.length, mainMetadata.getContainers().size()); + Assertions.assertEquals(mainProjections.length * colCount, mainMetadata.getFiles().size()); + assertNoContainerMixesProjections(mainMetadata); + + assertColumns(mainProjections, colCount, mapper); + + for (String projection : externalProjections) { + for (int col = 0; col < colCount; col++) { + final String name = projection + "/col" + (col + 1000); + final ByteBuffer buf = mapper.mapExternalFile(externalName, name); + Assertions.assertNotNull(buf, name); + Assertions.assertEquals(name.hashCode(), buf.getInt(), name); + } + } + } + } + + @Test + void testNestedAddWithChannelDelegatesPerBuilder() throws IOException + { + // exercises the delegate-temp-file path on both the main and external builders: while an outer addWithChannel is + // mid-write on a builder, a nested addWithChannel on the same builder must route through a temp file and then be + // merged back in at outer-close. Main and external each drive this independently, and since they share baseDir, + // their delegate file names must not collide. 
+ final String externalName = "external.segment"; + final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + final byte[] outerBytes = new byte[]{1, 2, 3, 4}; + final byte[] nestedBytes = new byte[]{5, 6, 7, 8}; + + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + builder.startFileGroup("__base"); + try (SegmentFileChannel outer = builder.addWithChannel("__base/outer", outerBytes.length)) { + // nested write while outer is in use → forced into delegate temp file + try (SegmentFileChannel nested = builder.addWithChannel("__base/nested", nestedBytes.length)) { + nested.write(ByteBuffer.wrap(nestedBytes)); + } + outer.write(ByteBuffer.wrap(outerBytes)); + } + + final SegmentFileBuilder external = builder.getExternalBuilder(externalName); + external.startFileGroup("extProj"); + try (SegmentFileChannel extOuter = external.addWithChannel("extProj/outer", outerBytes.length)) { + try (SegmentFileChannel extNested = external.addWithChannel("extProj/nested", nestedBytes.length)) { + extNested.write(ByteBuffer.wrap(nestedBytes)); + } + extOuter.write(ByteBuffer.wrap(outerBytes)); + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER, List.of(externalName))) { + assertBytes(mapper.mapFile("__base/outer"), outerBytes); + assertBytes(mapper.mapFile("__base/nested"), nestedBytes); + assertBytes(mapper.mapExternalFile(externalName, "extProj/outer"), outerBytes); + assertBytes(mapper.mapExternalFile(externalName, "extProj/nested"), nestedBytes); + } + } + + private static void assertBytes(ByteBuffer actual, byte[] expected) + { + Assertions.assertNotNull(actual); + Assertions.assertEquals(expected.length, actual.remaining()); + final byte[] got = new byte[expected.length]; + actual.get(got); + Assertions.assertArrayEquals(expected, got); + } + + @Test + void 
testUnprefixedFilesShareSingleContainer() throws IOException + { + final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + for (int i = 0; i < 5; ++i) { + final File tmpFile = new File(tempDir, StringUtils.format("plain-%s.bin", i)); + Files.write(Ints.toByteArray(i), tmpFile); + builder.add(String.valueOf(i), tmpFile); + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) { + Assertions.assertEquals(1, mapper.getSegmentFileMetadata().getContainers().size()); + } + } + + private static void assertNoContainerMixesProjections(SegmentFileMetadata metadata) + { + for (int containerIdx = 0; containerIdx < metadata.getContainers().size(); containerIdx++) { + final Set projectionsInContainer = new HashSet<>(); + for (Map.Entry entry : metadata.getFiles().entrySet()) { + if (entry.getValue().getContainer() == containerIdx) { + final int slash = entry.getKey().indexOf('/'); + projectionsInContainer.add(slash < 0 ? 
"" : entry.getKey().substring(0, slash)); + } + } + Assertions.assertEquals( + 1, + projectionsInContainer.size(), + "container[" + containerIdx + "] mixes projections: " + projectionsInContainer + ); + } + } + + private static void assertColumns(String[] projections, int colCount, SegmentFileMapperV10 mapper) throws IOException + { + for (String projection : projections) { + for (int col = 0; col < colCount; col++) { + final String name = projection + "/col" + col; + final ByteBuffer buf = mapper.mapFile(name); + Assertions.assertNotNull(buf, name); + Assertions.assertEquals(name.hashCode(), buf.getInt(), name); + } + } + } +} From 510b2e75d8cf65c0b70fa2981d4d00f5fd196732 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 27 Apr 2026 01:09:22 -0700 Subject: [PATCH 2/2] fixes --- .../input/impl/AggregateProjectionSpec.java | 8 +-- .../org/apache/druid/segment/IndexIO.java | 2 +- .../apache/druid/segment/IndexMergerBase.java | 10 +-- .../segment/file/SegmentFileBuilderV10.java | 66 +++++++++--------- .../segment/projections/Projections.java | 17 ++++- .../file/SegmentFileBuilderV10Test.java | 67 ++++++++++++++++--- 6 files changed, 115 insertions(+), 55 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java index 60031a6fb259..a19a5e1a6e64 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java @@ -100,13 +100,7 @@ public AggregateProjectionSpec( @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators ) { - if (name == null || name.isEmpty()) { - throw InvalidInput.exception("projection name cannot be null or empty"); - } - if (Projections.BASE_TABLE_PROJECTION_NAME.equals(name)) { - throw InvalidInput.exception("projection cannot use reserved name[%s]", 
Projections.BASE_TABLE_PROJECTION_NAME); - } - this.name = name; + this.name = Projections.validateProjectionName(name); if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) { throw InvalidInput.exception( "projection[%s] groupingColumns and aggregators must not both be null or empty", diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index e5acd4f73fb3..f246421f8415 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -1039,7 +1039,7 @@ private Map> readProjectionColumns( final Map> projectionColumns = new LinkedHashMap<>(); for (String column : projectionSpec.getSchema().getColumnNames()) { - final String smooshName = Projections.getProjectionSmooshFileName(projectionSpec.getSchema(), column); + final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column); final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName); final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName); if (columnDescriptor == null) { diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java index 4059ed38c4df..06cde93fa3da 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java @@ -512,7 +512,7 @@ protected Metadata makeProjections( columnFormats.put(dimension, dimensionFormat); DimensionHandler handler = dimensionFormat.getColumnHandler(dimension); DimensionMergerV9 merger = handler.makeMerger( - Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), + Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), indexSpec, 
segmentWriteOutMedium, dimensionFormat.toColumnCapabilities(), @@ -543,7 +543,7 @@ protected Metadata makeProjections( metrics, columnFormats, indexSpec, - Projections.getProjectionSmooshPrefix(spec.getSchema()) + Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema()) ); Function, TimeAndDimsIterator> rowMergerFn = @@ -637,7 +637,7 @@ protected Metadata makeProjections( progress, timeWriter, indexSpec, - Projections.getProjectionSmooshFileName(spec.getSchema(), projectionSchema.getTimeColumnName()) + Projections.getProjectionSegmentInternalFileName(spec.getSchema(), projectionSchema.getTimeColumnName()) ); } makeMetricsColumns( @@ -647,7 +647,7 @@ protected Metadata makeProjections( columnFormats, metricWriters, indexSpec, - Projections.getProjectionSmooshPrefix(spec.getSchema()) + Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema()) ); for (int i = 0; i < dimensions.size(); i++) { @@ -665,7 +665,7 @@ protected Metadata makeProjections( // use merger descriptor, merger either has values or handles it own null column storage details columnDesc = merger.makeColumnDescriptor(); } - makeColumn(segmentFileBuilder, Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), columnDesc); + makeColumn(segmentFileBuilder, Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), columnDesc); } progress.stopSection(section2); diff --git a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java index 439988dc8758..0557fc2a4bdf 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java +++ b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java @@ -48,7 +48,6 @@ import java.nio.channels.GatheringByteChannel; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; 
import java.util.Objects; @@ -110,10 +109,11 @@ public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File baseDir // Nested addWithChannel calls (for example a serializer that, while being written, emits sub-files for its own // columnar parts) can't write into the current container concurrently with the outer writer. These nested writes are - // redirected to temporary files and merged back into container(s) once the outer writer completes. - private final List completedDelegateFiles = new ArrayList<>(); - private final List inProgressDelegateFiles = new ArrayList<>(); - private final Map delegateFileNameMap = new HashMap<>(); + // redirected to temporary files and merged back into container(s) once the outer writer completes. Each entry + // carries the file group that was active when the delegate was created so that the merge routes it into the + // correct container even if the active group has since changed. + private final List completedDelegates = new ArrayList<>(); + private final List inProgressDelegates = new ArrayList<>(); private long delegateFileCounter = 0; @Nullable @@ -343,11 +343,12 @@ public void close() throws IOException externalBuilder.close(); } - if (!completedDelegateFiles.isEmpty() || !inProgressDelegateFiles.isEmpty()) { + if (!completedDelegates.isEmpty() || !inProgressDelegates.isEmpty()) { abort(); throw new ISE( "[%d] writers in progress and [%d] completed writers needs to be closed before closing builder.", - inProgressDelegateFiles.size(), completedDelegateFiles.size() + inProgressDelegates.size(), + completedDelegates.size() ); } @@ -478,9 +479,14 @@ private ContainerWriter openNewContainer(@Nullable String group) throws IOExcept private SegmentFileChannel delegateChannel(final String name, final long size) throws IOException { - final String delegateName = nextDelegateFileName(name); + // Prefixed with outputFileName so delegate files from a main builder and its externals (which share baseDir) + // cannot collide, 
since main and external always have distinct output file names. + final String delegateName = StringUtils.format("%s-delegate-%d", outputFileName, delegateFileCounter++); final File tmpFile = new File(baseDir, delegateName); - inProgressDelegateFiles.add(tmpFile); + // Snapshot the active group now so that if this delegate is merged after the outer writer has advanced past + // the group it was created under, it still routes into the correct container. + final DelegateEntry entry = new DelegateEntry(tmpFile, name, currentFileGroup); + inProgressDelegates.add(entry); return new SegmentFileChannel() { @@ -533,8 +539,8 @@ public boolean isOpen() public void close() throws IOException { channel.close(); - completedDelegateFiles.add(tmpFile); - inProgressDelegateFiles.remove(tmpFile); + completedDelegates.add(entry); + inProgressDelegates.remove(entry); if (!writerCurrentlyInUse) { mergeDelegatedFiles(); } @@ -544,36 +550,34 @@ public void close() throws IOException /** * Move completed delegate temp files into containers by replaying them as regular {@link #add} calls. Only called - * when no outer writer is currently holding the builder. + * when no outer writer is currently holding the builder. Each entry's snapshotted group is restored as + * {@link #currentFileGroup} during its replay so the file lands in the container that was active when the + * nested write was originally requested, not whichever group happens to be active at merge time. 
*/ private void mergeDelegatedFiles() throws IOException { - if (completedDelegateFiles.isEmpty()) { + if (completedDelegates.isEmpty()) { return; } - final List toProcess = new ArrayList<>(completedDelegateFiles); - final Map nameMap = new HashMap<>(delegateFileNameMap); - completedDelegateFiles.clear(); - delegateFileNameMap.clear(); - for (File file : toProcess) { - final String name = nameMap.get(file.getName()); - add(name, file); - if (!file.delete()) { - LOG.warn("Unable to delete delegate file[%s]", file); + final List toProcess = new ArrayList<>(completedDelegates); + completedDelegates.clear(); + final String savedGroup = currentFileGroup; + try { + for (DelegateEntry entry : toProcess) { + currentFileGroup = entry.group; + add(entry.name, entry.file); + if (!entry.file.delete()) { + LOG.warn("Unable to delete delegate file[%s]", entry.file); + } } } + finally { + currentFileGroup = savedGroup; + } } - /** - * Generate a unique temp file name for a delegated nested write. Prefixed with {@link #outputFileName} so that - * delegate files from a main builder and its externals (which share {@link #baseDir}) cannot collide as main and - * external always have distinct output file names. 
- */ - private String nextDelegateFileName(String name) + private record DelegateEntry(File file, String name, @Nullable String group) { - final String delegateName = StringUtils.format("%s-delegate-%d", outputFileName, delegateFileCounter++); - delegateFileNameMap.put(delegateName, name); - return delegateName; } /** diff --git a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java index 0415d540e24f..c81549f41c23 100644 --- a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java +++ b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java @@ -57,6 +57,17 @@ public class Projections private static final ConcurrentHashMap PERIOD_GRAN_CACHE = new ConcurrentHashMap<>(); + public static String validateProjectionName(@Nullable String name) + { + if (name == null || name.isEmpty()) { + throw InvalidInput.exception("projection name cannot be null or empty"); + } + if (name.startsWith("__")) { + throw InvalidInput.exception("projection cannot use reserved name[%s]", BASE_TABLE_PROJECTION_NAME); + } + return name; + } + @Nullable public static QueryableProjection findMatchingProjection( CursorBuildSpec cursorBuildSpec, @@ -509,12 +520,12 @@ public static String getProjectionSmooshV9Prefix(AggregateProjectionMetadata pro return projectionSpec.getSchema().getName() + "/"; } - public static String getProjectionSmooshFileName(ProjectionSchema schema, String columnName) + public static String getProjectionSegmentInternalFileName(ProjectionSchema schema, String columnName) { - return getProjectionSmooshPrefix(schema) + columnName; + return getProjectionSegmentInternalFilePrefix(schema) + columnName; } - public static String getProjectionSmooshPrefix(ProjectionSchema projectionSchema) + public static String getProjectionSegmentInternalFilePrefix(ProjectionSchema projectionSchema) { return projectionSchema.getName() + "/"; } 
diff --git a/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
index 265593b7a3c0..89c232973cc0 100644
--- a/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/file/SegmentFileBuilderV10Test.java
@@ -83,8 +83,6 @@ void testOneContainerPerProjection() throws IOException
   @Test
   void testProjectionNameWithSlashRoutesCorrectly() throws IOException
   {
-    // regression: projection names may legitimately contain '/', so container routing must use the declared
-    // projection rather than a parsed prefix of internal file names.
     final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt());
     FileUtils.mkdirp(baseDir);
 
@@ -256,13 +254,57 @@ void testNestedAddWithChannelDelegatesPerBuilder() throws IOException
     }
   }
 
-  private static void assertBytes(ByteBuffer actual, byte[] expected)
+  @Test
+  void testNestedDelegateClosedAfterOuterRoutesToOriginalGroup() throws IOException
   {
-    Assertions.assertNotNull(actual);
-    Assertions.assertEquals(expected.length, actual.remaining());
-    final byte[] got = new byte[expected.length];
-    actual.get(got);
-    Assertions.assertArrayEquals(expected, got);
+    // doing something like this is weird and probably shouldn't happen in practice, but if a nested write was requested
+    // while file group "groupA" was active, then even if the caller switches to "groupB" before finally closing the nested
+    // channel, the delegated bytes must still land in groupA's container, not groupB's. Otherwise the grouping breaks,
+    // and files from other groups end up in the same container.
+ final File baseDir = new File(tempDir, "base_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(baseDir); + + final byte[] outerBytes = new byte[]{1, 2, 3, 4}; + final byte[] nestedBytes = new byte[]{5, 6, 7, 8}; + final byte[] groupBBytes = new byte[]{9, 10, 11, 12}; + + try (SegmentFileBuilderV10 builder = SegmentFileBuilderV10.create(JSON_MAPPER, baseDir)) { + builder.startFileGroup("groupA"); + + final SegmentFileChannel outer = builder.addWithChannel("groupA/outer", outerBytes.length); + final SegmentFileChannel nested = builder.addWithChannel("groupA/nested", nestedBytes.length); + nested.write(ByteBuffer.wrap(nestedBytes)); + + // close the outer first so writerCurrentlyInUse clears while the nested delegate is still open + outer.write(ByteBuffer.wrap(outerBytes)); + outer.close(); + + // switch group before closing the still-open nested delegate; merge must use the snapshotted "groupA" + builder.startFileGroup("groupB"); + nested.close(); + + // and a real groupB file so we can verify groupB's container is independent of the nested file + try (SegmentFileChannel groupBFile = builder.addWithChannel("groupB/file", groupBBytes.length)) { + groupBFile.write(ByteBuffer.wrap(groupBBytes)); + } + } + + final File segmentFile = new File(baseDir, IndexIO.V10_FILE_NAME); + try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(segmentFile, JSON_MAPPER)) { + final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata(); + + // the nested file was requested under groupA, so it must share groupA's container with groupA/outer + // and must NOT be in groupB's container alongside groupB/file. 
+ final int outerContainer = metadata.getFiles().get("groupA/outer").getContainer(); + final int nestedContainer = metadata.getFiles().get("groupA/nested").getContainer(); + final int groupBContainer = metadata.getFiles().get("groupB/file").getContainer(); + Assertions.assertEquals(outerContainer, nestedContainer, "nested delegate landed in the wrong container"); + Assertions.assertNotEquals(groupBContainer, nestedContainer, "nested delegate leaked into groupB's container"); + + assertBytes(mapper.mapFile("groupA/outer"), outerBytes); + assertBytes(mapper.mapFile("groupA/nested"), nestedBytes); + assertBytes(mapper.mapFile("groupB/file"), groupBBytes); + } } @Test @@ -285,6 +327,15 @@ void testUnprefixedFilesShareSingleContainer() throws IOException } } + private static void assertBytes(ByteBuffer actual, byte[] expected) + { + Assertions.assertNotNull(actual); + Assertions.assertEquals(expected.length, actual.remaining()); + final byte[] got = new byte[expected.length]; + actual.get(got); + Assertions.assertArrayEquals(expected, got); + } + private static void assertNoContainerMixesProjections(SegmentFileMetadata metadata) { for (int containerIdx = 0; containerIdx < metadata.getContainers().size(); containerIdx++) {