Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.projections.AggregateProjectionSchema;
import org.apache.druid.segment.projections.Projections;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTimeZone;

Expand Down Expand Up @@ -99,10 +100,7 @@ public AggregateProjectionSpec(
@JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators
)
{
if (name == null || name.isEmpty()) {
throw InvalidInput.exception("projection name cannot be null or empty");
}
this.name = name;
this.name = Projections.validateProjectionName(name);
if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) {
throw InvalidInput.exception(
"projection[%s] groupingColumns and aggregators must not both be null or empty",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.file.SegmentFileBuilder;
import org.apache.druid.segment.file.SegmentFileChannel;
import org.apache.druid.segment.file.SegmentFileContainerMetadata;
import org.apache.druid.segment.file.SegmentInternalFileMetadata;
import org.apache.druid.utils.CloseableUtils;

import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -99,35 +96,20 @@ public class FileSmoosher implements SegmentFileBuilder
private Outer currOut = null;
private boolean writerCurrentlyInUse = false;

// helper for SegmentFileBuilderV10 to have control over naming of smoosh output files; if this is non-null
// meta.smoosh is not written
@Nullable
private final String outputFileName;

public FileSmoosher(
File baseDir
)
{
this(baseDir, Integer.MAX_VALUE, null);
this(baseDir, Integer.MAX_VALUE);
}

public FileSmoosher(
File baseDir,
int maxChunkSize
)
{
this(baseDir, maxChunkSize, null);
}

public FileSmoosher(
File baseDir,
int maxChunkSize,
@Nullable String outputFileName
)
{
this.baseDir = baseDir;
this.maxChunkSize = maxChunkSize;
this.outputFileName = outputFileName;
this.delegateFileNameMap = new HashMap<>();

Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value.");
Expand All @@ -143,43 +125,6 @@ static File makeChunkFile(File baseDir, int i)
return new File(baseDir, StringUtils.format("%05d.%s", i, FILE_EXTENSION));
}

/**
 * Builds the {@link File} for a named chunk: "{prefix}-{zero-padded index}.{extension}" under the base directory.
 */
static File makeChunkFile(File baseDir, String prefix, int i)
{
  final String chunkFileName = StringUtils.format("%s-%05d.%s", prefix, i, FILE_EXTENSION);
  return new File(baseDir, chunkFileName);
}

/**
 * Returns the chunk files written so far, in creation order. Note this is the live internal list,
 * not a defensive copy, so callers must not mutate it.
 */
public List<File> getOutFiles()
{
  return this.outFiles;
}

/**
 * Returns one {@link SegmentFileContainerMetadata} per chunk file written so far, in order, where each
 * entry carries the running byte offset of that chunk within the logical concatenation of all chunks
 * plus the chunk's length.
 */
public List<SegmentFileContainerMetadata> getContainers()
{
  final List<SegmentFileContainerMetadata> smooshContainers = new ArrayList<>(outFiles.size());
  long offset = 0;
  for (File f : outFiles) {
    // stat the file once so the recorded length and the offset advance always agree,
    // instead of calling File.length() twice per file
    final long length = f.length();
    smooshContainers.add(new SegmentFileContainerMetadata(offset, length));
    offset += length;
  }
  return smooshContainers;
}

/**
 * Returns metadata for each internal file keyed by name (sorted, via {@link TreeMap}): the chunk file
 * number it lives in, its start offset within that chunk, and its length (end offset minus start offset).
 */
public Map<String, SegmentInternalFileMetadata> getInternalFiles()
{
  final Map<String, SegmentInternalFileMetadata> fileMetadataByName = new TreeMap<>();
  for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
    final Metadata meta = entry.getValue();
    final SegmentInternalFileMetadata internalFileMetadata = new SegmentInternalFileMetadata(
        meta.getFileNum(),
        meta.getStartOffset(),
        meta.getEndOffset() - meta.getStartOffset()
    );
    fileMetadataByName.put(entry.getKey(), internalFileMetadata);
  }
  return fileMetadataByName;
}

@Override
public void addColumn(String name, ColumnDescriptor columnDescriptor)
{
Expand Down Expand Up @@ -456,36 +401,32 @@ public void close() throws IOException
currOut.close();
}

if (outputFileName == null) {
File metaFile = metaFile(baseDir);

try (Writer out =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size()));
File metaFile = metaFile(baseDir);

try (Writer out =
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(metaFile), StandardCharsets.UTF_8))) {
out.write(StringUtils.format("v1,%d,%d", maxChunkSize, outFiles.size()));
out.write("\n");

for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
final Metadata metadata = entry.getValue();
out.write(
JOINER.join(
entry.getKey(),
metadata.getFileNum(),
metadata.getStartOffset(),
metadata.getEndOffset()
)
);
out.write("\n");

for (Map.Entry<String, Metadata> entry : internalFiles.entrySet()) {
final Metadata metadata = entry.getValue();
out.write(
JOINER.join(
entry.getKey(),
metadata.getFileNum(),
metadata.getStartOffset(),
metadata.getEndOffset()
)
);
out.write("\n");
}
}
}
}

