Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
72cf061
[WIP] OPEN_STRUCT storage layer — columnar two-tier dense/sparse inde…
tarun11Mavani Jun 1, 2026
d58308e
fix(open_struct): support BIG_DECIMAL in dense materialization
tarun11Mavani Jun 1, 2026
8b4c0df
refactor(stats): add FieldSpec-based constructor to AbstractColumnSta…
tarun11Mavani Jun 1, 2026
8d438bf
refactor(stats): add FieldSpec-based collector factory overload
tarun11Mavani Jun 1, 2026
a3621d4
refactor(open_struct): build dense dictionary and stats from a column…
tarun11Mavani Jun 1, 2026
ea87ba3
refactor(open_struct): emit dense child metadata via shared addColumn…
tarun11Mavani Jun 1, 2026
c2aa947
refactor(open_struct): size dict-vs-raw from collector stats; drop _d…
tarun11Mavani Jun 1, 2026
d8bd6d5
refactor(open_struct): drop redundant dictionaryElementSize alias
tarun11Mavani Jun 1, 2026
4e778ab
refactor(open_struct): write dense child indexes via standard index c…
tarun11Mavani Jun 1, 2026
302460b
refactor(open_struct): extract dict decision and index writing from w…
tarun11Mavani Jun 2, 2026
25e7bf9
refactor(open_struct): move inferDataType out of OpenStructNaming int…
tarun11Mavani Jun 2, 2026
8fe4042
test(open_struct): add unit tests for OpenStructTypeInference.inferDa…
tarun11Mavani Jun 2, 2026
c895874
feat(open_struct): add FieldIndexConfigsUtil.fromFieldConfig (per-Fie…
tarun11Mavani Jun 2, 2026
2a37cf9
feat(open_struct): validate per-key indexes against vetted allowlist …
tarun11Mavani Jun 2, 2026
a7063ff
feat(open_struct): build dense-key indexes via a generic creator loop…
tarun11Mavani Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -34,6 +36,7 @@
import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.map.ImmutableMapDataSource;
import org.apache.pinot.segment.local.segment.index.openstruct.ImmutableOpenStructDataSource;
import org.apache.pinot.segment.local.segment.index.readers.text.MultiColumnLuceneTextIndexReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
Expand All @@ -49,6 +52,7 @@
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
Expand All @@ -58,7 +62,9 @@
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.data.ComplexFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.OpenStructNaming;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
Expand Down Expand Up @@ -98,13 +104,44 @@ public ImmutableSegmentImpl(
_dataSources =
new Object2ObjectOpenHashMap<>(segmentMetadata.getColumnMetadataMap().size());

Map<String, Map<String, DataSource>> openStructDenseChildren = new HashMap<>();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Map.of() for all

Map<String, DataSource> openStructSparseChildren = new HashMap<>();
Set<String> openStructParents = new HashSet<>();

for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
String colName = entry.getKey();
ColumnMetadata columnMetadata = entry.getValue();

if (columnMetadata instanceof ColumnMetadataImpl && ((ColumnMetadataImpl) columnMetadata).isMaterializedChild()) {
String parent = ((ColumnMetadataImpl) columnMetadata).getParentColumn();
openStructParents.add(parent);
DataSource childDs = new ImmutableDataSource(columnMetadata, _indexContainerMap.get(colName));
if (OpenStructNaming.isSparseColumn(colName)) {
openStructSparseChildren.put(parent, childDs);
} else {
openStructDenseChildren.computeIfAbsent(parent, k -> new HashMap<>())
.put(OpenStructNaming.parseKey(colName), childDs);
}
continue;
}

if (columnMetadata.getFieldSpec().getDataType() == FieldSpec.DataType.MAP) {
_dataSources.put(colName, new ImmutableMapDataSource(entry.getValue(), _indexContainerMap.get(colName)));
_dataSources.put(colName, new ImmutableMapDataSource(columnMetadata, _indexContainerMap.get(colName)));
} else {
_dataSources.put(colName, new ImmutableDataSource(entry.getValue(), _indexContainerMap.get(colName)));
_dataSources.put(colName, new ImmutableDataSource(columnMetadata, _indexContainerMap.get(colName)));
}
}

if (!openStructParents.isEmpty()) {
Schema schema = segmentMetadata.getSchema();
for (String parent : openStructParents) {
FieldSpec fieldSpec = schema != null ? schema.getFieldSpecFor(parent) : null;
if (!(fieldSpec instanceof ComplexFieldSpec)) {
continue;
}
_dataSources.put(parent, new ImmutableOpenStructDataSource((ComplexFieldSpec) fieldSpec,
openStructDenseChildren.getOrDefault(parent, Map.of()),
openStructSparseChildren.get(parent), segmentMetadata.getTotalDocs()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.OpenStructNaming;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ReadMode;
Expand Down Expand Up @@ -216,6 +217,17 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad
if (schema != null) {
Set<String> columnsInMetadata = new HashSet<>(columnMetadataMap.keySet());
columnsInMetadata.removeIf(schema::hasColumn);
// Materialized OPEN_STRUCT child columns (col$key, col$__sparse__) live in segment metadata
// but not in the user-facing schema. Keep them when the parent OPEN_STRUCT column is in the
// schema; they will be grouped under their parent at segment-impl post-load (Task 16).
columnsInMetadata.removeIf(col -> {
if (!OpenStructNaming.isMaterializedOpenStructColumn(col)) {
return false;
}
String parent = OpenStructNaming.parseParentColumn(col);
return schema.hasColumn(parent)
&& schema.getFieldSpecFor(parent).getDataType() == FieldSpec.DataType.OPEN_STRUCT;
});
if (!columnsInMetadata.isEmpty()) {
LOGGER.info("Skip loading columns only exist in metadata but not in schema: {}", columnsInMetadata);
for (String column : columnsInMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource;
import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructDataSource;
import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructIndex;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
Expand Down Expand Up @@ -107,6 +109,7 @@
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig;
import org.apache.pinot.spi.config.table.OpenStructIndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
Expand Down Expand Up @@ -302,7 +305,8 @@ public MultiColumnTextMetadata getMultiColumnTextMetadata() {

Set<IndexType> specialIndexes =
Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements other contract
StandardIndexes.nullValueVector()); // null value vector implements other contract
StandardIndexes.nullValueVector(), // null value vector implements other contract
StandardIndexes.openStruct()); // open-struct is constructed out-of-band below

// Initialize for each column
boolean hasColumnWithReuseMutableTextIndex = false;
Expand Down Expand Up @@ -428,6 +432,15 @@ public MultiColumnTextMetadata getMultiColumnTextMetadata() {
}
}

if (dataType == DataType.OPEN_STRUCT && fieldSpec instanceof ComplexFieldSpec) {
IndexConfig openStructConfig = indexConfigs.getConfig(StandardIndexes.openStruct());
if (openStructConfig instanceof OpenStructIndexConfig && openStructConfig.isEnabled()) {
MutableOpenStructIndex openStructIndex = new MutableOpenStructIndex(column, (ComplexFieldSpec) fieldSpec,
(OpenStructIndexConfig) openStructConfig, _memoryManager, _capacity);
mutableIndexes.put(StandardIndexes.openStruct(), openStructIndex);
}
}

_indexContainerMap.put(column,
new IndexContainer(fieldSpec, partitionFunction, partitions, new ValuesInfo(), mutableIndexes, dictionary,
nullValueVector, sourceColumn, valueAggregator));
Expand Down Expand Up @@ -569,7 +582,7 @@ private <C extends IndexConfig> void addMutableIndex(Map<IndexType, MutableIndex
*/
private boolean isNoDictionaryColumn(FieldIndexConfigs indexConfigs, FieldSpec fieldSpec, String column) {
DataType dataType = fieldSpec.getDataType();
if (dataType == DataType.MAP) {
if (dataType == DataType.MAP || dataType == DataType.OPEN_STRUCT) {
return true;
}
if (indexConfigs == null) {
Expand Down Expand Up @@ -941,6 +954,17 @@ private void addNewRow(int docId, GenericRow row) {
// Update numValues info
indexContainer._valuesInfo.updateSVNumValues();

// Route OPEN_STRUCT values to the dedicated mutable index. OPEN_STRUCT has no forward
// index / dictionary / min-max, so the standard per-IndexType loop and the comparable
// tracking below would be no-ops at best and crash at worst (Map is not Comparable).
if (dataType == DataType.OPEN_STRUCT) {
MutableIndex openStructIndex = indexContainer._mutableIndexes.get(StandardIndexes.openStruct());
if (openStructIndex != null) {
openStructIndex.add(value, -1, docId);
}
continue;
}

// Update indexes
int dictId = indexContainer._dictId;
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) {
Expand Down Expand Up @@ -1310,6 +1334,20 @@ public void commit() {
}
}

/**
* Returns the per-column mutable OPEN_STRUCT index, or {@code null} if the column is not OPEN_STRUCT
* or the index has not been initialized.
*/
@Nullable
public MutableOpenStructIndex getOpenStructIndex(String column) {
IndexContainer container = _indexContainerMap.get(column);
if (container == null) {
return null;
}
MutableIndex index = container._mutableIndexes.get(StandardIndexes.openStruct());
return index instanceof MutableOpenStructIndex ? (MutableOpenStructIndex) index : null;
}

@Override
public void offload() {
if (_partitionUpsertMetadataManager != null) {
Expand Down Expand Up @@ -1695,7 +1733,10 @@ private class IndexContainer implements Closeable {
@Nullable Set<Integer> partitions, ValuesInfo valuesInfo, Map<IndexType, MutableIndex> mutableIndexes,
@Nullable MutableDictionary dictionary, @Nullable MutableNullValueVector nullValueVector,
@Nullable String sourceColumn, @Nullable ValueAggregator valueAggregator) {
Preconditions.checkArgument(mutableIndexes.containsKey(StandardIndexes.forward()), "Forward index is required");
Preconditions.checkArgument(
mutableIndexes.containsKey(StandardIndexes.forward())
|| mutableIndexes.containsKey(StandardIndexes.openStruct()),
"Forward index or OPEN_STRUCT index is required");
_fieldSpec = fieldSpec;
_mutableIndexes = mutableIndexes;
_dictionary = dictionary;
Expand All @@ -1708,6 +1749,11 @@ private class IndexContainer implements Closeable {
}

DataSource toDataSource() {
if (_fieldSpec.getDataType() == DataType.OPEN_STRUCT) {
MutableIndex idx = _mutableIndexes.get(StandardIndexes.openStruct());
return new MutableOpenStructDataSource((ComplexFieldSpec) _fieldSpec, (MutableOpenStructIndex) idx,
_numDocsIndexed);
}
if (_fieldSpec.getDataType() == MAP) {
return new MutableMapDataSource(_fieldSpec, _numDocsIndexed, _valuesInfo._numValues,
_valuesInfo._maxNumValuesPerMVEntry, _dictionary == null ? -1 : _dictionary.length(), _partitionFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.segment.local.segment.creator.impl.stats.EmptyColumnStatistics;
import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource;
import org.apache.pinot.segment.local.segment.index.openstruct.MutableOpenStructDataSource;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsContainer;
Expand Down Expand Up @@ -78,6 +79,13 @@ private ColumnStatistics createColumnStatistics(DataSource dataSource, @Nullable
if (dataSource instanceof MutableMapDataSource) {
return createMapColumnStatistics(dataSource, validDocIds, statsCollectorConfig);
}
// OPEN_STRUCT parent columns have no forward index of their own — per-key stats are recomputed
// by the splitter at offline-segment build time. Return EmptyColumnStatistics so the standard
// dispatch (which would call getForwardIndex / getDictionary) doesn't NPE.
if (dataSource instanceof MutableOpenStructDataSource) {
return new EmptyColumnStatistics(dataSourceMetadata.getFieldSpec(), dataSourceMetadata.getPartitionFunction(),
dataSourceMetadata.getPartitions());
}
if (validDocIds != null) {
if (dataSource.getDictionary() != null) {
return new CompactedColumnStatistics(dataSource, sortedDocIds, isSortedColumn, validDocIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.TextIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ColumnarOpenStructIndexCreator;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
Expand Down Expand Up @@ -555,6 +556,22 @@ protected void writeMetadata()
hasDictionary, dictionaryElementSize, fwdConfig.getEncodingType(), false);
}

// OPEN_STRUCT splitters produce per-key materialized child columns (col$key, col$__sparse__) that
// are not in _columnStatisticsMap. Merge their pre-built metadata into the segment properties so
// each child appears as its own column at load time.
for (ColumnIndexCreators columnIndexCreators : _colIndexes.values()) {
for (IndexCreator indexCreator : columnIndexCreators.getIndexCreators()) {
if (indexCreator instanceof ColumnarOpenStructIndexCreator) {
ColumnarOpenStructIndexCreator splitter = (ColumnarOpenStructIndexCreator) indexCreator;
for (Map.Entry<String, PropertiesConfiguration> childEntry
: splitter.getMaterializedColumnMetadata().entrySet()) {
PropertiesConfiguration childProps = childEntry.getValue();
childProps.getKeys().forEachRemaining(key -> properties.setProperty(key, childProps.getProperty(key)));
}
}
}
}

SegmentZKPropsConfig segmentZKPropsConfig = _config.getSegmentZKPropsConfig();
if (segmentZKPropsConfig != null) {
properties.setProperty(Realtime.START_OFFSET, segmentZKPropsConfig.getStartOffset());
Expand Down
Loading
Loading