diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java index 9436a49ee355..9df76b6e9ca4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java @@ -24,6 +24,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +36,7 @@ import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.map.ImmutableMapDataSource; +import org.apache.pinot.segment.local.segment.index.openstruct.ImmutableOpenStructDataSource; import org.apache.pinot.segment.local.segment.index.readers.text.MultiColumnLuceneTextIndexReader; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; @@ -49,6 +52,7 @@ import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; +import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.segment.spi.index.reader.Dictionary; @@ -58,7 +62,9 @@ import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.OpenStructNaming; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -98,13 +104,44 @@ public ImmutableSegmentImpl( _dataSources = new Object2ObjectOpenHashMap<>(segmentMetadata.getColumnMetadataMap().size()); + Map> openStructDenseChildren = new HashMap<>(); + Map openStructSparseChildren = new HashMap<>(); + Set openStructParents = new HashSet<>(); + for (Map.Entry entry : segmentMetadata.getColumnMetadataMap().entrySet()) { String colName = entry.getKey(); ColumnMetadata columnMetadata = entry.getValue(); + + if (columnMetadata instanceof ColumnMetadataImpl && ((ColumnMetadataImpl) columnMetadata).isMaterializedChild()) { + String parent = ((ColumnMetadataImpl) columnMetadata).getParentColumn(); + openStructParents.add(parent); + DataSource childDs = new ImmutableDataSource(columnMetadata, _indexContainerMap.get(colName)); + if (OpenStructNaming.isSparseColumn(colName)) { + openStructSparseChildren.put(parent, childDs); + } else { + openStructDenseChildren.computeIfAbsent(parent, k -> new HashMap<>()) + .put(OpenStructNaming.parseKey(colName), childDs); + } + continue; + } + if (columnMetadata.getFieldSpec().getDataType() == FieldSpec.DataType.MAP) { - _dataSources.put(colName, new ImmutableMapDataSource(entry.getValue(), _indexContainerMap.get(colName))); + _dataSources.put(colName, new ImmutableMapDataSource(columnMetadata, _indexContainerMap.get(colName))); } else { - _dataSources.put(colName, new ImmutableDataSource(entry.getValue(), _indexContainerMap.get(colName))); + _dataSources.put(colName, new ImmutableDataSource(columnMetadata, _indexContainerMap.get(colName))); + } + } + + if (!openStructParents.isEmpty()) { + Schema schema = segmentMetadata.getSchema(); + for (String parent : openStructParents) { + FieldSpec fieldSpec = schema != null ? schema.getFieldSpecFor(parent) : null; + if (!(fieldSpec instanceof ComplexFieldSpec)) { + continue; + } + _dataSources.put(parent, new ImmutableOpenStructDataSource((ComplexFieldSpec) fieldSpec, + openStructDenseChildren.getOrDefault(parent, Map.of()), + openStructSparseChildren.get(parent), segmentMetadata.getTotalDocs())); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java index 20c16ad962d3..88aeb655c2ab 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java @@ -49,6 +49,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.OpenStructNaming; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.ReadMode; @@ -216,6 +217,17 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad if (schema != null) { Set columnsInMetadata = new HashSet<>(columnMetadataMap.keySet()); columnsInMetadata.removeIf(schema::hasColumn); + // Materialized OPEN_STRUCT child columns (col$key, col$__sparse__) live in segment metadata + // but not in the user-facing schema. Keep them when the parent OPEN_STRUCT column is in the + // schema; they will be grouped under their parent at segment-impl post-load (Task 16). + columnsInMetadata.removeIf(col -> { + if (!OpenStructNaming.isMaterializedOpenStructColumn(col)) { + return false; + } + String parent = OpenStructNaming.parseParentColumn(col); + return schema.hasColumn(parent) + && schema.getFieldSpecFor(parent).getDataType() == FieldSpec.DataType.OPEN_STRUCT; + }); if (!columnsInMetadata.isEmpty()) { LOGGER.info("Skip loading columns only exist in metadata but not in schema: {}", columnsInMetadata); for (String column : columnsInMetadata) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index edf4909a4bbd..7c9683c72a34 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -66,6 +66,8 @@ import org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource; import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource; +import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructDataSource; +import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructIndex; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext; @@ -107,6 +109,7 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.config.table.ingestion.AggregationConfig; @@ -302,7 +305,8 @@ public MultiColumnTextMetadata getMultiColumnTextMetadata() { Set specialIndexes = Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements other contract - StandardIndexes.nullValueVector()); // null value vector implements other contract + StandardIndexes.nullValueVector(), // null value vector implements other contract + StandardIndexes.openStruct()); // open-struct is constructed out-of-band below // Initialize for each column boolean hasColumnWithReuseMutableTextIndex = false; @@ -428,6 +432,15 @@ public MultiColumnTextMetadata getMultiColumnTextMetadata() { } } + if (dataType == DataType.OPEN_STRUCT && fieldSpec instanceof ComplexFieldSpec) { + IndexConfig openStructConfig = indexConfigs.getConfig(StandardIndexes.openStruct()); + if (openStructConfig instanceof OpenStructIndexConfig && openStructConfig.isEnabled()) { + MutableOpenStructIndex openStructIndex = new MutableOpenStructIndex(column, (ComplexFieldSpec) fieldSpec, + (OpenStructIndexConfig) openStructConfig, _memoryManager, _capacity); + mutableIndexes.put(StandardIndexes.openStruct(), openStructIndex); + } + } + _indexContainerMap.put(column, new IndexContainer(fieldSpec, partitionFunction, partitions, new ValuesInfo(), mutableIndexes, dictionary, nullValueVector, sourceColumn, valueAggregator)); @@ -569,7 +582,7 @@ private void addMutableIndex(Map indexEntry : indexContainer._mutableIndexes.entrySet()) { @@ -1310,6 +1334,20 @@ public void commit() { } } + /** + * Returns the per-column mutable OPEN_STRUCT index, or {@code null} if the column is not OPEN_STRUCT + * or the index has not been initialized. + */ + @Nullable + public MutableOpenStructIndex getOpenStructIndex(String column) { + IndexContainer container = _indexContainerMap.get(column); + if (container == null) { + return null; + } + MutableIndex index = container._mutableIndexes.get(StandardIndexes.openStruct()); + return index instanceof MutableOpenStructIndex ? (MutableOpenStructIndex) index : null; + } + @Override public void offload() { if (_partitionUpsertMetadataManager != null) { @@ -1695,7 +1733,10 @@ private class IndexContainer implements Closeable { @Nullable Set partitions, ValuesInfo valuesInfo, Map mutableIndexes, @Nullable MutableDictionary dictionary, @Nullable MutableNullValueVector nullValueVector, @Nullable String sourceColumn, @Nullable ValueAggregator valueAggregator) { - Preconditions.checkArgument(mutableIndexes.containsKey(StandardIndexes.forward()), "Forward index is required"); + Preconditions.checkArgument( + mutableIndexes.containsKey(StandardIndexes.forward()) + || mutableIndexes.containsKey(StandardIndexes.openStruct()), + "Forward index or OPEN_STRUCT index is required"); _fieldSpec = fieldSpec; _mutableIndexes = mutableIndexes; _dictionary = dictionary; @@ -1708,6 +1749,11 @@ private class IndexContainer implements Closeable { } DataSource toDataSource() { + if (_fieldSpec.getDataType() == DataType.OPEN_STRUCT) { + MutableIndex idx = _mutableIndexes.get(StandardIndexes.openStruct()); + return new MutableOpenStructDataSource((ComplexFieldSpec) _fieldSpec, (MutableOpenStructIndex) idx, + _numDocsIndexed); + } if (_fieldSpec.getDataType() == MAP) { return new MutableMapDataSource(_fieldSpec, _numDocsIndexed, _valuesInfo._numValues, _valuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : _dictionary.length(), _partitionFunction, diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java index bb9462af85d8..be9544707b06 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java @@ -26,6 +26,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.EmptyColumnStatistics; import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource; +import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructDataSource; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.creator.ColumnStatistics; import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsContainer; @@ -78,6 +79,13 @@ private ColumnStatistics createColumnStatistics(DataSource dataSource, @Nullable if (dataSource instanceof MutableMapDataSource) { return createMapColumnStatistics(dataSource, validDocIds, statsCollectorConfig); } + // OPEN_STRUCT parent columns have no forward index of their own — per-key stats are recomputed + // by the splitter at offline-segment build time. Return EmptyColumnStatistics so the standard + // dispatch (which would call getForwardIndex / getDictionary) doesn't NPE. + if (dataSource instanceof MutableOpenStructDataSource) { + return new EmptyColumnStatistics(dataSourceMetadata.getFieldSpec(), dataSourceMetadata.getPartitionFunction(), + dataSourceMetadata.getPartitions()); + } if (validDocIds != null) { if (dataSource.getDictionary() != null) { return new CompactedColumnStatistics(dataSource, sortedDocIds, isSortedColumn, validDocIds); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java index 12d9cf88bdcb..4469ee569f1d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java @@ -68,6 +68,7 @@ import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; @@ -555,6 +556,22 @@ protected void writeMetadata() hasDictionary, dictionaryElementSize, fwdConfig.getEncodingType(), false); } + // OPEN_STRUCT splitters produce per-key materialized child columns (col$key, col$__sparse__) that + // are not in _columnStatisticsMap. Merge their pre-built metadata into the segment properties so + // each child appears as its own column at load time. + for (ColumnIndexCreators columnIndexCreators : _colIndexes.values()) { + for (IndexCreator indexCreator : columnIndexCreators.getIndexCreators()) { + if (indexCreator instanceof ColumnarOpenStructIndexCreator) { + ColumnarOpenStructIndexCreator splitter = (ColumnarOpenStructIndexCreator) indexCreator; + for (Map.Entry childEntry + : splitter.getMaterializedColumnMetadata().entrySet()) { + PropertiesConfiguration childProps = childEntry.getValue(); + childProps.getKeys().forEachRemaining(key -> properties.setProperty(key, childProps.getProperty(key))); + } + } + } + } + SegmentZKPropsConfig segmentZKPropsConfig = _config.getSegmentZKPropsConfig(); if (segmentZKPropsConfig != null) { properties.setProperty(Realtime.START_OFFSET, segmentZKPropsConfig.getStartOffset()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java new file mode 100644 index 000000000000..bb1e900702cc --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitter.java @@ -0,0 +1,552 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.openstruct; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.segment.creator.impl.BaseSegmentCreator; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator; +import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.StatsCollectorUtil; +import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; +import org.apache.pinot.segment.local.segment.index.openstruct.OpenStructSupportedIndexes; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.OpenStructNaming; +import org.apache.pinot.spi.data.OpenStructTypeInference; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.PinotDataType; +import org.roaringbitmap.RoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Splits an OPEN_STRUCT column into per-key materialized columns using standard Pinot index + * creators. Dense keys become independent virtual columns; remaining keys go into a single + * synthetic JSON column for sparse storage. + * + *

Lifecycle: instantiated by {@code BaseSegmentCreator} for OPEN_STRUCT columns. Receives + * per-doc {@code Map} values via {@link #add(Map, int)}, accumulates in memory, + * then on {@link #seal()} writes per-key column files using standard creators. + */ +public class OpenStructColumnSplitter implements ColumnarOpenStructIndexCreator { + + private static final Logger LOGGER = LoggerFactory.getLogger(OpenStructColumnSplitter.class); + + private final File _indexDir; + private final String _columnName; + private final Map _childFieldSpecs; + private final OpenStructIndexConfig _config; + private final int _maxDenseKeys; + + // Per-key accumulation + private final Map _presenceBitmaps = new HashMap<>(); + private final Map> _values = new HashMap<>(); + private final Map _inferredTypes = new HashMap<>(); + private int _numDocs; + + // Resolved at seal time + @Nullable + private Set _resolvedDenseKeys; + private final Map _materializedColumnMetadata = new LinkedHashMap<>(); + + public OpenStructColumnSplitter(File indexDir, String columnName, FieldSpec fieldSpec, + OpenStructIndexConfig config) { + _indexDir = indexDir; + _columnName = columnName; + _config = config; + _maxDenseKeys = config.getMaxDenseKeys(); + + Map childFieldSpecs = null; + if (fieldSpec instanceof ComplexFieldSpec) { + ComplexFieldSpec complexSpec = (ComplexFieldSpec) fieldSpec; + childFieldSpecs = complexSpec.getChildFieldSpecs(); + } + _childFieldSpecs = childFieldSpecs != null ? new HashMap<>(childFieldSpecs) : new HashMap<>(); + } + + @Override + public void add(Object value, int docId) + throws IOException { + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map map = (Map) value; + addMap(map); + } else { + addMap(null); + } + } + + @Override + public void add(Map openStructValue, int docId) + throws IOException { + addMap(openStructValue); + } + + @Override + public void add(Object[] values, @Nullable int[] dictIds) + throws IOException { + throw new UnsupportedOperationException("OPEN_STRUCT index is single-value only"); + } + + /** + * Returns the resolved dense-key set after {@link #seal()} or {@link #classify()}. + * Returns an empty set before resolution. + */ + public Set getResolvedDenseKeys() { + return _resolvedDenseKeys != null ? Collections.unmodifiableSet(_resolvedDenseKeys) : Set.of(); + } + + /** + * Resolves dense vs sparse keys without writing any files. Exposed for testing and for callers + * that need the classification independent of file output. {@link #seal()} calls this internally. + */ + public Set classify() { + if (_resolvedDenseKeys != null) { + return _resolvedDenseKeys; + } + if (_numDocs == 0 || _presenceBitmaps.isEmpty()) { + _resolvedDenseKeys = new LinkedHashSet<>(); + return _resolvedDenseKeys; + } + List allKeys = new ArrayList<>(_presenceBitmaps.keySet()); + allKeys.sort((a, b) -> { + double fillA = (double) _presenceBitmaps.get(a).getCardinality() / _numDocs; + double fillB = (double) _presenceBitmaps.get(b).getCardinality() / _numDocs; + int cmp = Double.compare(fillB, fillA); + return cmp != 0 ? cmp : a.compareTo(b); + }); + + double minFillRate = _config.getDenseKeyMinFillRate(); + _resolvedDenseKeys = new LinkedHashSet<>(); + + Set configuredDenseKeys = _config.getDenseKeys(); + for (String key : configuredDenseKeys) { + if (_presenceBitmaps.containsKey(key) && (_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys)) { + _resolvedDenseKeys.add(key); + } + } + + for (String key : allKeys) { + if (_resolvedDenseKeys.contains(key)) { + continue; + } + double fillRate = (double) _presenceBitmaps.get(key).getCardinality() / _numDocs; + if ((_maxDenseKeys < 0 || _resolvedDenseKeys.size() < _maxDenseKeys) && fillRate >= minFillRate) { + _resolvedDenseKeys.add(key); + } + } + return _resolvedDenseKeys; + } + + private void addMap(@Nullable Map map) { + if (map != null && !map.isEmpty()) { + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object rawValue = entry.getValue(); + if (rawValue == null) { + continue; + } + FieldSpec keySpec = _childFieldSpecs.get(key); + DataType valueType = keySpec != null + ? keySpec.getDataType() + : _inferredTypes.computeIfAbsent(key, k -> { + DataType inferred = OpenStructTypeInference.inferDataType(rawValue); + return inferred != null ? inferred : DataType.STRING; + }); + if (!_presenceBitmaps.containsKey(key)) { + _presenceBitmaps.put(key, new RoaringBitmap()); + _values.put(key, new ArrayList<>()); + } + _presenceBitmaps.get(key).add(_numDocs); + Object coerced; + try { + PinotDataType sourceType = PinotDataType.getSingleValueType(rawValue); + PinotDataType destType = ColumnDataType.fromDataTypeSV(valueType.getStoredType()).toPinotDataType(); + coerced = destType.convert(rawValue, sourceType); + } catch (Exception e) { + LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' value '{}' to {}. Skipping.", + _columnName, key, rawValue, valueType, e); + _presenceBitmaps.get(key).remove(_numDocs); + continue; + } + _values.get(key).add(coerced); + } + } + _numDocs++; + } + + @Override + public void seal() + throws IOException { + classify(); + if (_resolvedDenseKeys == null || (_numDocs == 0 && _presenceBitmaps.isEmpty())) { + return; + } + + for (String key : _resolvedDenseKeys) { + writeDenseKeyColumn(key); + } + + List sparseKeys = new ArrayList<>(); + for (String key : _presenceBitmaps.keySet()) { + if (!_resolvedDenseKeys.contains(key)) { + sparseKeys.add(key); + } + } + if (!sparseKeys.isEmpty()) { + writeSparseJsonColumn(sparseKeys); + } + + emitParentColumnMetadata(!sparseKeys.isEmpty()); + } + + @Override + public void close() + throws IOException { + // Nothing to close — sub-creators are created and closed within seal() + } + + @Override + public Map getMaterializedColumnMetadata() { + return _materializedColumnMetadata; + } + + private void writeDenseKeyColumn(String key) + throws IOException { + String materializedCol = OpenStructNaming.materializedColumnName(_columnName, key); + FieldSpec keySpec = _childFieldSpecs.get(key); + DataType valueType = keySpec != null + ? keySpec.getDataType() + : _inferredTypes.getOrDefault(key, DataType.STRING); + DataType storedType = valueType.getStoredType(); + RoaringBitmap presence = _presenceBitmaps.get(key); + List values = _values.get(key); + + // Synthetic field spec for the materialized child. Its natural Pinot dimension null value is the value + // stored for absent docs, so column metadata stays consistent with on-disk content. + DimensionFieldSpec childFieldSpec = new DimensionFieldSpec(materializedCol, storedType, true); + Object defaultValue = childFieldSpec.getDefaultNullValue(); + + // Collect statistics the standard way: present docs contribute their value, absent docs the default + // (absent docs are also marked in the null vector below). + AbstractColumnStatisticsCollector statsCollector = + StatsCollectorUtil.createStatsCollector(childFieldSpec, null); + int statsOrdinal = 0; + for (int docId = 0; docId < _numDocs; docId++) { + statsCollector.collect(presence.contains(docId) ? values.get(statsOrdinal++) : defaultValue); + } + statsCollector.seal(); + + // Build per-key index configuration from the key's FieldConfig (falling back to the default), then apply + // the OPEN_STRUCT inverted-on default. No TableConfig/Schema required. + FieldConfig keyFieldConfig = _config.getValueFieldConfig(key); + if (keyFieldConfig == null) { + keyFieldConfig = _config.getDefaultValueFieldConfig(); + } + boolean enableInverted = _config.shouldEnableInvertedIndexForKey(key); + FieldIndexConfigs configsForDecision = new FieldIndexConfigs.Builder( + FieldIndexConfigsUtil.fromFieldConfig(keyFieldConfig, childFieldSpec)) + .add(StandardIndexes.inverted(), enableInverted ? IndexConfig.ENABLED : IndexConfig.DISABLED) + .build(); + + boolean useDictionary = resolveUseDictionary(childFieldSpec, configsForDecision, statsCollector); + + // Reconcile dictionary + forward encoding with the final decision (mirrors BaseSegmentCreator.adaptConfig); + // ForwardIndexCreatorFactory selects dict-vs-raw from the forward config's EncodingType. A compression codec + // applies only to the raw forward format (LZ4 preserves the dense child's current on-disk layout); attaching + // one to a dictionary-encoded forward is rejected by ForwardIndexType.validate. + ForwardIndexConfig.Builder forwardBuilder = new ForwardIndexConfig.Builder( + useDictionary ? FieldConfig.EncodingType.DICTIONARY : FieldConfig.EncodingType.RAW); + if (!useDictionary) { + forwardBuilder.withCompressionCodec(FieldConfig.CompressionCodec.LZ4); + } + FieldIndexConfigs fieldIndexConfigs = new FieldIndexConfigs.Builder(configsForDecision) + .add(StandardIndexes.dictionary(), + useDictionary ? DictionaryIndexConfig.DEFAULT : DictionaryIndexConfig.DISABLED) + .add(StandardIndexes.forward(), forwardBuilder.build()) + .build(); + + int dictElementSize = writeColumnIndexes(materializedCol, storedType, presence, values, + defaultValue, statsCollector, useDictionary, fieldIndexConfigs, childFieldSpec); + + NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir, materializedCol); + try { + for (int docId = 0; docId < _numDocs; docId++) { + if (!presence.contains(docId)) { + nullCreator.setNull(docId); + } + } + nullCreator.seal(); + } finally { + nullCreator.close(); + } + + PropertiesConfiguration props = new PropertiesConfiguration(); + FieldConfig.EncodingType encoding = + useDictionary ? FieldConfig.EncodingType.DICTIONARY : FieldConfig.EncodingType.RAW; + BaseSegmentCreator.addColumnMetadataInfo(props, materializedCol, statsCollector, _numDocs, childFieldSpec, + useDictionary, dictElementSize, encoding, false); + // OPEN_STRUCT-specific keys not written by addColumnMetadataInfo. + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, V1Constants.MetadataKeys.Column.PARENT_COLUMN), + _columnName); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, "hasNullValue"), true); + if (enableInverted && useDictionary) { + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(materializedCol, "hasInvertedIndex"), true); + } + _materializedColumnMetadata.put(materializedCol, props); + } + + /** + * Decides dictionary vs raw encoding for a materialized child column, mirroring the three steps of + * {@code BaseSegmentCreator.createDictionaryForColumn} with standard default flags (optimizeDictionary + * off => dictionary unless explicitly disabled and not required by an enabled index). + */ + private boolean resolveUseDictionary(FieldSpec childFieldSpec, FieldIndexConfigs fieldIndexConfigs, + AbstractColumnStatisticsCollector statsCollector) { + if (DictionaryIndexConfig.requiresDictionary(childFieldSpec, fieldIndexConfigs)) { + return true; + } + if (fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isDisabled()) { + return false; + } + return DictionaryIndexType.ignoreDictionaryOverride(false, false, + IndexingConfig.DEFAULT_NO_DICTIONARY_SIZE_RATIO_THRESHOLD, null, childFieldSpec, fieldIndexConfigs, + statsCollector.getCardinality(), statsCollector.getTotalNumberOfEntries()); + } + + /** + * Writes the dictionary (when used) plus all vetted, enabled indexes for a materialized child column through + * the standard index-creator family, driven from a single per-doc loop. Returns the dictionary element size in + * bytes (0 when raw-encoded), for column metadata. The dictionary is built separately because its build + * lifecycle is CUSTOM and it supplies the dictIds the per-row creators consume. + */ + private int writeColumnIndexes(String materializedCol, DataType storedType, RoaringBitmap presence, + List values, Object defaultValue, AbstractColumnStatisticsCollector statsCollector, + boolean useDictionary, FieldIndexConfigs fieldIndexConfigs, FieldSpec childFieldSpec) + throws IOException { + int dictElementSize = 0; + SegmentDictionaryCreator dictCreator = null; + try { + if (useDictionary) { + dictCreator = new SegmentDictionaryCreator(materializedCol, storedType, + new File(_indexDir, materializedCol + V1Constants.Dict.FILE_EXTENSION), true); + dictCreator.build(statsCollector.getUniqueValuesSet()); + } + + // Index-creation context built from the sealed collector (a ColumnShape) — no TableConfig required. + IndexCreationContext context = + new IndexCreationContext.Builder(_indexDir, null, statsCollector, useDictionary, false) + .withOnHeap(false).build(); + + List creators = new ArrayList<>(); + try { + for (IndexType indexType : IndexService.getInstance().getAllIndexes()) { + if (indexType.getIndexBuildLifecycle() != IndexType.BuildLifecycle.DURING_SEGMENT_CREATION) { + continue; // excludes dictionary (lifecycle CUSTOM), built separately above + } + if (!OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexType.getPrettyName())) { + continue; // non-vetted indexes already rejected at table-config validation; defensive backstop + } + IndexCreator creator = createColumnIndexCreator(indexType, context, fieldIndexConfigs, materializedCol, + childFieldSpec); + if (creator != null) { + creators.add(creator); + } + } + + int ordinal = 0; + for (int docId = 0; docId < _numDocs; docId++) { + Object value = presence.contains(docId) ? values.get(ordinal++) : defaultValue; + int dictId = useDictionary ? dictCreator.indexOfSV(value) : -1; + for (IndexCreator creator : creators) { + creator.add(value, dictId); + } + } + for (IndexCreator creator : creators) { + creator.seal(); + } + } finally { + for (IndexCreator creator : creators) { + creator.close(); + } + } + + // Seal the dictionary only after the index writes succeeded; capture its element size for metadata. + if (dictCreator != null) { + dictElementSize = dictCreator.getNumBytesPerEntry(); + dictCreator.seal(); + } + } finally { + if (dictCreator != null) { + dictCreator.close(); + } + } + return dictElementSize; + } + + @Nullable + private static IndexCreator createColumnIndexCreator(IndexType indexType, + IndexCreationContext context, FieldIndexConfigs fieldIndexConfigs, String materializedCol, + FieldSpec childFieldSpec) + throws IOException { + // Materialized child columns exist in no schema/TableConfig, so the standard table-config-time validation + // never sees them. Run the index type's own guards here against the resolved child FieldSpec (e.g. range + // rejects a non-numeric column without a dictionary) so misconfigurations fail with the canonical message + // instead of crashing opaquely inside the creator. validate() internally no-ops when the index is disabled. + indexType.validate(fieldIndexConfigs, childFieldSpec, null); + C config = fieldIndexConfigs.getConfig(indexType); + if (!config.isEnabled() || !indexType.shouldCreateIndex(context, config)) { + return null; + } + try { + return indexType.createIndexCreator(context, config); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException("Failed to create " + indexType.getPrettyName() + " creator for: " + materializedCol, e); + } + } + + private void writeSparseJsonColumn(List sparseKeys) + throws IOException { + String sparseCol = OpenStructNaming.sparseColumnName(_columnName); + int maxLen = 1; + String[] jsonPerDoc = new String[_numDocs]; + int nonNullCount = 0; + for (int docId = 0; docId < _numDocs; docId++) { + Map sparseEntries = new LinkedHashMap<>(); + for (String key : sparseKeys) { + RoaringBitmap presence = _presenceBitmaps.get(key); + if (presence != null && presence.contains(docId)) { + int ordinal = presence.rank(docId) - 1; + sparseEntries.put(key, _values.get(key).get(ordinal)); + } + } + if (!sparseEntries.isEmpty()) { + try { + String json = JsonUtils.objectToString(sparseEntries); + jsonPerDoc[docId] = json; + maxLen = Math.max(maxLen, json.getBytes(StandardCharsets.UTF_8).length); + nonNullCount++; + } catch (IOException e) { + throw new RuntimeException("Failed to serialize sparse entries for docId " + docId, e); + } + } + } + + SingleValueVarByteRawIndexCreator fwdCreator = new SingleValueVarByteRawIndexCreator( + _indexDir, ChunkCompressionType.LZ4, sparseCol, _numDocs, DataType.STRING, maxLen); + NullValueVectorCreator nullCreator = new NullValueVectorCreator(_indexDir, sparseCol); + try { + for (int docId = 0; docId < _numDocs; docId++) { + if (jsonPerDoc[docId] != null) { + fwdCreator.putString(jsonPerDoc[docId]); + } else { + fwdCreator.putString(""); + nullCreator.setNull(docId); + } + } + fwdCreator.seal(); + nullCreator.seal(); + } finally { + fwdCreator.close(); + nullCreator.close(); + } + + PropertiesConfiguration props = new PropertiesConfiguration(); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.DATA_TYPE), + DataType.STRING.name()); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.COLUMN_TYPE), + FieldSpec.FieldType.DIMENSION.name()); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.IS_SINGLE_VALUED), true); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.TOTAL_DOCS), + _numDocs); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.CARDINALITY), + nonNullCount); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES), + _numDocs); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY), false); + props.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, "hasNullValue"), true); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(sparseCol, V1Constants.MetadataKeys.Column.PARENT_COLUMN), + _columnName); + _materializedColumnMetadata.put(sparseCol, props); + } + + private void emitParentColumnMetadata(boolean hasSparseColumn) { + PropertiesConfiguration props = new PropertiesConfiguration(); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.COLUMN_NAME), + _columnName); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.DATA_TYPE), + FieldSpec.DataType.OPEN_STRUCT.name()); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.COLUMN_TYPE), + FieldSpec.FieldType.COMPLEX.name()); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.IS_SINGLE_VALUED), + true); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.TOTAL_DOCS), + _numDocs); + props.setProperty( + V1Constants.MetadataKeys.Column.getKeyFor(_columnName, V1Constants.MetadataKeys.Column.HAS_SPARSE_COLUMN), + hasSparseColumn); + _materializedColumnMetadata.put(_columnName, props); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java index e548248dea16..9bf8f7f3d486 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java @@ -56,13 +56,28 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist protected Comparable _previousValue = null; public AbstractColumnStatisticsCollector(String column, StatsCollectorConfig statsCollectorConfig) { - _fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column); - Preconditions.checkArgument(_fieldSpec != null, "Failed to find column: %s", column); - _storedType = _fieldSpec.getDataType().getStoredType(); - _sorted = _fieldSpec.isSingleValueField(); - _fieldConfig = statsCollectorConfig.getFieldConfigForColumn(column); - _partitionFunction = statsCollectorConfig.getPartitionFunction(column); - _partitions = _partitionFunction != null ? new HashSet<>() : null; + this(getRequiredFieldSpec(column, statsCollectorConfig), + statsCollectorConfig.getFieldConfigForColumn(column), + statsCollectorConfig.getPartitionFunction(column)); + } + + /// Constructs a collector directly from a [FieldSpec], without a [StatsCollectorConfig]. Lets callers + /// that operate outside schema-driven segment generation (e.g. OPEN_STRUCT materialized child columns, + /// whose synthetic columns exist in no schema) reuse the standard stats collectors. + public AbstractColumnStatisticsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + _fieldSpec = fieldSpec; + _storedType = fieldSpec.getDataType().getStoredType(); + _sorted = fieldSpec.isSingleValueField(); + _fieldConfig = fieldConfig; + _partitionFunction = partitionFunction; + _partitions = partitionFunction != null ? new HashSet<>() : null; + } + + private static FieldSpec getRequiredFieldSpec(String column, StatsCollectorConfig statsCollectorConfig) { + FieldSpec fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column); + Preconditions.checkArgument(fieldSpec != null, "Failed to find column: %s", column); + return fieldSpec; } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java index dd0dcfe00b9a..926611d2f7f7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.math.BigDecimal; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -40,6 +44,11 @@ public BigDecimalColumnPreIndexStatsCollector(String column, StatsCollectorConfi super(column, statsCollectorConfig); } + public BigDecimalColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPreIndexStatsCollector.java index 371e5bbf51d3..169d4d61f446 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.Arrays; import java.util.Set; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.ByteArray; @@ -40,6 +44,11 @@ public BytesColumnPreIndexStatsCollector(String column, StatsCollectorConfig sta super(column, statsCollectorConfig); } + public BytesColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java index 8024460c004c..094a4cd62408 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet; import it.unimi.dsi.fastutil.doubles.DoubleSet; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector { @@ -34,6 +38,11 @@ public DoubleColumnPreIndexStatsCollector(String column, StatsCollectorConfig st super(column, statsCollectorConfig); } + public DoubleColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java index e26f15293358..a5b2604f53b6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.floats.FloatOpenHashSet; import it.unimi.dsi.fastutil.floats.FloatSet; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector { @@ -34,6 +38,11 @@ public FloatColumnPreIndexStatsCollector(String column, StatsCollectorConfig sta super(column, statsCollectorConfig); } + public FloatColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java index 249bf894c55f..1c10b7089730 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector { @@ -34,6 +38,11 @@ public IntColumnPreIndexStatsCollector(String column, StatsCollectorConfig stats super(column, statsCollectorConfig); } + public IntColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java index c8d98f1d5d5b..f19d94388884 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java @@ -21,7 +21,11 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.longs.LongSet; import java.util.Arrays; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector { @@ -34,6 +38,11 @@ public LongColumnPreIndexStatsCollector(String column, StatsCollectorConfig stat super(column, statsCollectorConfig); } + public LongColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java index 43a6f8d0fec0..609f79e7e497 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.stats; +import javax.annotation.Nullable; import org.apache.pinot.segment.local.utils.ClusterConfigForTable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -69,4 +71,30 @@ public static AbstractColumnStatisticsCollector createStatsCollector(String colu throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType()); } } + + /// Creates a scalar stats collector directly from a [FieldSpec], for callers without a + /// [StatsCollectorConfig]/[Schema] (e.g. OPEN_STRUCT materialized child columns). Skips the + /// no-dictionary-optimization branch, which requires a TableConfig that synthetic columns lack. + public static AbstractColumnStatisticsCollector createStatsCollector(FieldSpec fieldSpec, + @Nullable FieldConfig fieldConfig) { + switch (fieldSpec.getDataType().getStoredType()) { + case INT: + return new IntColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case LONG: + return new LongColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case FLOAT: + return new FloatColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case DOUBLE: + return new DoubleColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case BIG_DECIMAL: + return new BigDecimalColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case STRING: + return new StringColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + case BYTES: + return new BytesColumnPreIndexStatsCollector(fieldSpec, fieldConfig, null); + default: + throw new IllegalStateException("Unsupported stored type for OPEN_STRUCT child: " + + fieldSpec.getDataType().getStoredType()); + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java index a01afe7b49ae..26a168d49863 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java @@ -27,7 +27,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Set; +import javax.annotation.Nullable; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -50,6 +52,14 @@ public StringColumnPreIndexStatsCollector(String column, StatsCollectorConfig st } } + public StringColumnPreIndexStatsCollector(FieldSpec fieldSpec, @Nullable FieldConfig fieldConfig, + @Nullable PartitionFunction partitionFunction) { + super(fieldSpec, fieldConfig, partitionFunction); + if (_fieldConfig != null && _fieldConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) { + _clpStatsCollector = new CLPStatsCollector(); + } + } + @Override public void collect(Object entry) { assert !_sealed; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSource.java new file mode 100644 index 000000000000..e83553e7cc98 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSource.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource; +import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.datasource.OpenStructDataSource; +import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * Per-key {@link DataSource} accessor for sealed (immutable) segments with an OPEN_STRUCT column. + * + *

Always columnar — there is no blob branch. Every key that was dense enough during segment + * creation gets its own materialized {@link DataSource} (forward index + optional inverted index / + * dictionary). Keys that did not meet the density threshold are stored in an optional sparse + * column; the sparse {@link DataSource} is returned for any unmaterialized key lookup. + * + *

Use {@link #isMaterialized(String)} and {@link #isFullyMaterialized()} together to choose + * the query execution path: + *

    + *
  • Materialized key → fast path via per-key DataSource (inverted/dictionary index available). + *
  • Not materialized + not fully materialized → fall back to the sparse DataSource. + *
  • Not materialized + fully materialized → key is definitively absent; short-circuit. + *
+ * + *

Thread-safety: immutable after construction; safe for concurrent reads. + */ +public class ImmutableOpenStructDataSource extends BaseDataSource implements OpenStructDataSource { + private final ComplexFieldSpec _fieldSpec; + private final Map _perKeyDataSources; + @Nullable + private final DataSource _sparseDataSource; + + public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map perKeyDataSources, + @Nullable DataSource sparseDataSource, DataSourceMetadata dataSourceMetadata, + ColumnIndexContainer indexContainer) { + super(dataSourceMetadata, indexContainer); + _fieldSpec = fieldSpec; + _perKeyDataSources = perKeyDataSources; + _sparseDataSource = sparseDataSource; + } + + /** + * Convenience constructor for segment-load time. Synthesizes a minimal {@link DataSourceMetadata} + * for the parent OPEN_STRUCT column (which has no on-disk presence of its own) and uses an empty + * {@link ColumnIndexContainer} — all real readers live on the per-key data sources. + */ + public ImmutableOpenStructDataSource(ComplexFieldSpec fieldSpec, Map perKeyDataSources, + @Nullable DataSource sparseDataSource, int numDocs) { + this(fieldSpec, perKeyDataSources, sparseDataSource, + new ImmutableOpenStructDataSourceMetadata(fieldSpec, numDocs), + new ColumnIndexContainer.FromMap.Builder().build()); + } + + @Override + public ComplexFieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + @Nullable + public DataSource getDataSource(String key) { + DataSource ds = _perKeyDataSources.get(key); + return ds != null ? ds : _sparseDataSource; + } + + @Override + public boolean isMaterialized(String key) { + return _perKeyDataSources.containsKey(key); + } + + @Override + public boolean isFullyMaterialized() { + return _sparseDataSource == null; + } + + @Override + public Map getDataSources() { + return _perKeyDataSources; + } + + @Override + @Nullable + public DataSourceMetadata getDataSourceMetadata(String key) { + DataSource ds = _perKeyDataSources.get(key); + return ds != null ? ds.getDataSourceMetadata() : null; + } + + @Override + @Nullable + public ColumnIndexContainer getIndexContainer(String key) { + DataSource ds = _perKeyDataSources.get(key); + return ds instanceof ImmutableDataSource immutableDs ? immutableDs.getIndexContainer() : null; + } + + private static class ImmutableOpenStructDataSourceMetadata implements DataSourceMetadata { + private final FieldSpec _fieldSpec; + private final int _numDocs; + + ImmutableOpenStructDataSourceMetadata(FieldSpec fieldSpec, int numDocs) { + _fieldSpec = fieldSpec; + _numDocs = numDocs; + } + + @Override + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + public boolean isSorted() { + return false; + } + + @Override + public int getNumDocs() { + return _numDocs; + } + + @Override + public int getNumValues() { + return _numDocs; + } + + @Override + public int getMaxNumValuesPerMVEntry() { + return 0; + } + + @Override + public int getCardinality() { + return Constants.UNKNOWN_CARDINALITY; + } + + @Nullable + @Override + public Comparable getMinValue() { + return null; + } + + @Nullable + @Override + public Comparable getMaxValue() { + return null; + } + + @Nullable + @Override + public PartitionFunction getPartitionFunction() { + return null; + } + + @Nullable + @Override + public java.util.Set getPartitions() { + return null; + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableKeyColumn.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableKeyColumn.java new file mode 100644 index 000000000000..4f7455f1c6de --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableKeyColumn.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory; +import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeInvertedIndex; +import org.apache.pinot.segment.spi.index.mutable.MutableDictionary; +import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex; +import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A single key's mutable column for an OPEN_STRUCT column: forward index (dictionary-encoded) + * + presence bitmap tracking which docIds had this key set. + * + * Single-writer during ingestion; presence bitmap and forward index are not thread-safe for + * concurrent writes. + */ +public class MutableKeyColumn implements Closeable { + private static final int DEFAULT_AVG_STRING_LENGTH = 32; + private static final int DEFAULT_ROWS_PER_CHUNK = 1000; + + private final String _key; + private final DataType _storedType; + private final MutableForwardIndex _forwardIndex; + private final MutableRoaringBitmap _presenceBitmap; + private final MutableDictionary _dictionary; + private final RealtimeInvertedIndex _invertedIndex; + + public MutableKeyColumn(String key, DataType storedType, PinotDataBufferMemoryManager memoryManager, int capacity) { + this(key, storedType, memoryManager, capacity, key); + } + + public MutableKeyColumn(String key, DataType storedType, PinotDataBufferMemoryManager memoryManager, int capacity, + String allocationContext) { + _key = key; + _storedType = storedType; + _presenceBitmap = new MutableRoaringBitmap(); + _invertedIndex = new RealtimeInvertedIndex(); + + int estimatedCardinality = Math.max(capacity / 100, 16); + int avgLength = storedType.isFixedWidth() ? storedType.size() : DEFAULT_AVG_STRING_LENGTH; + _dictionary = MutableDictionaryFactory.getMutableDictionary( + storedType, false, memoryManager, avgLength, estimatedCardinality, + allocationContext + ".dict"); + + _forwardIndex = new FixedByteSVMutableForwardIndex(true, DataType.INT, + DEFAULT_ROWS_PER_CHUNK, memoryManager, allocationContext + ".fwd"); + } + + public String getKey() { + return _key; + } + + public DataType getStoredType() { + return _storedType; + } + + public MutableForwardIndex getForwardIndex() { + return _forwardIndex; + } + + /** Bitmap of docIds where this key was present (non-null). */ + public ImmutableRoaringBitmap getPresenceBitmap() { + return _presenceBitmap; + } + + /** Number of documents where this key had a non-null value. */ + public int getNumNonNullDocs() { + return _presenceBitmap.getCardinality(); + } + + /** Distinct values in this key's dictionary, for cardinality estimation at seal time. */ + public Set getDistinctValues() { + int len = _dictionary.length(); + Set result = new java.util.HashSet<>(len); + for (int i = 0; i < len; i++) { + Object val = _dictionary.get(i); + result.add(val == null ? null : val.toString()); + } + return result; + } + + public MutableDictionary getDictionary() { + return _dictionary; + } + + public RealtimeInvertedIndex getInvertedIndex() { + return _invertedIndex; + } + + /** + * Indexes {@code value} at {@code docId}. The value must already be coerced to the stored type. + */ + public void setValue(int docId, Object value) { + _presenceBitmap.add(docId); + int dictId = _dictionary.index(value); + _forwardIndex.setDictId(docId, dictId); + _invertedIndex.add(dictId, docId); + } + + public Object getValue(int docId) { + // The forward index returns whatever bit pattern is at this offset, even for docs that were + // never written for this key. The presence bitmap is the source of truth — without this check, + // an absent doc would deserialize as if it held the first dictionary entry (dictId 0). + if (!_presenceBitmap.contains(docId)) { + return null; + } + int dictId = _forwardIndex.getDictId(docId, null); + if (dictId < 0 || dictId >= _dictionary.length()) { + return null; + } + return _dictionary.get(dictId); + } + + @Override + public void close() + throws IOException { + _forwardIndex.close(); + _dictionary.close(); + _invertedIndex.close(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSource.java new file mode 100644 index 000000000000..e9f8e1aa1423 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSource.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.datasource.BaseDataSource; +import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.datasource.OpenStructDataSource; +import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; + + +/// Per-key {@link DataSource} accessor for mutable (consuming) segments with an OPEN_STRUCT column. +/// +/// Always columnar — no blob branch. Per-key DataSources are synthesized on demand from the +/// underlying {@link MutableOpenStructIndex}; mutable mode always holds every observed key, so +/// {@link #isFullyMaterialized()} is unconditionally {@code true}. +public class MutableOpenStructDataSource extends BaseDataSource implements OpenStructDataSource { + private final ComplexFieldSpec _fieldSpec; + private final MutableOpenStructIndex _index; + private final int _numDocs; + + public MutableOpenStructDataSource(ComplexFieldSpec fieldSpec, MutableOpenStructIndex index, int numDocs) { + super(new MutableOpenStructDataSourceMetadata(fieldSpec, numDocs), + new ColumnIndexContainer.FromMap.Builder().build()); + _fieldSpec = fieldSpec; + _index = index; + _numDocs = numDocs; + } + + @Override + public ComplexFieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + @Nullable + public DataSource getDataSource(String key) { + Map indexes = _index.getIndexes(key); + if (indexes == null || indexes.isEmpty()) { + return null; + } + ColumnMetadata metadata = _index.getColumnMetadata(key); + return new ImmutableDataSource(metadata, + new ColumnIndexContainer.FromMap.Builder().withAll(indexes).build()); + } + + @Override + public boolean isMaterialized(String key) { + return _index.getKeyColumn(key) != null; + } + + /// Mutable mode always holds every observed key in-memory; the sparse tier exists only after seal. + @Override + public boolean isFullyMaterialized() { + return true; + } + + @Override + public Map getDataSources() { + Map result = new HashMap<>(); + for (String key : _index.getKeys()) { + DataSource ds = getDataSource(key); + if (ds != null) { + result.put(key, ds); + } + } + return result; + } + + @Override + @Nullable + public DataSourceMetadata getDataSourceMetadata(String key) { + DataSource ds = getDataSource(key); + return ds != null ? ds.getDataSourceMetadata() : null; + } + + @Override + @Nullable + public ColumnIndexContainer getIndexContainer(String key) { + DataSource ds = getDataSource(key); + return ds instanceof ImmutableDataSource imm ? imm.getIndexContainer() : null; + } + + private static class MutableOpenStructDataSourceMetadata implements DataSourceMetadata { + private final FieldSpec _fieldSpec; + private final int _numDocs; + + MutableOpenStructDataSourceMetadata(FieldSpec fieldSpec, int numDocs) { + _fieldSpec = fieldSpec; + _numDocs = numDocs; + } + + @Override + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + public boolean isSorted() { + return false; + } + + @Override + public int getNumDocs() { + return _numDocs; + } + + @Override + public int getNumValues() { + return _numDocs; + } + + @Override + public int getMaxNumValuesPerMVEntry() { + return 0; + } + + @Override + public int getCardinality() { + return Constants.UNKNOWN_CARDINALITY; + } + + @Override + @Nullable + public PartitionFunction getPartitionFunction() { + return null; + } + + @Override + @Nullable + public java.util.Set getPartitions() { + return null; + } + + @Override + @Nullable + public Comparable getMinValue() { + return null; + } + + @Override + @Nullable + public Comparable getMaxValue() { + return null; + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java new file mode 100644 index 000000000000..8c0fc46e3dd1 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndex.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.segment.local.segment.index.map.SimpleColumnMetadata; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.mutable.MutableIndex; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.OpenStructTypeInference; +import org.apache.pinot.spi.utils.PinotDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manages per-key mutable columns for an OPEN_STRUCT column during real-time consumption. + * Each discovered key gets its own {@link MutableKeyColumn} (dictionary-encoded forward index + + * presence bitmap). Dense/sparse classification is deferred to seal time. + * + *

Single-writer for {@link #index}: the consuming thread calls this method. Readers may + * concurrently read {@link #getKeys()} and {@link #getKeyColumns()} via the volatile map swap. + */ +@SuppressWarnings("rawtypes") +public class MutableOpenStructIndex implements OpenStructIndexReader, MutableIndex { + private static final Logger LOGGER = LoggerFactory.getLogger(MutableOpenStructIndex.class); + + private final String _openStructColumn; + private final OpenStructIndexConfig _config; + private final Map _childFieldSpecs; + private final int _maxDenseKeys; + private final PinotDataBufferMemoryManager _memoryManager; + private final int _capacity; + + // Volatile for lock-free reader access; writer always holds the consuming-thread lock. + private volatile Map _keyColumns = new HashMap<>(); + private int _distinctKeyCount = 0; + private final Set _droppedKeys = new HashSet<>(); + + public MutableOpenStructIndex(String openStructColumn, ComplexFieldSpec fieldSpec, + OpenStructIndexConfig config, PinotDataBufferMemoryManager memoryManager, int capacity) { + _openStructColumn = openStructColumn; + _config = config; + _maxDenseKeys = config.getMaxDenseKeys(); + _memoryManager = memoryManager; + _capacity = capacity; + + Map childFieldSpecs = fieldSpec.getChildFieldSpecs(); + _childFieldSpecs = childFieldSpecs != null ? new HashMap<>(childFieldSpecs) : new HashMap<>(); + } + + @Override + public void add(Object value, int dictId, int docId) { + index(docId, value); + } + + @Override + public void add(Object[] values, @Nullable int[] dictIds, int docId) { + throw new UnsupportedOperationException("OPEN_STRUCT does not support multi-value indexing"); + } + + /** + * Indexes the OPEN_STRUCT value for the given document. {@code value} must be a + * {@code Map} or {@code null}; null and non-Map values are silently skipped. + */ + @SuppressWarnings("unchecked") + public void index(int docId, @Nullable Object value) { + if (!(value instanceof Map)) { + return; + } + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object rawValue = entry.getValue(); + if (rawValue == null) { + continue; + } + + MutableKeyColumn keyCol = _keyColumns.get(key); + if (keyCol == null) { + if (_maxDenseKeys >= 0 && _distinctKeyCount >= _maxDenseKeys) { + if (_droppedKeys.add(key)) { + LOGGER.warn("OPEN_STRUCT '{}' reached maxDenseKeys ({}).. Dropping '{}'.", + _openStructColumn, _maxDenseKeys, key); + } + continue; + } + // Resolve stored type and coerce BEFORE allocating a column. A first-row coercion failure + // would otherwise permanently consume a maxDenseKeys slot for a column that was never used. + DataType resolvedType = resolveStoredType(key, rawValue); + if (resolvedType == null) { + continue; + } + Object coerced = tryCoerce(key, rawValue, resolvedType); + if (coerced == null) { + continue; + } + keyCol = allocateKeyColumn(key, resolvedType); + keyCol.setValue(docId, coerced); + continue; + } + + DataType storedType = keyCol.getStoredType(); + Object coerced = tryCoerce(key, rawValue, storedType); + if (coerced == null) { + continue; + } + keyCol.setValue(docId, coerced); + } + } + + /// Resolves the stored type for a key without allocating any state. Returns null when the type + /// cannot be inferred (caller should skip the entry). + @Nullable + private DataType resolveStoredType(String key, Object rawValue) { + FieldSpec spec = _childFieldSpecs.get(key); + DataType valueType; + if (spec != null) { + valueType = spec.getDataType(); + } else { + valueType = OpenStructTypeInference.inferDataType(rawValue); + if (valueType == null) { + LOGGER.warn("OPEN_STRUCT '{}': could not infer DataType for key '{}' from value of class '{}'." + + " Dropping the entry.", + _openStructColumn, key, rawValue.getClass().getName()); + return null; + } + } + return valueType.getStoredType(); + } + + /// Coerces rawValue to storedType. Returns null on failure (logged at WARN); the caller drops + /// the entry. Note: a successful coerce of a "null"-shaped raw value would also return null — + /// but callers gate on rawValue != null before reaching here. + @Nullable + private Object tryCoerce(String key, Object rawValue, DataType storedType) { + try { + PinotDataType sourceType = PinotDataType.getSingleValueType(rawValue); + PinotDataType destType = ColumnDataType.fromDataTypeSV(storedType).toPinotDataType(); + return destType.convert(rawValue, sourceType); + } catch (Exception e) { + LOGGER.warn("OPEN_STRUCT '{}': coercion failed for key '{}' to {}. Skipping.", + _openStructColumn, key, storedType, e); + return null; + } + } + + /// Allocates a new MutableKeyColumn for {@code key} with the resolved {@code storedType} and + /// publishes it via volatile copy-on-write. Increments the dense-key count. + private MutableKeyColumn allocateKeyColumn(String key, DataType storedType) { + _distinctKeyCount++; + String allocationContext = _openStructColumn + "$" + key; + MutableKeyColumn newCol = + new MutableKeyColumn(key, storedType, _memoryManager, _capacity, allocationContext); + Map updated = new HashMap<>(_keyColumns); + updated.put(key, newCol); + _keyColumns = updated; + return newCol; + } + + /** Returns the set of keys discovered so far. */ + public Set getKeys() { + return _keyColumns.keySet(); + } + + /** Returns a snapshot of the per-key column map. */ + public Map getKeyColumns() { + return _keyColumns; + } + + /** Returns the {@link MutableKeyColumn} for {@code key}, or {@code null} if not seen yet. */ + @Nullable + public MutableKeyColumn getKeyColumn(String key) { + return _keyColumns.get(key); + } + + @Override + public Map getIndexes(String key) { + MutableKeyColumn col = _keyColumns.get(key); + if (col == null) { + return Map.of(); + } + return Map.of( + StandardIndexes.forward(), col.getForwardIndex(), + StandardIndexes.dictionary(), col.getDictionary(), + StandardIndexes.inverted(), col.getInvertedIndex()); + } + + @Nullable + @Override + public ColumnMetadata getColumnMetadata(String key) { + MutableKeyColumn col = _keyColumns.get(key); + if (col == null) { + return null; + } + FieldSpec spec = _childFieldSpecs.get(key); + if (spec == null) { + spec = new DimensionFieldSpec(key, col.getStoredType(), true); + } + return new SimpleColumnMetadata(spec, _capacity); + } + + @Override + public boolean isDictionaryEncoded() { + return false; + } + + @Override + public boolean isSingleValue() { + return true; + } + + @Override + public DataType getStoredType() { + return DataType.OPEN_STRUCT; + } + + @Override + public void close() + throws IOException { + for (MutableKeyColumn keyCol : _keyColumns.values()) { + keyCol.close(); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexPlugin.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexPlugin.java new file mode 100644 index 000000000000..21199f208d94 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexPlugin.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import com.google.auto.service.AutoService; +import org.apache.pinot.segment.spi.index.IndexPlugin; + + +@AutoService(IndexPlugin.class) +public class OpenStructIndexPlugin implements IndexPlugin { + public static final OpenStructIndexType INSTANCE = new OpenStructIndexType(); + + @Override + public OpenStructIndexType getIndexType() { + return INSTANCE; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexType.java new file mode 100644 index 000000000000..467ba5835719 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexType.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.openstruct.OpenStructColumnSplitter; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; +import org.apache.pinot.segment.spi.index.IndexHandler; +import org.apache.pinot.segment.spi.index.IndexReaderFactory; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator; +import org.apache.pinot.segment.spi.index.mutable.MutableIndex; +import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext; +import org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; + + +/** + * Index type for the OPEN_STRUCT index on OPEN_STRUCT columns. + * + *

The OPEN_STRUCT index has no reader of its own — per-key materialized columns are loaded by + * the standard {@code PhysicalColumnIndexContainer} and served via standard index readers. This + * type exists for SPI registration, config deserialization, and validation. + */ +public class OpenStructIndexType + extends AbstractIndexType { + + public static final String INDEX_DISPLAY_NAME = "open_struct"; + private static final List EXTENSIONS = Collections.singletonList(".open_struct.idx"); + + protected OpenStructIndexType() { + super(StandardIndexes.OPEN_STRUCT_ID); + } + + @Override + public Class getIndexConfigClass() { + return OpenStructIndexConfig.class; + } + + @Override + public OpenStructIndexConfig getDefaultConfig() { + return OpenStructIndexConfig.DEFAULT; + } + + @Override + public void validate(FieldIndexConfigs indexConfigs, FieldSpec fieldSpec, TableConfig tableConfig) { + // The default OpenStructIndexConfig is auto-applied to every column; only enforce on OPEN_STRUCT + // fields. Non-OPEN_STRUCT columns cannot meaningfully opt in to this index. + if (fieldSpec.getDataType() != FieldSpec.DataType.OPEN_STRUCT) { + return; + } + OpenStructIndexConfig config = indexConfigs.getConfig(this); + if (config.isEnabled()) { + Preconditions.checkState(fieldSpec.isSingleValueField(), + "OPEN_STRUCT index can only be created on single-value columns, but column '%s' is multi-value", + fieldSpec.getName()); + validatePerKeyIndexes(config); + } + } + + private void validatePerKeyIndexes(OpenStructIndexConfig config) { + List fieldConfigs = new ArrayList<>(); + if (config.getValueFieldConfigs() != null) { + fieldConfigs.addAll(config.getValueFieldConfigs()); + } + if (config.getDefaultValueFieldConfig() != null) { + fieldConfigs.add(config.getDefaultValueFieldConfig()); + } + for (FieldConfig fieldConfig : fieldConfigs) { + JsonNode indexes = fieldConfig.getIndexes(); + if (indexes == null) { + continue; + } + Iterator indexNames = indexes.fieldNames(); + while (indexNames.hasNext()) { + String indexName = indexNames.next(); + Preconditions.checkState(OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES.contains(indexName), + "OPEN_STRUCT key '%s' declares unsupported index '%s'; supported indexes are %s", + fieldConfig.getName(), indexName, OpenStructSupportedIndexes.ALLOWED_PRETTY_NAMES); + } + } + } + + @Override + public String getPrettyName() { + return INDEX_DISPLAY_NAME; + } + + @Override + protected ColumnConfigDeserializer createDeserializerForLegacyConfigs() { + // OPEN_STRUCT is net-new; no legacy FieldConfig.properties path to migrate. + return IndexConfigDeserializer.fromIndexTypes(FieldConfig.IndexType.OPEN_STRUCT, + (tableConfig, fieldConfig) -> OpenStructIndexConfig.DEFAULT); + } + + @Override + public boolean shouldCreateIndex(IndexCreationContext context, OpenStructIndexConfig indexConfig) { + // The default OpenStructIndexConfig is auto-applied to every column; only build a creator for + // OPEN_STRUCT columns. Non-OPEN_STRUCT columns cannot meaningfully host this index. + return context.getFieldSpec().getDataType() == FieldSpec.DataType.OPEN_STRUCT; + } + + @Override + public ColumnarOpenStructIndexCreator createIndexCreator(IndexCreationContext context, + OpenStructIndexConfig indexConfig) { + FieldSpec fieldSpec = context.getFieldSpec(); + return new OpenStructColumnSplitter(context.getIndexDir(), fieldSpec.getName(), fieldSpec, indexConfig); + } + + @Override + protected IndexReaderFactory createReaderFactory() { + return new NoOpReaderFactory(); + } + + @Override + public List getFileExtensions(@Nullable ColumnMetadata columnMetadata) { + return EXTENSIONS; + } + + @Override + public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, + Map configsByCol, Schema schema, TableConfig tableConfig) { + return IndexHandler.NoOp.INSTANCE; + } + + @Nullable + @Override + public MutableIndex createMutableIndex(MutableIndexContext context, OpenStructIndexConfig config) { + throw new UnsupportedOperationException("Mutable OPEN_STRUCT index is constructed by MutableSegmentImpl, " + + "not via this SPI path"); + } + + @Override + public boolean shouldInvalidateOnDictionaryChange(FieldSpec fieldSpec, OpenStructIndexConfig indexConfig) { + return false; + } + + @Override + public boolean requiresDictionary(FieldSpec fieldSpec, OpenStructIndexConfig indexConfig) { + return false; + } + + /** + * Reader factory that always returns null. The OPEN_STRUCT index has no reader of its own — + * materialized columns are loaded independently by the standard column loading infrastructure. + */ + private static class NoOpReaderFactory implements IndexReaderFactory { + @Nullable + @Override + public OpenStructIndexReader createIndexReader(SegmentDirectory.Reader segmentReader, + FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata) { + return null; + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructSupportedIndexes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructSupportedIndexes.java new file mode 100644 index 000000000000..5daf62558216 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructSupportedIndexes.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.util.Set; +import org.apache.pinot.segment.spi.index.StandardIndexes; + + +/** + * The set of index types (by pretty name) supported on OPEN_STRUCT materialized child columns. A key's + * {@code FieldConfig} may declare only these; non-vetted indexes are rejected at table-config validation. + * {@code dictionary} is built structurally (lifecycle CUSTOM); {@code forward} is always written. + */ +public final class OpenStructSupportedIndexes { + private OpenStructSupportedIndexes() { + } + + public static final Set ALLOWED_PRETTY_NAMES = Set.of( + StandardIndexes.forward().getPrettyName(), + StandardIndexes.dictionary().getPrettyName(), + StandardIndexes.inverted().getPrettyName(), + StandardIndexes.range().getPrettyName(), + StandardIndexes.bloomFilter().getPrettyName()); +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 5f87017a0849..3d15359485f3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -93,6 +93,7 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.OpenStructNaming; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.function.FunctionEvaluator; import org.apache.pinot.spi.ingestion.batch.BatchConfig; @@ -1603,6 +1604,23 @@ private static void validateIndexingConfigAndFieldConfigList(TableConfig tableCo validateMultiColumnTextIndex(indexingConfig.getMultiColumnTextIndexConfig()); + // OPEN_STRUCT materialized child columns use a reserved separator '$' in their name. When any + // schema field is OPEN_STRUCT, reject user columns whose names contain '$' to prevent naming + // collisions with future per-key materialized columns (col$key) or the sparse column + // (col$__sparse__). This validation runs here (with full schema access) rather than per-field + // because each OpenStructIndexType.validate() call only sees one field at a time. + boolean anyOpenStruct = schema.getAllFieldSpecs().stream() + .anyMatch(fs -> fs.getDataType() == DataType.OPEN_STRUCT); + if (anyOpenStruct) { + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + Preconditions.checkState( + fieldSpec.getDataType() == DataType.OPEN_STRUCT + || !OpenStructNaming.isMaterializedOpenStructColumn(fieldSpec.getName()), + "Schema column '%s' contains reserved OPEN_STRUCT separator '%s'", + fieldSpec.getName(), OpenStructNaming.SEPARATOR); + } + } + // Star-tree index config is not managed by FieldIndexConfigs, and we need to validate it separately. List starTreeIndexConfigs = indexingConfig.getStarTreeIndexConfigs(); if (CollectionUtils.isNotEmpty(starTreeIndexConfigs)) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitterTest.java new file mode 100644 index 000000000000..0a7ce90eb7fe --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/openstruct/OpenStructColumnSplitterTest.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.openstruct; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.math.BigDecimal; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.OpenStructNaming; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + + +public class OpenStructColumnSplitterTest { + + private File _tempDir; + + @BeforeMethod + public void setUp() + throws Exception { + _tempDir = Files.createTempDirectory("OpenStructColumnSplitterTest").toFile(); + } + + @AfterMethod + public void tearDown() + throws Exception { + FileUtils.deleteDirectory(_tempDir); + } + + private ComplexFieldSpec spec() { + return new ComplexFieldSpec("metrics", DataType.OPEN_STRUCT, true, Map.of()); + } + + private OpenStructIndexConfig config(double minFillRate, int maxDenseKeys, Set denseKeys) { + return new OpenStructIndexConfig(false, null, maxDenseKeys, denseKeys, minFillRate, null); + } + + @Test + public void testClassifyByFillRate() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + Map doc = d < 7 ? Map.of("clicks", (long) d) : Map.of(); + s.add(doc, d); + } + Set dense = s.classify(); + assertTrue(dense.contains("clicks")); + } + + @Test + public void testExplicitDenseKeysAlwaysMaterialized() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.99, -1, Set.of("rare"))); + s.add(Map.of("rare", "x"), 0); + for (int d = 1; d < 100; d++) { + s.add(Map.of(), d); + } + Set dense = s.classify(); + assertTrue(dense.contains("rare")); + } + + @Test + public void testRareKeyDroppedFromDense() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + s.add(Map.of("rare", "x"), 0); + for (int d = 1; d < 100; d++) { + s.add(Map.of(), d); + } + Set dense = s.classify(); + assertFalse(dense.contains("rare")); + } + + @Test + public void testMaxDenseKeysCap() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.1, 1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("a", "x", "b", "y", "c", "z"), d); + } + Set dense = s.classify(); + assertEquals(dense.size(), 1); + } + + @Test + public void testZeroDocsIsNoop() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + s.seal(); + assertTrue(s.getResolvedDenseKeys().isEmpty()); + } + + @Test + public void testSealEmitsParentMetadataForDense() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("clicks", (long) d), d); + } + s.seal(); + String denseCol = OpenStructNaming.materializedColumnName("metrics", "clicks"); + Map meta = s.getMaterializedColumnMetadata(); + PropertiesConfiguration denseProps = meta.get(denseCol); + assertEquals(denseProps.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.PARENT_COLUMN)), "metrics"); + + PropertiesConfiguration parentProps = meta.get("metrics"); + assertNotNull(parentProps); + assertEquals(parentProps.getString(V1Constants.MetadataKeys.Column.getKeyFor( + "metrics", V1Constants.MetadataKeys.Column.DATA_TYPE)), "OPEN_STRUCT"); + assertEquals(parentProps.getString(V1Constants.MetadataKeys.Column.getKeyFor( + "metrics", V1Constants.MetadataKeys.Column.COLUMN_TYPE)), "COMPLEX"); + assertEquals(parentProps.getString(V1Constants.MetadataKeys.Column.getKeyFor( + "metrics", V1Constants.MetadataKeys.Column.HAS_SPARSE_COLUMN)), "false"); + } + + @Test + public void testDenseColumnMetadataKeysPresent() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("clicks", (long) d), d); + } + s.seal(); + String denseCol = OpenStructNaming.materializedColumnName("metrics", "clicks"); + PropertiesConfiguration p = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(p); + assertEquals(p.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.DATA_TYPE)), "LONG"); + assertEquals(p.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.COLUMN_TYPE)), "DIMENSION"); + assertEquals(p.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "true"); + assertEquals(p.getInt(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.TOTAL_DOCS)), 10); + assertEquals(p.getInt(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.CARDINALITY)), 10); + assertEquals(p.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.PARENT_COLUMN)), "metrics"); + assertEquals(p.getString(V1Constants.MetadataKeys.Column.getKeyFor(denseCol, "hasNullValue")), "true"); + } + + @Test + public void testSparseJsonColumnWritten() + throws Exception { + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.9, -1, null)); + s.add(Map.of("rare", "x"), 0); + for (int d = 1; d < 10; d++) { + s.add(Map.of(), d); + } + s.seal(); + String sparseCol = OpenStructNaming.sparseColumnName("metrics"); + assertTrue(s.getMaterializedColumnMetadata().containsKey(sparseCol)); + } + + @Test + public void testBigDecimalDictionaryRoundTrip() + throws Exception { + // Regression: an untyped key whose value is a BigDecimal used to crash seal() with + // IllegalStateException("Unsupported OPEN_STRUCT stored type for dictionary build: BIG_DECIMAL"). + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("amount", new BigDecimal("12.34").add(BigDecimal.valueOf(d))), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "amount"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.DATA_TYPE)), "BIG_DECIMAL"); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "true"); + assertTrue(new File(_tempDir, denseCol + V1Constants.Dict.FILE_EXTENSION).exists()); + } + + @Test + public void testBigDecimalScaleDistinctValuesNotCollapsed() + throws Exception { + // 1.0 and 1.00 are equal by compareTo but distinct by equals; they must stay separate dictionary + // entries. Doc 2 is absent, so the default (BigDecimal.ZERO) is also collected -> 3 distinct values. + // A compareTo-based dedup would wrongly collapse 1.0/1.00 and yield 2. + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + s.add(Map.of("amount", new BigDecimal("1.0")), 0); + s.add(Map.of("amount", new BigDecimal("1.00")), 1); + s.add(Map.of(), 2); + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "amount"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getInt(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.CARDINALITY)), 3); + } + + @Test + public void testBigDecimalExplicitChildSpec() + throws Exception { + // A key declared BIG_DECIMAL in the schema bypasses inferDataType but must still seal. + Map children = Map.of( + "amount", new DimensionFieldSpec("amount", DataType.BIG_DECIMAL, true)); + ComplexFieldSpec specWithChild = new ComplexFieldSpec("metrics", DataType.OPEN_STRUCT, true, children); + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", specWithChild, + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("amount", new BigDecimal("100.5")), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "amount"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.DATA_TYPE)), "BIG_DECIMAL"); + } + + @Test + public void testBigDecimalRawForwardIndex() + throws Exception { + // RAW-encoded BIG_DECIMAL key must take the raw var-byte forward index path, not the dictionary. + FieldConfig rawConfig = new FieldConfig.Builder("amount") + .withEncodingType(FieldConfig.EncodingType.RAW).build(); + OpenStructIndexConfig cfg = new OpenStructIndexConfig( + false, null, -1, null, 0.5, List.of(rawConfig)); + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), cfg); + for (int d = 0; d < 10; d++) { + s.add(Map.of("amount", new BigDecimal("7.5").add(BigDecimal.valueOf(d))), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "amount"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "false"); + assertFalse(new File(_tempDir, denseCol + V1Constants.Dict.FILE_EXTENSION).exists()); + assertTrue(new File(_tempDir, + denseCol + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION).exists()); + } + + @Test + public void testBigDecimalSparseKey() + throws Exception { + // A BIG_DECIMAL key below the fill-rate threshold goes to the sparse JSON column without crashing. + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.9, -1, null)); + s.add(Map.of("rare", new BigDecimal("3.14159")), 0); + for (int d = 1; d < 10; d++) { + s.add(Map.of(), d); + } + s.seal(); + assertTrue(s.getMaterializedColumnMetadata().containsKey(OpenStructNaming.sparseColumnName("metrics"))); + } + + @Test + public void testAbsentDocUsesDimensionNullDefault() + throws Exception { + // Absent docs now store the standard Pinot dimension null value (INT -> Integer.MIN_VALUE), + // so the column min reflects that default rather than the old metric-style 0. + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 5; d++) { + s.add(Map.of("clicks", 10 + d), d); // present: 10..14 + } + for (int d = 5; d < 10; d++) { + s.add(Map.of(), d); // absent + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "clicks"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + // Default (non-RAW) numeric key is dictionary-encoded. + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "true"); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.MIN_VALUE)), String.valueOf(Integer.MIN_VALUE)); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.MAX_VALUE)), "14"); + } + + @Test + public void testDenseDefaultKeyWritesDictionaryAndInvertedIndex() + throws Exception { + // Default keys are dictionary-encoded with an inverted index (both default on), now written via the + // standard ForwardIndexCreator and inverted index creator. + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), + config(0.5, -1, null)); + for (int d = 0; d < 10; d++) { + s.add(Map.of("tag", "v" + (d % 3)), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "tag"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "true"); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor(denseCol, "hasInvertedIndex")), + "true"); + assertTrue(new File(_tempDir, denseCol + V1Constants.Dict.FILE_EXTENSION).exists()); + assertTrue(new File(_tempDir, + denseCol + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION).exists()); + } + + @Test + public void testRawStringForwardIndexViaStandardCreator() + throws Exception { + // A RAW-configured key takes the standard raw var-byte forward index path (no dictionary). + FieldConfig rawConfig = new FieldConfig.Builder("note") + .withEncodingType(FieldConfig.EncodingType.RAW).build(); + OpenStructIndexConfig cfg = new OpenStructIndexConfig(false, null, -1, null, 0.5, List.of(rawConfig)); + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), cfg); + for (int d = 0; d < 10; d++) { + s.add(Map.of("note", "n" + d), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "note"); + PropertiesConfiguration props = s.getMaterializedColumnMetadata().get(denseCol); + assertNotNull(props); + assertEquals(props.getString(V1Constants.MetadataKeys.Column.getKeyFor( + denseCol, V1Constants.MetadataKeys.Column.HAS_DICTIONARY)), "false"); + assertFalse(new File(_tempDir, denseCol + V1Constants.Dict.FILE_EXTENSION).exists()); + assertTrue(new File(_tempDir, + denseCol + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION).exists()); + } + + @Test + public void testRangeAndBloomIndexesWrittenForKey() + throws Exception { + // An INT key configured with range + bloom must produce those index buffers via the generic loop. + JsonNode indexes = JsonUtils.stringToJsonNode("{\"range\": {}, \"bloom\": {}}"); + FieldConfig keyConfig = new FieldConfig.Builder("clicks").withIndexes(indexes).build(); + OpenStructIndexConfig cfg = new OpenStructIndexConfig(false, null, -1, null, 0.5, List.of(keyConfig)); + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), cfg); + for (int d = 0; d < 10; d++) { + s.add(Map.of("clicks", d), d); + } + s.seal(); + + String denseCol = OpenStructNaming.materializedColumnName("metrics", "clicks"); + assertTrue(new File(_tempDir, denseCol + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION).exists(), + "range index buffer should be written"); + assertTrue(new File(_tempDir, denseCol + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION).exists(), + "bloom filter buffer should be written"); + } + + @Test + public void testRangeOnRawNonNumericKeyFailsWithCanonicalGuard() + throws Exception { + // A STRING key with RAW encoding + range resolves to raw (range does not require a dictionary), which the + // range creator cannot build. The splitter must surface the canonical RangeIndexType.validate guard + // (IllegalStateException) at build time rather than crashing opaquely inside the creator. + JsonNode indexes = JsonUtils.stringToJsonNode("{\"range\": {}}"); + FieldConfig keyConfig = new FieldConfig.Builder("tag") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withIndexes(indexes) + .build(); + OpenStructIndexConfig cfg = new OpenStructIndexConfig(false, null, -1, null, 0.5, List.of(keyConfig)); + OpenStructColumnSplitter s = new OpenStructColumnSplitter(_tempDir, "metrics", spec(), cfg); + for (int d = 0; d < 10; d++) { + s.add(Map.of("tag", "v" + d), d); + } + + assertThrows(IllegalStateException.class, s::seal); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtilFieldSpecTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtilFieldSpecTest.java new file mode 100644 index 000000000000..04fe3d57351f --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtilFieldSpecTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.stats; + +import java.math.BigDecimal; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class StatsCollectorUtilFieldSpecTest { + + @Test + public void testCreateIntCollectorFromFieldSpec() { + DimensionFieldSpec spec = new DimensionFieldSpec("c", DataType.INT, true); + AbstractColumnStatisticsCollector collector = StatsCollectorUtil.createStatsCollector(spec, null); + collector.collect(5); + collector.collect(5); + collector.collect(2); + collector.seal(); + assertEquals(collector.getCardinality(), 2); + assertEquals(collector.getMinValue(), 2); + assertEquals(collector.getMaxValue(), 5); + } + + @Test + public void testCreateBigDecimalCollectorFromFieldSpec() { + DimensionFieldSpec spec = new DimensionFieldSpec("c", DataType.BIG_DECIMAL, true); + AbstractColumnStatisticsCollector collector = StatsCollectorUtil.createStatsCollector(spec, null); + collector.collect(new BigDecimal("1.0")); + collector.collect(new BigDecimal("1.00")); + collector.seal(); + // 1.0 and 1.00 are distinct under equals (BigDecimalColumnPreIndexStatsCollector uses ObjectOpenHashSet). + assertEquals(collector.getCardinality(), 2); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSourceTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSourceTest.java new file mode 100644 index 000000000000..6df147c657a0 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/ImmutableOpenStructDataSourceTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.util.Map; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; + + +public class ImmutableOpenStructDataSourceTest { + + private static ComplexFieldSpec openStructSpec(String name) { + ComplexFieldSpec spec = new ComplexFieldSpec(name, DataType.OPEN_STRUCT, true, Map.of()); + return spec; + } + + @Test + public void testGetDataSourceReturnsPerKeyDataSource() { + DataSource clicksDs = mock(DataSource.class); + DataSource sparseDs = mock(DataSource.class); + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of("clicks", clicksDs), + sparseDs, + meta, + container); + + assertSame(ds.getDataSource("clicks"), clicksDs); + // absent key falls back to sparse + assertSame(ds.getDataSource("unknown"), sparseDs); + } + + @Test + public void testIsMaterializedTrueOnlyForMaterializedKeys() { + DataSource clicksDs = mock(DataSource.class); + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of("clicks", clicksDs), + null, + meta, + container); + + assertTrue(ds.isMaterialized("clicks")); + assertFalse(ds.isMaterialized("absent")); + } + + @Test + public void testIsFullyMaterializedTrueWhenNoSparse() { + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of(), + null, + meta, + container); + + assertTrue(ds.isFullyMaterialized()); + } + + @Test + public void testIsFullyMaterializedFalseWhenSparsePresent() { + DataSource sparseDs = mock(DataSource.class); + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of(), + sparseDs, + meta, + container); + + assertFalse(ds.isFullyMaterialized()); + } + + @Test + public void testGetFieldSpecReturnsOpenStructView() { + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of(), + null, + meta, + container); + + ComplexFieldSpec fieldSpec = ds.getFieldSpec(); + assertNotNull(fieldSpec); + assertEquals(fieldSpec.getName(), "event"); + } + + @Test + public void testGetDataSourceMetadataByKeyReturnsDelegated() { + DataSource clicksDs = mock(DataSource.class); + DataSourceMetadata clicksMeta = mock(DataSourceMetadata.class); + DataSourceMetadata topMeta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + org.mockito.Mockito.when(clicksDs.getDataSourceMetadata()).thenReturn(clicksMeta); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of("clicks", clicksDs), + null, + topMeta, + container); + + assertSame(ds.getDataSourceMetadata("clicks"), clicksMeta); + assertNull(ds.getDataSourceMetadata("absent")); + } + + @Test + public void testGetDataSourcesReturnsPerKeyMap() { + DataSource clicksDs = mock(DataSource.class); + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + Map perKeyMap = Map.of("clicks", clicksDs); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + perKeyMap, + null, + meta, + container); + + assertEquals(ds.getDataSources(), perKeyMap); + } + + @Test + public void testTopLevelMetadataAndContainerDelegated() { + DataSourceMetadata meta = mock(DataSourceMetadata.class); + ColumnIndexContainer container = mock(ColumnIndexContainer.class); + + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of(), + null, + meta, + container); + + assertSame(ds.getDataSourceMetadata(), meta); + assertSame(ds.getIndexContainer(), container); + } + + @Test + public void testConvenienceConstructorSynthesizesMetadata() { + DataSource clicksDs = mock(DataSource.class); + ImmutableOpenStructDataSource ds = new ImmutableOpenStructDataSource( + openStructSpec("event"), + Map.of("clicks", clicksDs), + null, + 42); + + DataSourceMetadata meta = ds.getDataSourceMetadata(); + assertNotNull(meta); + assertEquals(meta.getNumDocs(), 42); + assertEquals(meta.getNumValues(), 42); + assertEquals(meta.getFieldSpec().getDataType(), DataType.OPEN_STRUCT); + assertNotNull(ds.getIndexContainer()); + assertTrue(ds.isFullyMaterialized()); + assertSame(ds.getDataSource("clicks"), clicksDs); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSourceTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSourceTest.java new file mode 100644 index 000000000000..397a91d92712 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructDataSourceTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.util.Map; +import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +public class MutableOpenStructDataSourceTest { + + private PinotDataBufferMemoryManager _mm; + + @BeforeMethod + public void setUp() { + _mm = new DirectMemoryManager("MutableOpenStructDataSourceTest"); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mm.close(); + } + + private ComplexFieldSpec spec() { + return new ComplexFieldSpec("metrics", DataType.OPEN_STRUCT, true, Map.of()); + } + + @Test + public void testGetDataSourcePerKey() + throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", spec(), + OpenStructIndexConfig.DEFAULT, _mm, 100)) { + idx.index(0, Map.of("clicks", 5L)); + MutableOpenStructDataSource ds = new MutableOpenStructDataSource(spec(), idx, 1); + DataSource clicks = ds.getDataSource("clicks"); + assertNotNull(clicks); + assertTrue(ds.isMaterialized("clicks")); + assertTrue(ds.isFullyMaterialized()); // mutable always holds everything + } + } + + @Test + public void testGetDataSourceForUnknownKey() + throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", spec(), + OpenStructIndexConfig.DEFAULT, _mm, 100)) { + MutableOpenStructDataSource ds = new MutableOpenStructDataSource(spec(), idx, 0); + assertNull(ds.getDataSource("missing")); + assertFalse(ds.isMaterialized("missing")); + } + } + + @Test + public void testGetDataSourcesReturnsAllKeys() + throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", spec(), + OpenStructIndexConfig.DEFAULT, _mm, 100)) { + idx.index(0, Map.of("clicks", 5L, "country", "US")); + MutableOpenStructDataSource ds = new MutableOpenStructDataSource(spec(), idx, 1); + assertEquals(ds.getDataSources().size(), 2); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndexTest.java new file mode 100644 index 000000000000..fc031f04df0a --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/MutableOpenStructIndexTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager; +import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +public class MutableOpenStructIndexTest { + private PinotDataBufferMemoryManager _memMgr; + + @BeforeMethod + public void setUp() { + _memMgr = new DirectMemoryManager(MutableOpenStructIndexTest.class.getName()); + } + + @AfterMethod + public void tearDown() + throws IOException { + _memMgr.close(); + } + + private static ComplexFieldSpec openStructSpec() { + return new ComplexFieldSpec("metrics", DataType.OPEN_STRUCT, true, Map.of()); + } + + @Test + public void testAddAndGetKeys() + throws IOException { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex( + "metrics", openStructSpec(), OpenStructIndexConfig.DEFAULT, _memMgr, 1000)) { + + idx.index(0, Map.of("clicks", 42L, "impressions", 100L)); + idx.index(1, Map.of("clicks", 7L, "revenue", "1.5")); + + Set keys = idx.getKeys(); + assertTrue(keys.contains("clicks"), "Expected 'clicks' in keys"); + assertTrue(keys.contains("impressions"), "Expected 'impressions' in keys"); + assertTrue(keys.contains("revenue"), "Expected 'revenue' in keys"); + assertEquals(keys.size(), 3); + } + } + + @Test + public void testIndexNullIsNoop() + throws IOException { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex( + "metrics", openStructSpec(), OpenStructIndexConfig.DEFAULT, _memMgr, 1000)) { + + idx.index(0, null); + + assertTrue(idx.getKeys().isEmpty(), "Expected no keys after indexing null"); + assertNull(idx.getKeyColumn("clicks")); + } + } + + @Test + public void testFillRateTracking() + throws IOException { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex( + "metrics", openStructSpec(), OpenStructIndexConfig.DEFAULT, _memMgr, 1000)) { + + for (int docId = 0; docId < 10; docId++) { + if (docId < 7) { + idx.index(docId, Map.of("clicks", (long) (docId + 1))); + } else { + // docs 7,8,9 have no "clicks" key + idx.index(docId, Map.of("impressions", 100L)); + } + } + + MutableKeyColumn clicksCol = idx.getKeyColumn("clicks"); + assertNotNull(clicksCol, "Expected 'clicks' column to exist"); + assertEquals(clicksCol.getNumNonNullDocs(), 7, + "Expected 7 non-null docs for 'clicks'"); + } + } + + @Test + public void testTypeInferenceFromValue() + throws IOException { + // No childFieldSpecs — type inference from rawValue + ComplexFieldSpec spec = new ComplexFieldSpec("metrics", DataType.OPEN_STRUCT, true, Map.of()); + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", spec, + OpenStructIndexConfig.DEFAULT, _memMgr, 100)) { + idx.index(0, java.util.Map.of("clicks", 5L)); + assertEquals(idx.getKeyColumn("clicks").getStoredType(), DataType.LONG); + idx.index(1, java.util.Map.of("country", "US")); + assertEquals(idx.getKeyColumn("country").getStoredType(), DataType.STRING); + } + } + + @Test + public void testImplementsOpenStructIndexReader() throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", openStructSpec(), + OpenStructIndexConfig.DEFAULT, _memMgr, 100)) { + assertTrue(idx instanceof org.apache.pinot.segment.spi.index.reader.OpenStructIndexReader); + } + } + + @Test + public void testGetIndexesReturnsForwardIndexForMaterializedKey() throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", openStructSpec(), + OpenStructIndexConfig.DEFAULT, _memMgr, 100)) { + idx.index(0, Map.of("clicks", 5L)); + Map indexes = + idx.getIndexes("clicks"); + assertNotNull(indexes.get(org.apache.pinot.segment.spi.index.StandardIndexes.forward())); + } + } + + @Test + public void testGetIndexesUnknownKeyReturnsEmpty() throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", openStructSpec(), + OpenStructIndexConfig.DEFAULT, _memMgr, 100)) { + assertTrue(idx.getIndexes("missing").isEmpty()); + } + } + + @Test + public void testGetColumnMetadataReturnsKeyMetadata() throws Exception { + try (MutableOpenStructIndex idx = new MutableOpenStructIndex("metrics", openStructSpec(), + OpenStructIndexConfig.DEFAULT, _memMgr, 100)) { + idx.index(0, Map.of("clicks", 5L)); + assertNotNull(idx.getColumnMetadata("clicks")); + assertEquals(idx.getColumnMetadata("clicks").getColumnName(), "clicks"); + assertNull(idx.getColumnMetadata("absent")); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexTypeTest.java new file mode 100644 index 000000000000..317fe219f3f1 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/openstruct/OpenStructIndexTypeTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.openstruct; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import java.util.Map; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.OpenStructIndexConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; + + +public class OpenStructIndexTypeTest { + + @Test + public void testServiceLoaderResolves() { + assertNotNull(StandardIndexes.openStruct(), + "StandardIndexes.openStruct() should resolve via OpenStructIndexPlugin"); + } + + @Test + public void testIndexIdMatches() { + assertEquals(StandardIndexes.openStruct().getId(), StandardIndexes.OPEN_STRUCT_ID); + } + + @Test + public void testSingletonInstance() { + assertSame(StandardIndexes.openStruct(), OpenStructIndexPlugin.INSTANCE); + } + + @Test + public void testValidateRejectsUnsupportedPerKeyIndex() + throws Exception { + // 'h3' is not in the OPEN_STRUCT vetted subset; declaring it on a key must fail validation. + JsonNode indexes = JsonUtils.stringToJsonNode("{\"h3\": {}}"); + FieldConfig keyConfig = new FieldConfig.Builder("loc").withIndexes(indexes).build(); + // First constructor arg is `disabled` — pass false so the config is enabled and validation runs. + OpenStructIndexConfig config = new OpenStructIndexConfig(false, null, -1, null, 0.5, List.of(keyConfig)); + FieldIndexConfigs fieldIndexConfigs = + new FieldIndexConfigs.Builder().add(StandardIndexes.openStruct(), config).build(); + FieldSpec openStructSpec = new ComplexFieldSpec("payload", FieldSpec.DataType.OPEN_STRUCT, true, Map.of()); + + assertThrows(IllegalStateException.class, + () -> StandardIndexes.openStruct().validate(fieldIndexConfigs, openStructSpec, null)); + } + + @Test + public void testValidateAllowsVettedPerKeyIndexes() + throws Exception { + JsonNode indexes = JsonUtils.stringToJsonNode("{\"range\": {}, \"bloom\": {}, \"inverted\": {}}"); + FieldConfig keyConfig = new FieldConfig.Builder("clicks").withIndexes(indexes).build(); + OpenStructIndexConfig config = new OpenStructIndexConfig(false, null, -1, null, 0.5, List.of(keyConfig)); + FieldIndexConfigs fieldIndexConfigs = + new FieldIndexConfigs.Builder().add(StandardIndexes.openStruct(), config).build(); + FieldSpec openStructSpec = new ComplexFieldSpec("payload", FieldSpec.DataType.OPEN_STRUCT, true, Map.of()); + + // Must not throw. + StandardIndexes.openStruct().validate(fieldIndexConfigs, openStructSpec, null); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java index fbc278291275..84adb3b421c9 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java @@ -178,6 +178,9 @@ public static class Column { // parentColumn = metrics public static final String PARENT_COLUMN = "parentColumn"; + // Whether this OPEN_STRUCT column has a sparse column for keys not materialized as dense. + public static final String HAS_SPARSE_COLUMN = "hasSparseColumn"; + /// Partition function, all optional public static final String PARTITION_FUNCTION = "partitionFunction"; public static final String NUM_PARTITIONS = "numPartitions"; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtil.java index 033918e437f9..8dcbd1734531 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtil.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtil.java @@ -19,14 +19,20 @@ package org.apache.pinot.segment.spi.index; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Sets; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; public class FieldIndexConfigsUtil { @@ -51,6 +57,48 @@ public static Map createIndexConfigsByColName( .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().build())); } + /** + * Builds a {@link FieldIndexConfigs} for a single column directly from one {@link FieldConfig}, without a + * {@link TableConfig} or {@link Schema}. The dictionary entry is derived from + * {@code fieldConfig.getEncodingType()} (RAW => disabled, otherwise default-enabled); every other index type is + * read from the modern {@code fieldConfig.getIndexes()} JSON (keyed by index pretty name), falling back to each + * type's default config. A {@code null} fieldConfig yields built-in defaults (dictionary enabled). + * + *

This reads only the modern {@code indexes} format and never legacy {@code IndexingConfig} lists, so it suits + * synthetic columns (e.g. OPEN_STRUCT materialized children) that exist in no schema. + */ + public static FieldIndexConfigs fromFieldConfig(@Nullable FieldConfig fieldConfig, FieldSpec fieldSpec) { + FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(); + boolean rawEncoded = fieldConfig != null && fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW; + builder.add(StandardIndexes.dictionary(), + rawEncoded ? DictionaryIndexConfig.DISABLED : DictionaryIndexConfig.DEFAULT); + JsonNode indexes = fieldConfig != null ? fieldConfig.getIndexes() : null; + for (IndexType indexType : IndexService.getInstance().getAllIndexes()) { + if (indexType.getId().equals(StandardIndexes.DICTIONARY_ID)) { + continue; + } + addConfigFromIndexes(builder, indexType, indexes); + } + return builder.build(); + } + + private static void addConfigFromIndexes(FieldIndexConfigs.Builder builder, + IndexType indexType, @Nullable JsonNode indexes) { + JsonNode node = indexes != null ? indexes.get(indexType.getPrettyName()) : null; + C config; + if (node != null) { + try { + config = JsonUtils.jsonNodeToObject(node, indexType.getIndexConfigClass()); + } catch (IOException e) { + throw new IllegalArgumentException( + "Failed to parse '" + indexType.getPrettyName() + "' index config from FieldConfig", e); + } + } else { + config = indexType.getDefaultConfig(); + } + builder.add(indexType, config); + } + @FunctionalInterface public interface DeserializerProvider { ColumnConfigDeserializer get(IndexType indexType); diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtilTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtilTest.java new file mode 100644 index 000000000000..fdb0e2f0dcb6 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/FieldIndexConfigsUtilTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class FieldIndexConfigsUtilTest { + + private static final FieldSpec INT_SPEC = new DimensionFieldSpec("x", FieldSpec.DataType.INT, true); + + private IndexService _originalIndexService; + private StubIndexType _dictType; + private StubIndexType _rangeType; + private StubIndexType _bloomType; + + @BeforeMethod + public void setUp() { + _originalIndexService = IndexService.getInstance(); + _dictType = new StubIndexType(StandardIndexes.DICTIONARY_ID, StandardIndexes.DICTIONARY_ID); + _rangeType = new StubIndexType(StandardIndexes.RANGE_ID, "range"); + _bloomType = new StubIndexType(StandardIndexes.BLOOM_FILTER_ID, "bloom"); + installIndexService(_dictType, _rangeType, _bloomType); + } + + @AfterMethod + public void tearDown() { + IndexService.setInstance(_originalIndexService); + } + + @Test + public void testParsesIndexesAndRawEncoding() + throws Exception { + JsonNode indexes = JsonUtils.stringToJsonNode("{\"range\": {}, \"bloom\": {}}"); + FieldConfig fc = new FieldConfig.Builder("x") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withIndexes(indexes) + .build(); + + FieldIndexConfigs configs = FieldIndexConfigsUtil.fromFieldConfig(fc, INT_SPEC); + + // RAW encoding disables the dictionary — verify via the stored DictionaryIndexConfig constant. + IndexConfig dictConfig = configs.getConfig(_dictType); + assertTrue(dictConfig.isDisabled(), "RAW => dictionary disabled"); + + // range and bloom should be enabled because the JSON contained their pretty-name keys. + IndexConfig rangeConfig = configs.getConfig(_rangeType); + assertTrue(rangeConfig.isEnabled(), "range config from JSON should be enabled"); + + IndexConfig bloomConfig = configs.getConfig(_bloomType); + assertTrue(bloomConfig.isEnabled(), "bloom config from JSON should be enabled"); + } + + @Test + public void testNullFieldConfigUsesDictionaryDefault() { + FieldIndexConfigs configs = FieldIndexConfigsUtil.fromFieldConfig(null, INT_SPEC); + // null fieldConfig => dictionary should use DictionaryIndexConfig.DEFAULT (not disabled) + IndexConfig dictConfig = configs.getConfig(_dictType); + assertFalse(dictConfig.isDisabled(), "null => dictionary enabled"); + } + + private void installIndexService(StubIndexType... types) { + Set> plugins = new HashSet<>(); + int priority = 0; + for (StubIndexType type : types) { + final int p = priority++; + final StubIndexType t = type; + plugins.add(new IndexPlugin() { + @Override + public StubIndexType getIndexType() { + return t; + } + + @Override + public int getPriority() { + return p; + } + }); + } + IndexService.setInstance(new IndexService(plugins)); + } + + private static final class StubIndexType implements IndexType { + private final String _id; + private final String _prettyName; + + StubIndexType(String id, String prettyName) { + _id = id; + _prettyName = prettyName; + } + + @Override + public String getId() { + return _id; + } + + @Override + public String getPrettyName() { + return _prettyName; + } + + @Override + public Class getIndexConfigClass() { + return IndexConfig.class; + } + + @Override + public IndexConfig getDefaultConfig() { + return IndexConfig.DISABLED; + } + + @Override + public Map getConfig(TableConfig tableConfig, Schema schema) { + return Map.of(); + } + + @Override + public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) { + throw new UnsupportedOperationException(); + } + + @Override + public IndexReaderFactory getReaderFactory() { + throw new UnsupportedOperationException(); + } + + @Override + public List getFileExtensions(@Nullable ColumnMetadata columnMetadata) { + return List.of(); + } + + @Override + public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, + Map configsByCol, Schema schema, TableConfig tableConfig) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean requiresDictionary(FieldSpec fieldSpec, IndexConfig indexConfig) { + return false; + } + + @Override + public boolean shouldInvalidateOnDictionaryChange(FieldSpec fieldSpec, IndexConfig indexConfig) { + return false; + } + + @Override + public void convertToNewFormat(TableConfig tableConfig, Schema schema) { + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructNaming.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructNaming.java index dc0bc138d3ec..a0061c6d1a40 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructNaming.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructNaming.java @@ -36,4 +36,41 @@ public static String materializedColumnName(String openStructColumn, String key) public static String sparseColumnName(String openStructColumn) { return openStructColumn + SEPARATOR + SPARSE_SUFFIX; } + + /// Returns true if the given column name is a materialized OPEN_STRUCT child column + /// (dense materialized key or the sparse JSON column). + public static boolean isMaterializedOpenStructColumn(String columnName) { + return columnName.indexOf(SEPARATOR.charAt(0)) > 0; + } + + /// Returns true if the given column name is the sparse JSON column for some + /// OPEN_STRUCT parent. + public static boolean isSparseColumn(String columnName) { + int sep = columnName.indexOf(SEPARATOR.charAt(0)); + return sep > 0 && SPARSE_SUFFIX.equals(columnName.substring(sep + 1)); + } + + /// Returns the parent OPEN_STRUCT column name for a materialized child column. + /// Throws IllegalArgumentException if the input is not a materialized child column. + public static String parseParentColumn(String materializedColumnName) { + int sep = materializedColumnName.indexOf(SEPARATOR.charAt(0)); + if (sep <= 0) { + throw new IllegalArgumentException("Not a materialized OPEN_STRUCT column: " + materializedColumnName); + } + return materializedColumnName.substring(0, sep); + } + + /// Returns the key portion of a materialized dense column name. Throws + /// IllegalArgumentException for the sparse column or non-materialized names. + public static String parseKey(String materializedColumnName) { + int sep = materializedColumnName.indexOf(SEPARATOR.charAt(0)); + if (sep <= 0) { + throw new IllegalArgumentException("Not a materialized OPEN_STRUCT column: " + materializedColumnName); + } + String key = materializedColumnName.substring(sep + 1); + if (SPARSE_SUFFIX.equals(key)) { + throw new IllegalArgumentException("Sparse column has no key: " + materializedColumnName); + } + return key; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructTypeInference.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructTypeInference.java new file mode 100644 index 000000000000..7cfab573fb2f --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/OpenStructTypeInference.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.data; + +import javax.annotation.Nullable; +import org.apache.pinot.spi.utils.PinotDataType; + + +/// Infers the {@link FieldSpec.DataType} for an OPEN_STRUCT key from raw ingested values when the key has +/// no declared child {@link FieldSpec}. This is OPEN_STRUCT-specific policy (it keeps TIMESTAMP, folds +/// DATE/TIME/UUID to STRING, widens BYTE/CHARACTER/SHORT to INT, and returns {@code null} for values that +/// cannot be represented as a stored column type), distinct from the JSON-node-based inference in +/// {@code JsonUtils.valueOf}. +public final class OpenStructTypeInference { + private OpenStructTypeInference() { + } + + /// Infers the {@link FieldSpec.DataType} from a raw ingested value. Returns {@code null} when the value + /// cannot be represented as a stored column type; callers decide whether to drop the entry or fall back + /// to a default (e.g. STRING). + @Nullable + public static FieldSpec.DataType inferDataType(Object rawValue) { + switch (PinotDataType.getSingleValueType(rawValue)) { + case INTEGER: + case BYTE: + case CHARACTER: + case SHORT: + return FieldSpec.DataType.INT; + case LONG: + return FieldSpec.DataType.LONG; + case FLOAT: + return FieldSpec.DataType.FLOAT; + case DOUBLE: + return FieldSpec.DataType.DOUBLE; + case BIG_DECIMAL: + return FieldSpec.DataType.BIG_DECIMAL; + case BOOLEAN: + return FieldSpec.DataType.BOOLEAN; + case TIMESTAMP: + return FieldSpec.DataType.TIMESTAMP; + case STRING: + case DATE: + case TIME: + case UUID: + return FieldSpec.DataType.STRING; + case BYTES: + return FieldSpec.DataType.BYTES; + default: + return null; + } + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructNamingTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructNamingTest.java index 6adad485c391..5c057d4acac8 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructNamingTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructNamingTest.java @@ -21,17 +21,55 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class OpenStructNamingTest { @Test public void testMaterializedColumnName() { - assertEquals(OpenStructNaming.materializedColumnName("metrics", "tenancy"), "metrics$tenancy"); + assertEquals(OpenStructNaming.materializedColumnName("metrics", "clicks"), "metrics$clicks"); } @Test public void testSparseColumnName() { assertEquals(OpenStructNaming.sparseColumnName("metrics"), "metrics$__sparse__"); } + + @Test + public void testIsMaterializedOpenStructColumn() { + assertTrue(OpenStructNaming.isMaterializedOpenStructColumn("metrics$clicks")); + assertTrue(OpenStructNaming.isMaterializedOpenStructColumn("metrics$__sparse__")); + assertFalse(OpenStructNaming.isMaterializedOpenStructColumn("metrics")); + assertFalse(OpenStructNaming.isMaterializedOpenStructColumn("plain_column")); + } + + @Test + public void testIsSparseColumn() { + assertTrue(OpenStructNaming.isSparseColumn("metrics$__sparse__")); + assertFalse(OpenStructNaming.isSparseColumn("metrics$clicks")); + assertFalse(OpenStructNaming.isSparseColumn("metrics")); + } + + @Test + public void testParseParentColumn() { + assertEquals(OpenStructNaming.parseParentColumn("metrics$clicks"), "metrics"); + assertEquals(OpenStructNaming.parseParentColumn("metrics$__sparse__"), "metrics"); + } + + @Test + public void testParseKey() { + assertEquals(OpenStructNaming.parseKey("metrics$clicks"), "clicks"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseKeyRejectsSparse() { + OpenStructNaming.parseKey("metrics$__sparse__"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseKeyRejectsNonMaterialized() { + OpenStructNaming.parseKey("metrics"); + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructTypeInferenceTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructTypeInferenceTest.java new file mode 100644 index 000000000000..3d67eba7514a --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/OpenStructTypeInferenceTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.data; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Map; +import java.util.UUID; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class OpenStructTypeInferenceTest { + + @DataProvider(name = "inferenceCases") + public Object[][] inferenceCases() { + return new Object[][]{ + // Integral types all widen to INT. + {42, DataType.INT}, + {(byte) 1, DataType.INT}, + {'a', DataType.INT}, + {(short) 7, DataType.INT}, + {42L, DataType.LONG}, + {3.14f, DataType.FLOAT}, + {3.14d, DataType.DOUBLE}, + {new BigDecimal("1.23"), DataType.BIG_DECIMAL}, + {true, DataType.BOOLEAN}, + {new Timestamp(0L), DataType.TIMESTAMP}, + {"hello", DataType.STRING}, + // Temporal and UUID values fold to STRING. + {LocalDate.of(2026, 6, 2), DataType.STRING}, + {LocalTime.of(12, 0), DataType.STRING}, + {UUID.randomUUID(), DataType.STRING}, + {new byte[]{1, 2, 3}, DataType.BYTES}, + // Unrepresentable values return null so callers can drop or default. + {Map.of("k", "v"), null}, + {new Object(), null}, + }; + } + + @Test(dataProvider = "inferenceCases") + public void testInferDataType(Object rawValue, DataType expected) { + assertEquals(OpenStructTypeInference.inferDataType(rawValue), expected); + } +}