private Outer getNewCurrOut() throws FileNotFoundException
{
final int fileNum = outFiles.size();
File outFile = outputFileName != null
? makeChunkFile(baseDir, outputFileName, fileNum)
: makeChunkFile(baseDir, fileNum);
File outFile = makeChunkFile(baseDir, fileNum);
outFiles.add(outFile);
return new Outer(fileNum, outFile, maxChunkSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ private Map<String, Supplier<BaseColumnHolder>> readProjectionColumns(
final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new LinkedHashMap<>();

for (String column : projectionSpec.getSchema().getColumnNames()) {
final String smooshName = Projections.getProjectionSmooshFileName(projectionSpec.getSchema(), column);
final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column);
final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName);
final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName);
if (columnDescriptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ protected Metadata makeProjections(
columnFormats.put(dimension, dimensionFormat);
DimensionHandler handler = dimensionFormat.getColumnHandler(dimension);
DimensionMergerV9 merger = handler.makeMerger(
Projections.getProjectionSmooshFileName(spec.getSchema(), dimension),
Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension),
indexSpec,
segmentWriteOutMedium,
dimensionFormat.toColumnCapabilities(),
Expand Down Expand Up @@ -543,7 +543,7 @@ protected Metadata makeProjections(
metrics,
columnFormats,
indexSpec,
Projections.getProjectionSmooshPrefix(spec.getSchema())
Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
);

Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn =
Expand Down Expand Up @@ -630,13 +630,14 @@ protected Metadata makeProjections(

final String section2 = "build projection[" + projectionSchema.getName() + "] inverted index and columns";
progress.startSection(section2);
segmentFileBuilder.startFileGroup(projectionSchema.getName());
if (projectionSchema.getTimeColumnName() != null) {
makeTimeColumn(
segmentFileBuilder,
progress,
timeWriter,
indexSpec,
Projections.getProjectionSmooshFileName(spec.getSchema(), projectionSchema.getTimeColumnName())
Projections.getProjectionSegmentInternalFileName(spec.getSchema(), projectionSchema.getTimeColumnName())
);
}
makeMetricsColumns(
Expand All @@ -646,7 +647,7 @@ protected Metadata makeProjections(
columnFormats,
metricWriters,
indexSpec,
Projections.getProjectionSmooshPrefix(spec.getSchema())
Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema())
);

for (int i = 0; i < dimensions.size(); i++) {
Expand All @@ -664,7 +665,7 @@ protected Metadata makeProjections(
// use merger descriptor, merger either has values or handles it own null column storage details
columnDesc = merger.makeColumnDescriptor();
}
makeColumn(segmentFileBuilder, Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), columnDesc);
makeColumn(segmentFileBuilder, Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), columnDesc);
}

progress.stopSection(section2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ protected File makeIndexFiles(
/************ Create Inverted Indexes and Finalize Build Columns *************/
final String section = "build inverted index and columns";
progress.startSection(section);
v10Smoosher.startFileGroup(Projections.BASE_TABLE_PROJECTION_NAME);
makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix + ColumnHolder.TIME_COLUMN_NAME);
makeMetricsColumns(
v10Smoosher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.segment.column.ColumnDescriptor;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
Expand All @@ -45,6 +46,23 @@ public interface SegmentFileBuilder extends Closeable
*/
void addColumn(String name, ColumnDescriptor columnDescriptor);

/**
 * Declares that subsequent writes belong to a named group of files that should be stored together. This is a hint
 * about physical layout: it does not constrain the names of files subsequently added, and implementations are free
 * to ignore it entirely (the default implementation is a no-op, for formats that do not organize data into
 * coarse-grained groupings). Projections are the primary caller today, but the mechanism is generic; it is equally
 * applicable to grouping internal metadata, data shared across columns, etc.
 * <p>
 * Callers should invoke this before writing each group's files; passing {@code null} clears the current group.
 * Callers should not invoke this while a writer returned by {@link #addWithChannel} is still open (implementations
 * may reject such calls).
 *
 * @param groupName name of the group subsequent writes belong to, or {@code null} to clear the current group
 *
 * @see SegmentFileBuilderV10#startFileGroup(String) for the V10 semantics
 */
default void startFileGroup(@Nullable String groupName)
{
}

/**
* Add a {@link File} to the segment file as the specified name
*/
Expand Down
Loading
Loading