diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index e5acd4f73fb3..f246421f8415 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -1039,7 +1039,7 @@ private Map> readProjectionColumns( final Map> projectionColumns = new LinkedHashMap<>(); for (String column : projectionSpec.getSchema().getColumnNames()) { - final String smooshName = Projections.getProjectionSmooshFileName(projectionSpec.getSchema(), column); + final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column); final ByteBuffer colBuffer = segmentFileMapper.mapFile(smooshName); final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName); if (columnDescriptor == null) { diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java index 2f3110153595..ef6c2b46890f 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java @@ -512,7 +512,7 @@ protected Metadata makeProjections( columnFormats.put(dimension, dimensionFormat); DimensionHandler handler = dimensionFormat.getColumnHandler(dimension); DimensionMergerV9 merger = handler.makeMerger( - Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), + Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), indexSpec, segmentWriteOutMedium, dimensionFormat.toColumnCapabilities(), @@ -543,7 +543,7 @@ protected Metadata makeProjections( metrics, columnFormats, indexSpec, - Projections.getProjectionSmooshPrefix(spec.getSchema()) + Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema()) ); Function, TimeAndDimsIterator> rowMergerFn = @@ -636,7 +636,7 @@ protected Metadata makeProjections( progress, timeWriter, indexSpec, - Projections.getProjectionSmooshFileName(spec.getSchema(), projectionSchema.getTimeColumnName()) + Projections.getProjectionSegmentInternalFileName(spec.getSchema(), projectionSchema.getTimeColumnName()) ); } makeMetricsColumns( @@ -646,7 +646,7 @@ protected Metadata makeProjections( columnFormats, metricWriters, indexSpec, - Projections.getProjectionSmooshPrefix(spec.getSchema()) + Projections.getProjectionSegmentInternalFilePrefix(spec.getSchema()) ); for (int i = 0; i < dimensions.size(); i++) { @@ -664,7 +664,7 @@ protected Metadata makeProjections( // use merger descriptor, merger either has values or handles it own null column storage details columnDesc = merger.makeColumnDescriptor(); } - makeColumn(segmentFileBuilder, Projections.getProjectionSmooshFileName(spec.getSchema(), dimension), columnDesc); + makeColumn(segmentFileBuilder, Projections.getProjectionSegmentInternalFileName(spec.getSchema(), dimension), columnDesc); } progress.stopSection(section2); diff --git a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java new file mode 100644 index 000000000000..7435e0f36700 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet; +import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.query.OrderBy; +import org.apache.druid.segment.column.BaseColumnHolder; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ColumnDescriptor; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; +import org.apache.druid.segment.file.SegmentFileMapper; +import org.apache.druid.segment.file.SegmentFileMetadata; +import org.apache.druid.segment.projections.AggregateProjectionSchema; +import org.apache.druid.segment.projections.BaseTableProjectionSchema; +import org.apache.druid.segment.projections.ConstantTimeColumn; +import org.apache.druid.segment.projections.ProjectionMetadata; +import org.apache.druid.segment.projections.Projections; +import org.apache.druid.segment.projections.QueryableProjection; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * A {@link QueryableIndex} that loads projection and base table columns on demand from a + * {@link PartialSegmentFileMapperV10}. Schema queries (column names, types, intervals, metadata) are answered from + * the {@link SegmentFileMetadata} alone without triggering any downloads. Column data is only downloaded when a column + * is accessed via {@link #getColumnHolder(String)} or {@link #getProjection(CursorBuildSpec)}. + *
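+ * <p>
+ * For illustration, the intended access pattern (a sketch with illustrative column names; assumes the
+ * {@link SegmentFileMetadata} and mapper were already created by segment loading code):
+ * <pre>{@code
+ * PartialQueryableIndex index = new PartialQueryableIndex(metadata, fileMapper, ColumnConfig.DEFAULT);
+ * index.getColumnNames();              // answered from metadata, no downloads
+ * index.getColumnCapabilities("dim1"); // answered from metadata, no downloads
+ * index.getColumnHolder("dim1");       // downloads and deserializes column "dim1" on first access
+ * }</pre>
+ * <p>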
+ * Projection matching uses only metadata ({@link SegmentFileMetadata#getColumnDescriptors()} keys) to determine if + * a projection can satisfy a query, avoiding downloads of projection data that won't be used. + * + * @see PartialSegmentFileMapperV10 + */ +public class PartialQueryableIndex implements QueryableIndex +{ + private final Interval dataInterval; + private final int baseNumRows; + private final Indexed availableDimensions; + private final List columnNames; + private final BitmapFactory bitmapFactory; + private final PartialSegmentFileMapperV10 fileMapper; + private final SegmentFileMetadata metadata; + private final ColumnConfig columnConfig; + private final Metadata reconstructedMetadata; + private final List ordering; + + // projection metadata for matching + private final SortedSet projections; + private final Map projectionsMap; + private final Map projectionSpecs; + + // segment-internal file prefix for the base table projection, used to translate column names to descriptor keys + private final String baseProjectionPrefix; + + // base table columns, built at construction time. each entry's supplier defers both mapFile() and column + // deserialization until the column is actually accessed, so queries only trigger downloads for the specific + // columns they use. + private final Map> baseColumns; + + // projection columns, keyed by projection name. built on demand (per-projection) when the projection is matched. + // within each projection, per-column suppliers defer both mapFile() and deserialization. + private final ConcurrentHashMap>> projectionColumnsByName = + new ConcurrentHashMap<>(); + + // lazy dimension handlers + private final Supplier> dimensionHandlers; + + public PartialQueryableIndex( + SegmentFileMetadata metadata, + PartialSegmentFileMapperV10 fileMapper, + ColumnConfig columnConfig + ) + { + this.metadata = metadata; + this.fileMapper = fileMapper; + this.columnConfig = columnConfig; + + // base table projection is always first + final ProjectionMetadata baseProjection = metadata.getProjections().get(0); + DruidException.conditionalDefensive( + Projections.BASE_TABLE_PROJECTION_NAME.equals(baseProjection.getSchema().getName()), + "Expected base table projection with name[%s], but got projection with name[%s] instead", + Projections.BASE_TABLE_PROJECTION_NAME, + baseProjection.getSchema().getName() + ); + final BaseTableProjectionSchema baseSchema = (BaseTableProjectionSchema) baseProjection.getSchema(); + this.baseNumRows = baseProjection.getNumRows(); + this.baseProjectionPrefix = Projections.getProjectionSegmentInternalFilePrefix(baseSchema); + this.dataInterval = Intervals.of(metadata.getInterval()); + this.bitmapFactory = metadata.getBitmapEncoding().getBitmapFactory(); + this.availableDimensions = new ListIndexed<>(baseSchema.getDimensionNames()); + + // build column names (dimensions first, then other columns, excluding __time) + final LinkedHashSet dimsFirst = new LinkedHashSet<>(baseSchema.getDimensionNames()); + for (String columnName : baseSchema.getColumnNames()) { + if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) { + dimsFirst.add(columnName); + } + } + this.columnNames = List.copyOf(dimsFirst); + + // build aggregate projection metadata for matching + final List aggProjections = new ArrayList<>(); + this.projectionSpecs = new ConcurrentHashMap<>(); + boolean first = true; + for (ProjectionMetadata projectionSpec : metadata.getProjections()) { + if (first) { + first = false; + continue; + } + if (projectionSpec.getSchema() instanceof 
AggregateProjectionSchema) { + aggProjections.add( + new AggregateProjectionMetadata( + (AggregateProjectionSchema) projectionSpec.getSchema(), + projectionSpec.getNumRows() + ) + ); + projectionSpecs.put(projectionSpec.getSchema().getName(), projectionSpec); + } + } + + this.reconstructedMetadata = baseSchema.asMetadata(aggProjections); + if (reconstructedMetadata.getOrdering() != null) { + this.ordering = SimpleQueryableIndex.ORDERING_INTERNER.intern(reconstructedMetadata.getOrdering()); + } else { + this.ordering = Cursors.ascendingTimeOrder(); + } + + this.projectionsMap = Maps.newHashMapWithExpectedSize(aggProjections.size()); + this.projections = new ObjectAVLTreeSet<>(AggregateProjectionMetadata.COMPARATOR); + for (AggregateProjectionMetadata projection : aggProjections) { + projections.add(projection); + projectionsMap.put(projection.getSchema().getName(), projection); + } + + // build per-column suppliers for the base table. each supplier is memoized and defers both mapFile() and + // deserialization until the column is accessed. + this.baseColumns = buildProjectionColumnSuppliers(baseProjection, Map.of()); + + this.dimensionHandlers = Suppliers.memoize(this::initDimensionHandlers); + } + + @Override + public Interval getDataInterval() + { + return dataInterval; + } + + @Override + public int getNumRows() + { + return baseNumRows; + } + + @Override + public Indexed getAvailableDimensions() + { + return availableDimensions; + } + + @Override + public BitmapFactory getBitmapFactoryForDimensions() + { + return bitmapFactory; + } + + @Override + public Metadata getMetadata() + { + return reconstructedMetadata; + } + + @Override + public Map getDimensionHandlers() + { + return dimensionHandlers.get(); + } + + @Override + public List getColumnNames() + { + return columnNames; + } + + @Override + public List getOrdering() + { + return ordering; + } + + @Nullable + @Override + public BaseColumnHolder getColumnHolder(String columnName) + { + final Supplier supplier = baseColumns.get(columnName); + return supplier == null ? null : supplier.get(); + } + + /** + * Answers from metadata without triggering column downloads. The default implementation in {@link QueryableIndex} + * calls {@link #getColumnHolder(String)}, which would force a base table load. + *
+   * <p>
+   * Only {@link ColumnCapabilities#getType()} and {@link ColumnCapabilities#hasMultipleValues()} are populated from
+   * the metadata; richer fields ({@code isDictionaryEncoded}, {@code hasBitmapIndexes}, {@code hasNulls}, etc.) keep
+   * their default/UNKNOWN values. This is intentional:
+   * <ul>
+   * <li>The hot caller is the broker computing SQL schema via {@link CursorFactory#getRowSignature()}, which only
+   * reads {@link ColumnCapabilities#getType()}; that path must NOT trigger downloads.</li>
+   * <li>Demanding callers (e.g. {@code DimensionHandlerUtils}, {@code ExpressionPlanner}, the cursor's column
+   * selector factory) obtain capabilities via {@code ColumnHolder.getCapabilities()} through {@code ColumnCache}
+   * at cursor execution time, which loads the column on access and returns accurate capabilities.</li>
+   * <li>Projection matching only needs column existence and type, both derivable from {@link ColumnDescriptor}.</li>
+   * </ul>
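+   * <p>
+   * For illustration (a sketch; {@code "dim1"} is a hypothetical column):
+   * <pre>{@code
+   * ColumnCapabilities caps = index.getColumnCapabilities("dim1");  // metadata only, no download
+   * caps.getType();             // accurate, derived from the ColumnDescriptor
+   * caps.isDictionaryEncoded(); // UNKNOWN here; getColumnHolder("dim1").getCapabilities() has the real answer
+   * }</pre>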
+   * <p>
+   * The known problem with this is {@code SegmentAnalyzer} via {@code QueryableIndexPhysicalSegmentInspector}, which
+   * reads {@code isDictionaryEncoded()} from these capabilities. That can yield different results for non-default
+   * analysis on STRING columns under SegmentMetadataQuery. If it becomes a problem, we should probably fix the
+   * analyzer to source richer fields from the column holder rather than from this method.
+   */
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    // look up the column in the base table projection's namespace
+    final String smooshName = baseProjectionPrefix + column;
+    final ColumnDescriptor descriptor = metadata.getColumnDescriptors().get(smooshName);
+    if (descriptor == null) {
+      return null;
+    }
+    return ColumnCapabilitiesImpl.createDefault()
+                                 .setType(descriptor.toColumnType())
+                                 .setHasMultipleValues(descriptor.isHasMultipleValues());
+  }
+
+  @Nullable
+  @Override
+  public QueryableProjection getProjection(CursorBuildSpec cursorBuildSpec)
+  {
+    return Projections.findMatchingProjection(
+        cursorBuildSpec,
+        projections,
+        dataInterval,
+        (projectionName, columnName) -> {
+          // check if the projection has this column using metadata column descriptors; also match when the base
+          // table does not have the column at all, so the projection can treat it as missing
+          final ProjectionMetadata projSpec = projectionSpecs.get(projectionName);
+          if (projSpec == null) {
+            return false;
+          }
+          final String smooshName = Projections.getProjectionSegmentInternalFileName(projSpec.getSchema(), columnName);
+          return metadata.getColumnDescriptors().containsKey(smooshName)
+                 || getColumnCapabilities(columnName) == null;
+        },
+        this::getProjectionQueryableIndex
+    );
+  }
+
+  @Nullable
+  @Override
+  public QueryableIndex getProjectionQueryableIndex(String name)
+  {
+    final AggregateProjectionMetadata projectionMeta = projectionsMap.get(name);
+    if (projectionMeta == null) {
+      return null;
+    }
+
+    // build per-column suppliers for this projection on first access. the suppliers themselves still defer download
+    // and deserialization until individual columns are read.
+    final Map<String, Supplier<BaseColumnHolder>> projColumns = projectionColumnsByName.computeIfAbsent(
+        name,
+        projName -> buildProjectionColumnSuppliers(projectionSpecs.get(projName), baseColumns)
+    );
+
+    final Metadata projectionMetadata = new Metadata(
+        null,
+        projectionMeta.getSchema().getAggregators(),
+        null,
+        null,
+        true,
+        projectionMeta.getSchema().getOrderingWithTimeColumnSubstitution(),
+        null
+    );
+
+    return new SimpleQueryableIndex(
+        dataInterval,
+        new ListIndexed<>(
+            projectionMeta.getSchema()
+                          .getGroupingColumns()
+                          .stream()
+                          .filter(x -> !x.equals(projectionMeta.getSchema().getTimeColumnName()))
+                          .collect(Collectors.toList())
+        ),
+        bitmapFactory,
+        projColumns,
+        fileMapper,
+        projectionMetadata,
+        null
+    )
+    {
+      @Override
+      public Metadata getMetadata()
+      {
+        return projectionMetadata;
+      }
+
+      @Override
+      public int getNumRows()
+      {
+        return projectionMeta.getNumRows();
+      }
+
+      @Override
+      public List<OrderBy> getOrdering()
+      {
+        return projectionMeta.getSchema().getOrderingWithTimeColumnSubstitution();
+      }
+    };
+  }
+
+  @Override
+  public void close()
+  {
+    fileMapper.close();
+  }
+
+  private Map<String, DimensionHandler> initDimensionHandlers()
+  {
+    final Map<String, DimensionHandler> handlers = Maps.newLinkedHashMap();
+    for (String dim : availableDimensions) {
+      final ColumnHolder columnHolder = getColumnHolder(dim);
+      if (columnHolder != null) {
+        handlers.put(dim, columnHolder.getColumnFormat().getColumnHandler(dim));
+      }
+    }
+    return handlers;
+  }
+
+  /**
+   * Build a map of column name to per-column supplier for the given projection.
Each supplier defers both + * {@link SegmentFileMapper#mapFile} and {@link ColumnDescriptor#read} until the column is actually accessed, so + * queries only trigger downloads for the specific columns they use. + */ + private Map> buildProjectionColumnSuppliers( + ProjectionMetadata projectionSpec, + Map> parentColumns + ) + { + final String timeColumnName = projectionSpec.getSchema().getTimeColumnName(); + final boolean renameTime = !ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName); + final Map> projectionColumns = new LinkedHashMap<>(); + + for (String column : projectionSpec.getSchema().getColumnNames()) { + final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column); + final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName); + if (columnDescriptor == null) { + continue; + } + + final String internedColumnName = SmooshedFileMapper.STRING_INTERNER.intern(column); + final Supplier columnSupplier = Suppliers.memoize(() -> { + try { + final ByteBuffer colBuffer = fileMapper.mapFile(smooshName); + final BaseColumnHolder parentColumn = + parentColumns.containsKey(column) ? parentColumns.get(column).get() : null; + return columnDescriptor.read(colBuffer, columnConfig, fileMapper, parentColumn); + } + catch (IOException e) { + throw DruidException.defensive(e, "Failed to load column[%s]", smooshName); + } + }); + + projectionColumns.put(internedColumnName, columnSupplier); + + if (column.equals(timeColumnName) && renameTime) { + projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME, projectionColumns.get(column)); + projectionColumns.remove(column); + } + } + + if (timeColumnName == null) { + projectionColumns.put( + ColumnHolder.TIME_COLUMN_NAME, + ConstantTimeColumn.makeConstantTimeSupplier(projectionSpec.getNumRows(), dataInterval.getStartMillis()) + ); + } + + return projectionColumns; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java index c988d26440a2..47f13ca019e6 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnDescriptor.java @@ -78,6 +78,25 @@ public List getParts() return parts; } + /** + * Derive the {@link ColumnType} for this column from the part serdes, without deserializing column data. This is + * useful for answering schema queries from metadata alone. + *
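+   * <p>
+   * For example (a sketch mirroring this patch's tests; {@code "hyperUnique"} is just an illustrative type name):
+   * <pre>{@code
+   * ColumnPartSerde serde = ComplexColumnPartSerde.serializerBuilder().withTypeName("hyperUnique").build();
+   * new ColumnDescriptor(ValueType.COMPLEX, false, List.of(serde)).toColumnType(); // => COMPLEX<hyperUnique>
+   * }</pre>
+   * <p>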
+ * Part serdes that carry additional type information (such as complex type names or array element types) provide it + * via {@link ColumnPartSerde#getColumnType()}. If no part serde provides a type, falls back to a simple + * {@link ColumnType} from {@link #getValueType()}. + */ + public ColumnType toColumnType() + { + for (ColumnPartSerde part : parts) { + final ColumnType type = part.getColumnType(); + if (type != null) { + return type; + } + } + return new ColumnType(valueType, null, null); + } + @Override public long getSerializedSize() throws IOException { diff --git a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java index 79d52ab384fd..c622ec756418 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java +++ b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java @@ -315,6 +315,16 @@ public long getDownloadedBytes() return total; } + /** + * The internal file names that have been downloaded so far, scoped to this mapper. External mappers' downloaded + * files are not included; call {@link #getDownloadedFiles()} on each external mapper directly if needed. Primarily + * intended for tests and diagnostics. + */ + public Set getDownloadedFiles() + { + return Set.copyOf(downloadedFiles); + } + @Override public void close() { diff --git a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java index 0415d540e24f..1b8323e5bc81 100644 --- a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java +++ b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java @@ -509,12 +509,12 @@ public static String getProjectionSmooshV9Prefix(AggregateProjectionMetadata pro return projectionSpec.getSchema().getName() + "/"; } - public static String getProjectionSmooshFileName(ProjectionSchema schema, String columnName) + public static String getProjectionSegmentInternalFileName(ProjectionSchema schema, String columnName) { - return getProjectionSmooshPrefix(schema) + columnName; + return getProjectionSegmentInternalFilePrefix(schema) + columnName; } - public static String getProjectionSmooshPrefix(ProjectionSchema projectionSchema) + public static String getProjectionSegmentInternalFilePrefix(ProjectionSchema projectionSchema) { return projectionSchema.getName() + "/"; } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java index f48b46fbbdde..dd387c8d7e7e 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ColumnPartSerde.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -53,6 +54,18 @@ public interface ColumnPartSerde */ Deserializer getDeserializer(); + /** + * Returns the {@link ColumnType} for this part serde, if known. This is used to determine the column type from + * metadata without deserializing the column data. 
Returns {@code null} by default; implementations that carry + * type information beyond what {@link org.apache.druid.segment.column.ColumnDescriptor#getValueType()} provides + * (such as complex type names or array element types) should override this. + */ + @Nullable + default ColumnType getColumnType() + { + return null; + } + interface Deserializer { void read( diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java index f8220d40046e..0ec98414ff34 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; @@ -68,6 +70,12 @@ public String getTypeName() return typeName; } + @Override + public ColumnType getColumnType() + { + return new ColumnType(ValueType.COMPLEX, typeName, null); + } + @Override public Serializer getSerializer() { diff --git a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java index 5bf50769b55e..aa2070e7a00b 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java @@ -168,6 +168,12 @@ public Deserializer getDeserializer() return new NestedColumnDeserializer(); } + @Override + public ColumnType getColumnType() + { + return logicalType; + } + @JsonProperty("logicalType") public ColumnType getLogicalType() { diff --git a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java new file mode 100644 index 000000000000..bb3e865fa118 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexTest.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.ListBasedInputRow; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentRangeReader; +import org.apache.druid.segment.projections.QueryableProjection; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +class PartialQueryableIndexTest extends InitializedNullHandlingTest +{ + private static final ColumnConfig COLUMN_CONFIG = ColumnConfig.DEFAULT; + private static final DateTime TIME = DateTimes.of("2025-01-01"); + + private static final RowSignature ROW_SIGNATURE = RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("metric1", ColumnType.LONG) + .build(); + + private static final List PROJECTIONS = Collections.singletonList( + AggregateProjectionSpec.builder("dim1_hourly_metric1_sum") + .virtualColumns( + Granularities.toVirtualColumn( + Granularities.HOUR, + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME + ) + ) + .groupingColumns( + new StringDimensionSchema("dim1"), + new LongDimensionSchema(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME) + ) + .aggregators( + new LongSumAggregatorFactory("_metric1_sum", "metric1"), + new CountAggregatorFactory("_count") + ) + .build() + ); + + private static final List ROWS = Arrays.asList( + new ListBasedInputRow(ROW_SIGNATURE, TIME, ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", "x", 1L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1), ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", "y", 2L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2), ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", "x", 3L)), + new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3), 
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", "y", 4L)) + ); + + @TempDir + static File sharedTempDir; + + // the built V10 segment directory, shared across tests since it's read-only + private static File segmentDir; + + @BeforeAll + static void buildSegment() + { + final File tmpDir = new File(sharedTempDir, "build_" + ThreadLocalRandom.current().nextInt()); + segmentDir = IndexBuilder.create() + .useV10() + .tmpDir(tmpDir) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec( + DimensionsSpec.builder() + .setDimensions( + List.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new LongDimensionSchema("metric1") + ) + ) + .build() + ) + .withRollup(false) + .withMinTimestamp(TIME.getMillis()) + .withProjections(PROJECTIONS) + .build() + ) + .indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.ZSTD).build()) + .rows(ROWS) + .buildMMappedIndexFile(); + } + + @Test + void testSchemaWithoutDownloads() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("schema"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + rangeReader.resetCount(); + + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + // all these should work without triggering any range reads (downloads) + Assertions.assertNotNull(index.getDataInterval()); + Assertions.assertEquals(4, index.getNumRows()); + Assertions.assertNotNull(index.getAvailableDimensions()); + Assertions.assertNotNull(index.getMetadata()); + Assertions.assertNotNull(index.getOrdering()); + Assertions.assertFalse(index.getColumnNames().isEmpty()); + Assertions.assertNotNull(index.getBitmapFactoryForDimensions()); + + // no downloads triggered + Assertions.assertEquals(0, rangeReader.getReadCount()); + Assertions.assertEquals(Set.of(), rangeReader.getReadFilenames()); + } + } + + @Test + void testGetColumnCapabilitiesFromMetadata() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("caps"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + rangeReader.resetCount(); + + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + // string dimension + ColumnCapabilities dim1Caps = index.getColumnCapabilities("dim1"); + Assertions.assertNotNull(dim1Caps); + Assertions.assertEquals(ValueType.STRING, dim1Caps.getType()); + + // long metric + ColumnCapabilities metric1Caps = index.getColumnCapabilities("metric1"); + Assertions.assertNotNull(metric1Caps); + Assertions.assertEquals(ValueType.LONG, metric1Caps.getType()); + + // time column + ColumnCapabilities timeCaps = index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME); + Assertions.assertNotNull(timeCaps); + + // non-existent column + Assertions.assertNull(index.getColumnCapabilities("nonexistent")); + + // no downloads triggered + Assertions.assertEquals(0, rangeReader.getReadCount()); + Assertions.assertEquals(Set.of(), 
rangeReader.getReadFilenames()); + } + } + + @Test + void testGetColumnHolderTriggersBaseTableLoad() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("colholder"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + rangeReader.resetCount(); + + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + // no downloads yet + Assertions.assertEquals(0, rangeReader.getReadCount()); + + // accessing a column holder should trigger downloads + Assertions.assertNotNull(index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME)); + Assertions.assertTrue(rangeReader.getReadCount() > 0); + + Assertions.assertNotNull(index.getColumnHolder("dim1")); + Assertions.assertNull(index.getColumnHolder("nonexistent")); + + // all reads went to the V10 main segment file (no externals queried) + Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), rangeReader.getReadFilenames()); + } + } + + @Test + void testGetProjectionMatchesFromMetadataAndLoadsLazily() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("projection"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + // build a CursorBuildSpec that should match the projection + final CursorBuildSpec matchingSpec = CursorBuildSpec.builder() + .setInterval(index.getDataInterval()) + .setPhysicalColumns(Set.of("dim1", "metric1")) + .setGroupingColumns(Collections.singletonList("dim1")) + .setVirtualColumns( + VirtualColumns.create( + Granularities.toVirtualColumn(Granularities.HOUR, Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME) + ) + ) + .setAggregators( + List.of( + new LongSumAggregatorFactory("_metric1_sum", "metric1"), + new CountAggregatorFactory("_count") + ) + ) + .build(); + + rangeReader.resetCount(); + + final QueryableProjection projection = index.getProjection(matchingSpec); + Assertions.assertNotNull(projection, "projection should match"); + + // matching the projection itself shouldn't trigger any downloads, it's metadata-based + Assertions.assertEquals(0, rangeReader.getReadCount(), "matching should not download files"); + Assertions.assertEquals(Set.of(), rangeReader.getReadFilenames(), "matching should not download files"); + Assertions.assertEquals(Set.of(), mapper.getDownloadedFiles(), "matching should not download files"); + + final QueryableIndex projIndex = projection.getRowSelector(); + Assertions.assertNotNull(projIndex); + Assertions.assertEquals(0, rangeReader.getReadCount(), "this should not download files either"); + // actually accessing a column on the projection triggers the column's download + Assertions.assertNotNull(projIndex.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME)); + Assertions.assertTrue(rangeReader.getReadCount() > 0, "accessing a projection column should download"); + Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), rangeReader.getReadFilenames()); + + // downloaded files are scoped to the matched projection's namespace, not the base table (if 
no shared parts) + final Set downloaded = mapper.getDownloadedFiles(); + Assertions.assertTrue( + downloaded.stream().anyMatch(name -> name.startsWith("dim1_hourly_metric1_sum/")), + "expected at least one file from the matched projection's namespace, got " + downloaded + ); + Assertions.assertTrue( + downloaded.stream().noneMatch(name -> name.startsWith("__base/")), + "no base table files should be downloaded when only the projection was accessed, got " + downloaded + ); + + // fetching a projection column which has a base table parent does download base table stuff + Assertions.assertNotNull(projIndex.getColumnHolder("dim1")); + final Set downloadedAfterDim1 = mapper.getDownloadedFiles(); + Assertions.assertTrue( + downloadedAfterDim1.stream().anyMatch(name -> name.startsWith("dim1_hourly_metric1_sum/")), + "expected at least one file from the matched projection's namespace, got " + downloadedAfterDim1 + ); + Assertions.assertTrue( + downloadedAfterDim1.stream().anyMatch(name -> name.startsWith("__base/")), + "base table files should be downloaded when a projection column shares data with a base table parent, got " + downloadedAfterDim1 + ); + } + } + + @Test + void testPerColumnLaziness() throws IOException + { + // verify that accessing one column of a projection doesn't download other columns + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("per_col"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + rangeReader.resetCount(); + + // access one base table column + Assertions.assertNotNull(index.getColumnHolder("dim1")); + final int countAfterDim1 = rangeReader.getReadCount(); + Assertions.assertTrue(countAfterDim1 > 0, "accessing dim1 should trigger downloads"); + + // dim1's smoosh entry is downloaded; metric1's is not + final Set filesAfterDim1 = mapper.getDownloadedFiles(); + Assertions.assertTrue(filesAfterDim1.contains("__base/dim1"), "expected __base/dim1 in " + filesAfterDim1); + Assertions.assertFalse(filesAfterDim1.contains("__base/metric1"), "metric1 should not be downloaded yet"); + + // access the same column again should not trigger more downloads + Assertions.assertNotNull(index.getColumnHolder("dim1")); + Assertions.assertEquals(countAfterDim1, rangeReader.getReadCount(), "re-access should be cached"); + Assertions.assertEquals(filesAfterDim1, mapper.getDownloadedFiles(), "re-access should not download new files"); + + // access a different column should trigger additional downloads for its files + Assertions.assertNotNull(index.getColumnHolder("metric1")); + Assertions.assertTrue( + rangeReader.getReadCount() > countAfterDim1, + "accessing metric1 should trigger additional downloads" + ); + + // metric1's smoosh entry is now also downloaded + final Set filesAfterMetric1 = mapper.getDownloadedFiles(); + Assertions.assertTrue(filesAfterMetric1.contains("__base/dim1")); + Assertions.assertTrue(filesAfterMetric1.contains("__base/metric1"), "expected __base/metric1 in " + filesAfterMetric1); + + // all reads went to the V10 main segment file (no externals queried) + Assertions.assertEquals(Set.of(IndexIO.V10_FILE_NAME), rangeReader.getReadFilenames()); + } + } + + @Test + void testGetProjectionReturnsNullForNonAggregateQuery() 
throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + final File cacheDir = newCacheDir("no_proj"); + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + )) { + final PartialQueryableIndex index = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + // scan query, no grouping, no aggregation, should not match any projection + final CursorBuildSpec scanSpec = CursorBuildSpec.builder() + .setInterval(index.getDataInterval()) + .build(); + + Assertions.assertNull(index.getProjection(scanSpec)); + } + } + + @Test + void testMatchesEagerQueryableIndex() throws IOException + { + // verify that the partial index produces the same schema info as the eager (full) index + final IndexIO indexIO = TestHelper.getTestIndexIO(); + final File cacheDir = newCacheDir("match_eager"); + final DirectoryRangeReader rangeReader = new DirectoryRangeReader(segmentDir); + + try ( + QueryableIndex eagerIndex = indexIO.loadIndex(segmentDir); + PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.create( + rangeReader, + TestHelper.makeJsonMapper(), + cacheDir, + IndexIO.V10_FILE_NAME, + Collections.emptyList() + ) + ) { + final PartialQueryableIndex partialIndex = new PartialQueryableIndex( + mapper.getSegmentFileMetadata(), + mapper, + COLUMN_CONFIG + ); + + Assertions.assertEquals(eagerIndex.getDataInterval(), partialIndex.getDataInterval()); + Assertions.assertEquals(eagerIndex.getNumRows(), partialIndex.getNumRows()); + final List eagerDims = new ArrayList<>(); + eagerIndex.getAvailableDimensions().forEach(eagerDims::add); + final List partialDims = new ArrayList<>(); + partialIndex.getAvailableDimensions().forEach(partialDims::add); + Assertions.assertEquals(eagerDims, partialDims); + Assertions.assertEquals(eagerIndex.getColumnNames(), partialIndex.getColumnNames()); + Assertions.assertEquals(eagerIndex.getOrdering(), partialIndex.getOrdering()); + + // verify column capabilities match for all columns + for (String colName : eagerIndex.getColumnNames()) { + final ColumnCapabilities eagerCaps = eagerIndex.getColumnCapabilities(colName); + final ColumnCapabilities partialCaps = partialIndex.getColumnCapabilities(colName); + Assertions.assertNotNull(eagerCaps, "eager caps for " + colName); + Assertions.assertNotNull(partialCaps, "partial caps for " + colName); + Assertions.assertEquals( + eagerCaps.toColumnType(), + partialCaps.toColumnType(), + "type mismatch for " + colName + ); + } + } + } + + private File newCacheDir(String name) throws IOException + { + final File dir = new File(sharedTempDir, name + "_" + ThreadLocalRandom.current().nextInt()); + FileUtils.mkdirp(dir); + return dir; + } + + static class DirectoryRangeReader implements SegmentRangeReader + { + private final File directory; + + DirectoryRangeReader(File directory) + { + this.directory = directory; + } + + @Override + public InputStream readRange(String filename, long offset, long length) throws IOException + { + File target = new File(directory, filename); + try (RandomAccessFile raf = new RandomAccessFile(target, "r")) { + final int available = (int) Math.min(length, Math.max(0, raf.length() - offset)); + byte[] data = new byte[available]; + raf.seek(offset); + raf.readFully(data); + return new ByteArrayInputStream(data); + } + } + } + + static class CountingRangeReader extends DirectoryRangeReader + { 
+ private final AtomicInteger readCount = new AtomicInteger(0); + private final Set readFilenames = ConcurrentHashMap.newKeySet(); + + CountingRangeReader(File directory) + { + super(directory); + } + + int getReadCount() + { + return readCount.get(); + } + + Set getReadFilenames() + { + return Set.copyOf(readFilenames); + } + + void resetCount() + { + readCount.set(0); + readFilenames.clear(); + } + + @Override + public InputStream readRange(String filename, long offset, long length) throws IOException + { + readCount.incrementAndGet(); + readFilenames.add(filename); + return super.readRange(filename, offset, length); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java b/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java new file mode 100644 index 000000000000..0008c9640fff --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/column/ColumnDescriptorTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.column; + +import org.apache.druid.segment.nested.NestedCommonFormatColumnFormatSpec; +import org.apache.druid.segment.serde.ColumnPartSerde; +import org.apache.druid.segment.serde.ComplexColumnPartSerde; +import org.apache.druid.segment.serde.LongNumericColumnPartSerde; +import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteOrder; +import java.util.List; + +class ColumnDescriptorTest +{ + @Test + void testToColumnTypeFallsBackToValueTypeForSimpleSerdes() + { + // simple numeric serdes don't carry extra type info, so toColumnType() falls back to ValueType + final ColumnPartSerde longSerde = LongNumericColumnPartSerde.serializerBuilder() + .withByteOrder(ByteOrder.nativeOrder()) + .build(); + final ColumnDescriptor descriptor = new ColumnDescriptor(ValueType.LONG, false, List.of(longSerde)); + + Assertions.assertEquals(ColumnType.LONG, descriptor.toColumnType()); + } + + @Test + void testToColumnTypeUsesComplexTypeNameFromPartSerde() + { + // complex columns carry the complex type name in the part serde; toColumnType() must preserve it + final ColumnPartSerde complexSerde = ComplexColumnPartSerde.serializerBuilder() + .withTypeName("hyperUnique") + .build(); + final ColumnDescriptor descriptor = new ColumnDescriptor(ValueType.COMPLEX, false, List.of(complexSerde)); + + final ColumnType columnType = descriptor.toColumnType(); + Assertions.assertEquals(ValueType.COMPLEX, columnType.getType()); + Assertions.assertEquals("hyperUnique", columnType.getComplexTypeName()); + } + + @Test + void testToColumnTypeUsesLogicalTypeFromNestedPartSerde() + { + // nested common format columns carry the full logical type (including array element types) in the part serde + final ColumnPartSerde nestedSerde = + NestedCommonFormatColumnPartSerde.serializerBuilder() + .withLogicalType(ColumnType.STRING_ARRAY) + .withHasNulls(false) + .withColumnFormatSpec(NestedCommonFormatColumnFormatSpec.builder().build()) + .withByteOrder(ByteOrder.nativeOrder()) + .build(); + final ColumnDescriptor descriptor = new ColumnDescriptor(ValueType.ARRAY, false, List.of(nestedSerde)); + + Assertions.assertEquals(ColumnType.STRING_ARRAY, descriptor.toColumnType()); + } + + @Test + void testToColumnTypeEmptyPartsFallsBackToValueType() + { + final ColumnDescriptor descriptor = new ColumnDescriptor(ValueType.STRING, false, List.of()); + final ColumnType columnType = descriptor.toColumnType(); + Assertions.assertEquals(ValueType.STRING, columnType.getType()); + Assertions.assertNull(columnType.getComplexTypeName()); + } +}