
[Arrow] Column-major segment build from Arrow IPC files via the ColumnReader SPI #18629

@real-mj-song


Summary

This issue proposes adding a column-major ingestion path for Apache Arrow IPC files by implementing the ColumnReader SPI (introduced in #16727) inside the existing pinot-arrow plugin. The new ArrowColumnReaderFactory would let SegmentIndexCreationDriverImpl.buildColumnar() consume an Arrow IPC file directly, one FieldVector at a time, instead of pivoting through per-row GenericRow materialization. The existing row-major ArrowRecordReader is unchanged and remains the default; the new path is opt-in.


Background

pinot-arrow today exposes Arrow IPC files only through ArrowRecordReader, a row-major reader. The file is opened with ArrowFileReader, and each row is copied out of the columnar VectorSchemaRoot into a GenericRow for the driver. The standard SegmentIndexCreationDriverImpl.build() then routes each row back into per-column stats collectors and forward-index creators. The source format is columnar and the segment storage is columnar, but the ingestion path between them is row-major.

The ColumnReader / ColumnReaderFactory SPI added in #16727 enables a column-major alternative: SegmentIndexCreationDriverImpl.buildColumnar() walks one column at a time, handing each value directly to the per-column writers. At present this SPI has only one in-tree consumer — the segment-to-segment converter — and no consumer that reads from a non-Pinot source.
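The following sketch shows the two loop shapes side by side by feeding a tiny two-column source to per-column collectors both ways. Every type in it is a hypothetical stand-in: a HashMap plays the role of GenericRow, and plain lists play the role of stats collectors and index creators. None of it is Pinot's actual driver code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only: these are not Pinot or Arrow classes.
final class IngestShapes {

  // A columnar source: one array per column, all of the same length.
  static final String[] COLUMNS = {"id", "name"};
  static final Object[][] SOURCE = {
      {1, 2, 3},        // column "id"
      {"a", "b", "c"}   // column "name"
  };

  // Row-major: pivot each row into a map (analogous to a GenericRow),
  // then scatter its fields back out to the per-column collectors.
  static Map<String, List<Object>> rowMajor() {
    Map<String, List<Object>> collectors = newCollectors();
    int numRows = SOURCE[0].length;
    for (int row = 0; row < numRows; row++) {
      Map<String, Object> genericRow = new HashMap<>(); // one allocation per row
      for (int c = 0; c < COLUMNS.length; c++) {
        genericRow.put(COLUMNS[c], SOURCE[c][row]);
      }
      for (Map.Entry<String, Object> field : genericRow.entrySet()) {
        collectors.get(field.getKey()).add(field.getValue());
      }
    }
    return collectors;
  }

  // Column-major: finish one column before moving to the next,
  // handing each value straight to that column's collector.
  static Map<String, List<Object>> columnMajor() {
    Map<String, List<Object>> collectors = newCollectors();
    for (int c = 0; c < COLUMNS.length; c++) {
      List<Object> collector = collectors.get(COLUMNS[c]);
      for (Object value : SOURCE[c]) {
        collector.add(value);
      }
    }
    return collectors;
  }

  private static Map<String, List<Object>> newCollectors() {
    Map<String, List<Object>> collectors = new HashMap<>();
    for (String column : COLUMNS) {
      collectors.put(column, new ArrayList<>());
    }
    return collectors;
  }

  public static void main(String[] args) {
    System.out.println(rowMajor().equals(columnMajor())); // true
  }
}
```

Both paths end with identical per-column contents. The column-major version never builds a per-row container.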

pinot-arrow is a natural first non-segment consumer: an Arrow FieldVector already exposes positional access (getObject(int), typed accessors, null bitmap), which lines up directly with the ColumnReader interface. No restructuring of the source is required.


Proposal

Add two source files under pinot-plugins/pinot-input-format/pinot-arrow:

  • ArrowColumnReaderFactory implements ColumnReaderFactory. It opens an Arrow IPC file via ArrowFileReader, accumulates the file's record batches into per-column FieldVectors via TransferPair.copyValueSafe, and exposes one ColumnReader per requested target-schema column. The Arrow allocator limit is configurable via arrowAllocatorLimit.
  • ArrowColumnReader implements ColumnReader. It wraps a single FieldVector and implements the three documented access patterns: generic sequential next() with null checks; typed sequential nextInt() / nextLong() / ... with isNextNull() and skipNext(); and random access by docId. Both single-value and multi-value (List of primitives) variants are covered.
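The three access patterns can be sketched over a stand-in for an Arrow IntVector. This is a minimal sketch under stated assumptions: FakeIntVector and SketchIntColumnReader are hypothetical names, the method names mirror the ones listed above, and the exact signatures of Pinot's ColumnReader SPI may differ. Arrow's real IntVector offers comparable isNull(int), get(int), and getObject(int) accessors.

```java
import java.util.NoSuchElementException;

// Hypothetical stand-in for an Arrow IntVector: values plus a validity bitmap
// (modeled as boolean[] here; Arrow itself packs validity into bits).
final class FakeIntVector {
  final int[] values;
  final boolean[] valid;

  FakeIntVector(int[] values, boolean[] valid) {
    this.values = values;
    this.valid = valid;
  }

  int getValueCount() { return values.length; }
  boolean isNull(int index) { return !valid[index]; }
  int get(int index) { return values[index]; }
  Integer getObject(int index) { return isNull(index) ? null : values[index]; }
}

// Hypothetical single-value int column reader; method names follow the
// proposal's description, signatures are assumptions, not Pinot's SPI.
final class SketchIntColumnReader {
  private final FakeIntVector vector;
  private int cursor = 0;

  SketchIntColumnReader(FakeIntVector vector) { this.vector = vector; }

  boolean hasNext() { return cursor < vector.getValueCount(); }

  // Pattern 1: generic sequential access; null slots come back as null.
  Object next() {
    checkHasNext();
    return vector.getObject(cursor++);
  }

  // Pattern 2: typed sequential access; callers test isNextNull() first
  // and call skipNext() instead of nextInt() for null slots.
  boolean isNextNull() {
    checkHasNext();
    return vector.isNull(cursor);
  }

  int nextInt() {
    checkHasNext();
    return vector.get(cursor++);
  }

  void skipNext() {
    checkHasNext();
    cursor++;
  }

  // Pattern 3: random access by docId; does not move the cursor.
  Object getValue(int docId) { return vector.getObject(docId); }

  // Resets the sequential cursor (convenient for demos).
  void rewind() { cursor = 0; }

  private void checkHasNext() {
    if (!hasNext()) throw new NoSuchElementException("no more values");
  }

  // Sums the non-null values using the typed pattern.
  static long sumNonNull(SketchIntColumnReader reader) {
    long sum = 0;
    while (reader.hasNext()) {
      if (reader.isNextNull()) {
        reader.skipNext();
      } else {
        sum += reader.nextInt();
      }
    }
    return sum;
  }
}
```

The typed pattern avoids boxing on the hot path; the generic and random-access patterns trade that for simpler null handling.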

Both readers must agree on Arrow → Pinot type conversion (e.g. Utf8 / LargeUtf8 must unwrap from org.apache.arrow.vector.util.Text to String; List must materialize to Object[]; temporal types must follow the extractRawTimeValues flag). To avoid drift between the row-major and column-major readers, the proposal extracts ArrowRecordExtractor.convert() and its helpers into a shared utility (ArrowToPinotTypeConverter) and routes both ArrowRecordExtractor.extract and ArrowColumnReader.getValue through it. This is a behavior-preserving refactor for the row-major path.
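With a shared converter, both readers call the same function, so a fix applied to one path also reaches the other. The sketch assumes a hypothetical FakeText class standing in for org.apache.arrow.vector.util.Text, which wraps UTF-8 bytes. It covers only the text and List cases. Temporal handling under extractRawTimeValues is left out rather than guessed. SketchConverter is an illustrative name, not the proposed ArrowToPinotTypeConverter API.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.arrow.vector.util.Text (UTF-8 bytes).
final class FakeText {
  private final byte[] utf8;
  FakeText(String s) { this.utf8 = s.getBytes(StandardCharsets.UTF_8); }
  @Override public String toString() { return new String(utf8, StandardCharsets.UTF_8); }
}

// Hypothetical shared converter. A real one would also handle temporal
// types according to extractRawTimeValues (omitted here).
final class SketchConverter {
  private SketchConverter() {}

  static Object convert(Object arrowValue) {
    if (arrowValue == null) {
      return null;
    }
    if (arrowValue instanceof FakeText) {
      return arrowValue.toString();            // Utf8 / LargeUtf8 -> String
    }
    if (arrowValue instanceof List) {
      List<?> list = (List<?>) arrowValue;     // List -> Object[]
      Object[] converted = new Object[list.size()];
      for (int i = 0; i < converted.length; i++) {
        converted[i] = convert(list.get(i));   // convert elements recursively
      }
      return converted;
    }
    return arrowValue;                         // other values pass through
  }

  // Row-major caller: builds one row (a map stands in for GenericRow).
  static Map<String, Object> extractRow(String[] names, Object[] rawRow) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (int i = 0; i < names.length; i++) {
      row.put(names[i], convert(rawRow[i]));
    }
    return row;
  }

  // Column-major caller: converts one value of one column by docId.
  static Object getValue(Object[] rawColumn, int docId) {
    return convert(rawColumn[docId]);
  }
}
```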

After the change, two paths coexist under pinot-arrow:

  • Row-major (existing, unchanged): driver entry driver.build(); input adapter ArrowRecordReader (per-row GenericRow).
  • Column-major (new, opt-in): driver entry driver.build() → buildColumnar(); input adapter ArrowColumnReaderFactory (per-column ColumnReader).

The row-major path remains the default. The column-major path is selected by constructing the driver with a ColumnReaderFactory instead of a RecordReader. Both share the same Arrow → Pinot type conversion through ArrowToPinotTypeConverter.
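Opt-in selection can be pictured as two init overloads feeding a single build() entry point. The interfaces and method shapes below are hypothetical simplifications. Pinot's real SegmentIndexCreationDriverImpl.init overloads take additional arguments, such as the segment generator config, which are omitted here.

```java
// Hypothetical simplifications, not Pinot's RecordReader / ColumnReaderFactory.
interface SketchRecordReader { int numRows(); }
interface SketchColumnReaderFactory { int numColumns(); }

final class SketchDriver {
  private SketchRecordReader recordReader;
  private SketchColumnReaderFactory columnReaderFactory;

  // Existing overload: row-major input (the default path).
  void init(SketchRecordReader reader) {
    this.recordReader = reader;
    this.columnReaderFactory = null;
  }

  // New overload: column-major input (opt-in).
  void init(SketchColumnReaderFactory factory) {
    this.columnReaderFactory = factory;
    this.recordReader = null;
  }

  // One entry point; the adapter chosen at init decides which path runs.
  String build() {
    return columnReaderFactory != null ? buildColumnar() : buildRowMajor();
  }

  private String buildRowMajor() {
    return "row-major: " + recordReader.numRows() + " rows";
  }

  private String buildColumnar() {
    return "column-major: " + columnReaderFactory.numColumns() + " columns";
  }
}
```

Both interfaces in this sketch have the same functional shape, so a bare lambda passed to init would be ambiguous. A caller assigns it to a typed variable first, for example `SketchColumnReaderFactory f = () -> 2; driver.init(f);`.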


Architectural gap: null handling in buildColumnar() (separate prerequisite)

The column-major driver path skips TransformPipeline, which on the row-major side runs NullValueTransformer to substitute FieldSpec.getDefaultNullValue() for null field values before they reach the typed stats collectors and index creators. Those downstream consumers (IntColumnPreIndexStatsCollector, StringColumnPreIndexStatsCollector, etc.) follow Apache Pinot's established collect(Object entry) cast convention, the same convention used throughout the row-major path, and assume non-null input of their FieldSpec-declared type.

The only existing in-tree consumer of buildColumnar() is the segment-to-segment converter. It reads from an already-built Pinot segment, where null-value substitution happened during the original segment build, so it never hits this gap. An Arrow-fed columnar path, or any other non-segment column-major source, delivers raw values, including null for null rows. The typed collectors then throw a NullPointerException, for example from ((Integer) entry).intValue() on a null entry.

The fix does not belong in the collectors: their cast convention is the established Apache Pinot style and is consistent with the row-major path. Instead, the column-major driver needs an equivalent of NullValueTransformer. The most natural form is a one-line substitution in ColumnarSegmentPreIndexStatsContainer (and in the analogous column-major index-write code path) that replaces any null value with fieldSpec.getDefaultNullValue() before dispatching to the typed collector or index creator.
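Both the failure mode and the proposed fix fit in a few lines. SketchIntStatsCollector is a hypothetical stand-in that copies only the cast-and-unbox convention described above. NullSubstitutingDispatch is a hypothetical name for the substitution step. Where that step would sit inside ColumnarSegmentPreIndexStatsContainer, and how the default is obtained, are assumptions. Here the default is simply passed in, in place of fieldSpec.getDefaultNullValue().

```java
import java.util.List;

// Hypothetical stand-in for a typed collector such as
// IntColumnPreIndexStatsCollector: it casts and unboxes, so null input NPEs.
final class SketchIntStatsCollector {
  private int min = Integer.MAX_VALUE;
  private int max = Integer.MIN_VALUE;
  private int count = 0;

  void collect(Object entry) {
    int value = ((Integer) entry).intValue(); // NullPointerException if entry is null
    min = Math.min(min, value);
    max = Math.max(max, value);
    count++;
  }

  int min() { return min; }
  int max() { return max; }
  int count() { return count; }
}

// Hypothetical sketch of the fix: substitute the default null value before
// dispatch, mirroring what NullValueTransformer does on the row-major path.
final class NullSubstitutingDispatch {
  private NullSubstitutingDispatch() {}

  static void collectColumn(List<Object> columnValues, Object defaultNullValue,
      SketchIntStatsCollector collector) {
    for (Object value : columnValues) {
      // defaultNullValue stands in for fieldSpec.getDefaultNullValue().
      collector.collect(value != null ? value : defaultNullValue);
    }
  }
}
```

The collector itself is left untouched, which matches the proposal's point that the fix belongs in the driver rather than in the collectors.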

This is out of scope for this proposal, but it is a strict prerequisite for any non-segment-to-segment consumer of buildColumnar() (including the Arrow path this proposal adds) to handle realistic data with nulls. It is worth tracking as a separate apache/pinot issue alongside this proposal.


How this can be leveraged

This is a building block for batch ingestion pipelines whose upstream stage operates on columnar data. Pipelines that can emit Arrow IPC shards can hand those shards to ArrowColumnReaderFactory and drive segment construction via buildColumnar() instead of replaying each shard row-by-row through ArrowRecordReader. The structural change is that the segment-build phase consumes one column at a time in bulk rather than one row at a time, so the per-row GenericRow allocation and per-primitive boxing inside the driver are no longer on the path.

The proposal does not change the pinot-batch-ingestion-spark-3 plugin, which today doesn't read Arrow at all. Wiring this path into that plugin (or any other batch framework) is a separate follow-up.


Compatibility and rollout

The change is purely additive:

  • ArrowRecordReader and the row-major path are unchanged.
  • Users opt in by constructing the driver via the ColumnReaderFactory-accepting overload of SegmentIndexCreationDriverImpl.init.
  • The shared converter extraction does not change row-major behavior.
  • No SPI changes — the new code consumes the already-merged ColumnReader / ColumnReaderFactory interfaces.

