
feat(partitioning): support multiple partition columns#468

Open
LuciferYang wants to merge 3 commits into lance-format:main from
LuciferYang:issue-429-multi-column-partitioning

Conversation

@LuciferYang
Contributor

Summary

Extends lance.partition.columns from a single column to a comma-separated list. When every declared column is partition-constant per fragment with matching coverage, the connector now reports a multi-key KeyGroupedPartitioning so SPJ can join on the full tuple without a shuffle. This is the read-side half of the work that #445 started on the write side: a table declaring lance.partition.columns='region,year' now writes fragments partitioned by both columns and reads report a 2-key grouping.

Closes #429.
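
As a usage sketch (table names are illustrative, whether the property is set at CREATE time or via ALTER is up to the catalog, and the SPJ session flag is standard Spark configuration rather than part of this PR):

```java
import org.apache.spark.sql.SparkSession;

// Hypothetical usage: two Lance tables declare the same two partition columns,
// then an equi-join on the full (region, year) tuple becomes SPJ-eligible.
SparkSession spark = SparkSession.builder().getOrCreate();

spark.sql("ALTER TABLE sales   SET TBLPROPERTIES ('lance.partition.columns' = 'region,year')");
spark.sql("ALTER TABLE refunds SET TBLPROPERTIES ('lance.partition.columns' = 'region,year')");

// SPJ itself is gated by Spark's storage-partitioned-join config.
spark.conf().set("spark.sql.sources.v2.bucketing.enabled", "true");

spark.sql("SELECT s.region, s.year, count(*) "
        + "FROM sales s JOIN refunds r "
        + "ON s.region = r.region AND s.year = r.year "
        + "GROUP BY s.region, s.year").show();
```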

Changes

ZonemapFragmentPruner.PartitionInfo now holds a List<String> of column names, a parallel List<DataType> of column types, and a Map<Integer, Comparable<?>[]> of per-fragment tuples instead of a single column plus scalar map. The constructor checks both the names/types width invariant and the tuple-width invariant, and copies tuples into immutable storage.
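
Roughly, the new shape looks like this (a sketch approximating the description above, not the actual source; accessors and helper methods omitted):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.types.DataType;

// Sketch of the refactored PartitionInfo; field and parameter names are
// approximations of what the description above implies.
final class PartitionInfo implements Serializable {
  private static final long serialVersionUID = 3L; // 1L -> 2L -> 3L across the commits

  private final List<String> columnNames;                   // declaration order
  private final List<DataType> columnTypes;                 // parallel to columnNames
  private final Map<Integer, Comparable<?>[]> fragmentKeys; // fragment id -> value tuple
  private final boolean softCapped;

  PartitionInfo(List<String> names, List<DataType> types,
                Map<Integer, Comparable<?>[]> keys, boolean softCapped) {
    if (names.size() != types.size()) {
      throw new IllegalArgumentException("column names/types width mismatch");
    }
    Map<Integer, Comparable<?>[]> copy = new HashMap<>();
    keys.forEach((fragment, tuple) -> {
      if (tuple.length != names.size()) {
        throw new IllegalArgumentException("tuple width != declared column count");
      }
      copy.put(fragment, tuple.clone());                     // defensive copy of each tuple
    });
    this.columnNames = Collections.unmodifiableList(new ArrayList<>(names));
    this.columnTypes = Collections.unmodifiableList(new ArrayList<>(types));
    this.fragmentKeys = Collections.unmodifiableMap(copy);
    this.softCapped = softCapped;
  }
}
```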

A few helpers were added: forSingleColumn is a factory so the existing single-column call site stays ergonomic; restrictTo narrows the info to a surviving fragment set after filter pushdown; withSoftCapped marks the cap branch. serialVersionUID was bumped from 1L (the single-column shape on upstream main) because both the field set and the map value type changed, and again to 3L when the parallel types list was added. TABLE_OPT_PARTITION_COLUMNS is reused from LanceConstant (added by #445) instead of redeclared here.

Parsing lives in LanceScanBuilder.parsePartitionColumns. It tokenizes on commas, trims whitespace, drops empty tokens, and dedupes with a WARN; the first occurrence is kept rather than hard-failing, so a table accidentally carrying region,region today doesn't regress. Nested paths are rejected until we pick a quoting syntax.
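
In sketch form (imports and the enclosing class elided, an slf4j-style LOG assumed; rejection of nested paths is shown as a clean fallback rather than a throw, mirroring the "falls back cleanly" tests):

```java
// Sketch of the tokenize / trim / dedupe behavior described above.
static List<String> parsePartitionColumns(String property) {
  LinkedHashSet<String> columns = new LinkedHashSet<>();    // preserves declaration order
  for (String token : property.split(",")) {
    String name = token.trim();
    if (name.isEmpty()) {
      continue;                                             // drop empties: "a,,b", trailing commas
    }
    if (name.contains(".")) {
      LOG.warn("Nested partition column '{}' is not supported yet; "
          + "skipping partition detection", name);
      return Collections.emptyList();                       // reject nested paths, fall back cleanly
    }
    if (!columns.add(name)) {
      LOG.warn("Duplicate partition column '{}'; keeping the first occurrence", name);
    }
  }
  return new ArrayList<>(columns);
}
```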

The type whitelist is Boolean, Byte, Short, Int, Long, String, Date, and Timestamp. PartitionInfo.toSparkValue is type-aware: I verified against lance-core 6.0.0-beta.1 that ZoneStats.getMin/getMax returns java.lang.Long for every integral Arrow width (int8/16/32 included) and for Date (epoch-days) / Timestamp (epoch-micros). The converter narrows Long → byte/short/int for Byte/Short/Int/Date slots, passes through for Long/Timestamp, and wraps strings in UTF8String; anything else throws because detection never lets it reach this code path. This also fixes a latent ClassCastException in the single-column path: the old pass-through would hand a Long to Spark's InternalRow.getInt/getShort/getByte for Byte/Short/Int columns. Float, Double, Decimal, and complex types are rejected outright because NaN and scale-drift break tuple equality. Lookup uses fullSchema (not the pruned schema) with a fieldIndex try/catch so column pruning and typos fall back cleanly instead of throwing.
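
The conversion, in sketch form (method shape approximated from the description; imports elided, the type classes are the real org.apache.spark.sql.types ones):

```java
// Sketch: ZoneStats hands back java.lang.Long for every integral width and for
// Date/Timestamp, so the declared Spark type drives the narrowing.
static Object toSparkValue(Comparable<?> raw, DataType type) {
  if (type instanceof BooleanType) {
    return raw;                                            // Boolean passes through
  }
  if (type instanceof StringType) {
    return UTF8String.fromString(raw.toString());          // Spark's internal string representation
  }
  long value = ((Number) raw).longValue();                 // integral / Date / Timestamp arrive as Long
  if (type instanceof ByteType)  return (byte)  value;
  if (type instanceof ShortType) return (short) value;
  if (type instanceof IntegerType || type instanceof DateType) return (int) value;  // Date = epoch days
  if (type instanceof LongType || type instanceof TimestampType) return value;      // Timestamp = epoch micros
  // Float/Double/Decimal/complex never reach here: detection rejects them up front.
  throw new IllegalArgumentException("Unsupported partition column type: " + type);
}
```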

detectPartitioning runs the existing per-column zone-constancy check, then requires all per-column fragment sets to be identical. If a column's coverage differs, any uncovered fragment would end up with a null-filled tuple at scan time, which Spark's SPJ would mis-group — so the whole detection rejects. The mismatch WARN iterates in declaration order so diagnostics are deterministic. Per-column Spark types are resolved from fullSchema at detection time and threaded into PartitionInfo alongside the tuples.
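
The coverage requirement boils down to a set-equality check over the per-column results (a sketch; perColumnConstantValues stands in for the existing zone-constancy helper):

```java
// Sketch: each column maps to the fragments where it is zone-constant; all of
// those fragment sets must be identical or detection is rejected outright.
Set<Integer> expected = null;
for (String column : partitionColumns) {                   // declaration order -> deterministic WARNs
  Set<Integer> covered = perColumnConstantValues(column).keySet();
  if (expected == null) {
    expected = covered;
  } else if (!expected.equals(covered)) {
    LOG.warn("Partition column '{}' is constant on {} fragments but '{}' on {}; "
        + "not reporting KeyGroupedPartitioning", column, covered.size(),
        partitionColumns.get(0), expected.size());
    return Optional.empty();                               // one mismatch rejects the whole detection
  }
}
```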

Filter pushdown narrows PartitionInfo via restrictTo before the scan is built. restrictTo drops the soft-cap flag since the cap is size-dependent, and the builder re-applies it if the restricted set is still over the threshold. build() is wrapped in a try/finally so the lazily-opened dataset closes when any helper throws.
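
The narrowing step in the builder is roughly (restrictTo, withSoftCapped, and getFragmentPartitionKeys are the helpers named above; the surrounding wiring is approximated):

```java
// Sketch: narrow to the fragments that survived filter pushdown, then decide
// the soft cap again against the (smaller) restricted fragment set.
PartitionInfo restricted = partitionInfo.restrictTo(survivingFragmentIds);
if (restricted.getFragmentPartitionKeys().size() > maxReportedPartitions) {
  restricted = restricted.withSoftCapped();                 // scan will report UnknownPartitioning
}
```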

LanceScan.outputPartitioning emits Expression[] of FieldReference in declaration order. It returns UnknownPartitioning when the info is soft-capped, when spark.lance.partition.reporting.enabled=false, or when SparkVersionUtil.supportsMultiKeySpj() gates off N>1 keys. The gate is version-string based: it returns true for 3.5.x and for any major >= 4, and false otherwise. So Spark 4.x and 5.x both pass the gate without code changes; we've only actually exercised it on 3.5.x in CI. Everything else (3.4.x, custom forks, unparseable version strings) falls back to the safe single-key-or-Unknown path, and single-key SPJ is unchanged everywhere.
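
The version gate, in its pure-function form (a sketch matching the behavior described here and the SparkVersionUtilTest cases below, not the exact source):

```java
// Sketch: allow 3.5.x (any suffix) and any major >= 4; everything else,
// including malformed version strings, falls back to the conservative path.
static boolean supportsMultiKeySpj(String sparkVersion) {
  if (sparkVersion == null || sparkVersion.isEmpty()) {
    return false;                                          // null / empty
  }
  int dot = sparkVersion.indexOf('.');
  if (dot <= 0) {
    return false;                                          // no dot, or leading dot
  }
  try {
    int major = Integer.parseInt(sparkVersion.substring(0, dot));
    if (major >= 4) {
      return true;                                         // 4.x, 5.x, ... pass without code changes
    }
    return major == 3 && sparkVersion.startsWith("3.5.");  // 3.5.x incl. snapshot/rc/vendor suffixes
  } catch (NumberFormatException e) {
    return false;                                          // non-numeric major
  }
}
```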

Configs

spark.lance.partition.reporting.maxPartitions (default 10000) — scans with more partitions than this report UnknownPartitioning. The threshold is drawn from Spark's own planner scale: spark.sql.shuffle.partitions defaults to 200 and AQE coalescing targets low thousands, so above ~10k the driver-side planning cost (O(partitions × columns) for key-array materialization and matching) starts to eat the SPJ win.

spark.lance.partition.reporting.enabled (default true) is a session-level escape hatch. Flip it to disable SPJ reporting entirely without editing table metadata.
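
Both are ordinary session configs, so they can be flipped at runtime, for example (values shown are illustrative):

```java
// Session-level knobs from this PR.
spark.conf().set("spark.lance.partition.reporting.enabled", "false");       // turn SPJ reporting off
spark.conf().set("spark.lance.partition.reporting.maxPartitions", "20000"); // raise the soft cap
```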

Tests

PartitionInfoTest covers constructor invariants (including the new names-vs-types width check), defensive copies, forSingleColumn equivalence to the list form, restrictTo and withSoftCapped (both preserve the types list), multi-column key-row width, and a Java-serialization round-trip. A per-type encoding suite asserts Long → byte/short/int narrowing for Byte/Short/Int/Date, pass-through for Boolean/Long/Timestamp, and UTF8String wrapping for String, plus a guard that a non-whitelisted type reaching toSparkValue throws IllegalArgumentException.

LanceScanBuilderTest picks up six parser cases (unknown column, nested path, whitespace, empty, pure-delimiter, unsupported type), a combined dedupe + whitespace-trim + declaration-order assertion, and two detectPartitioning coverage checks (identical vs mismatched, with tuple-value and column-type assertions). LanceScanTest adds multi-column KeyGroupedPartitioning reporting and the soft-cap Unknown fallback. ZonemapFragmentPrunerTest was migrated to the new API. SparkVersionUtilTest is new: two table-driven tests cover the version-string allowlist (3.5.x incl. snapshot/rc/vendor suffixes, 4+/5+ major) and the denylist (3.4.x and earlier, conservative 3.6+, plus null / empty / no-dot / leading-dot / non-numeric major).

End-to-end SPJ validation across Spark versions isn't exercised here. It needs a working btree-zonemap index setup, which is currently blocked by a describeIndices retrain limitation in lance-core 6.0.0-beta.1 — tracked separately.


Extend `lance.partition.columns` TBLPROPERTY from a single column name to a
comma-separated list of columns. When every declared column is partition-
constant per fragment with identical coverage, the connector reports a
multi-key `KeyGroupedPartitioning` so Spark's SPJ can join on the full key
tuple without a shuffle.

Core changes
- `PartitionInfo` refactored from `(columnName, Map<Integer, Comparable<?>>)`
  to `(List<String>, Map<Integer, Comparable<?>[]>)` with constructor-
  enforced width invariants, defensive tuple clones, `restrictTo`,
  `withSoftCapped`, and `forSingleColumn` factory. `serialVersionUID`
  bumped to 2L for the shape change.
- `LanceScanBuilder.parsePartitionColumns` tokenizes on `,`, trims, drops
  empties, dedupes with WARN (preserving first), rejects nested paths.
- Type whitelist restricted to types that pass through Spark's InternalRow
  encoding without a converter (Boolean/Byte/Short/Int/Long/String).
  Date/Timestamp deferred pending JNI-returned-class pinning;
  Float/Double/Decimal/complex rejected for NaN/scale/equality reasons.
- `detectPartitioning` requires identical per-column fragment coverage;
  strict-subset intersection is rejected to avoid phantom null-key SPJ
  groups. Iterates in declaration order for deterministic diagnostics.
- Filter-pushdown path restricts `PartitionInfo` to the surviving fragment
  set; soft-cap re-evaluated against the restricted size.
- `LanceScan.outputPartitioning` falls back to `UnknownPartitioning` when
  soft-capped, disabled via `spark.lance.partition.reporting.enabled`, or
  gated by `SparkVersionUtil.supportsMultiKeySpj()` (allowlist: 3.5.x, 4+).
- `LanceScanBuilder.build()` wrapped in try/finally so the lazily-opened
  dataset is closed on exception paths.

Configs
- `spark.lance.partition.reporting.maxPartitions` (default 10_000): soft
  cap; above this the scan reports `UnknownPartitioning`.
- `spark.lance.partition.reporting.enabled` (default true): global escape
  hatch for SPJ reporting.

Tests
- New `PartitionInfoTest` (12 tests): invariants, defensive copies,
  factory equivalence, `restrictTo`, `withSoftCapped`, Java serialization
  round-trip, immutability.
- New `SparkVersionUtil` helper for the 3.4-vs-3.5+ multi-key gate.
- `LanceScanTest`: multi-column `KeyGroupedPartitioning`, soft-cap Unknown
  fallback.
- `LanceScanBuilderTest`: unknown column, nested path, whitespace, empty,
  delimiter-only, unsupported type — all fall back cleanly. Mismatched
  per-column coverage rejected; identical coverage accepted with correct
  tuple values.
- `ZonemapFragmentPrunerTest` migrated to new API (`forSingleColumn`,
  `getColumnNames`, `getFragmentPartitionKeys`).

316 tests pass on `lance-spark-base_2.12` (up from 311).

PartitionInfo.toSparkValue previously passed ZoneStats values through
unchanged, assuming the returned Comparable<?> already matched Spark's
InternalRow encoding. Verified against lance-core 6.0.0-beta.1 that
ZoneStats.getMin/getMax returns java.lang.Long for every integral Arrow
width (int8/16/32 included) and for Date (epoch-days) / Timestamp
(epoch-micros) - so the pass-through would hand a Long to Spark's
InternalRow.getByte/getShort/getInt on Byte/Short/Int columns and
throw a ClassCastException at scan time.

Make toSparkValue type-dispatched: carry a parallel List<DataType>
alongside columnNames and narrow Long -> byte/short/int for
Byte/Short/Int/Date slots, pass through for Long/Timestamp, wrap
Strings in UTF8String, and pass Booleans through. This also closes the
Date/Timestamp deferral since narrowing handles both cleanly.

Changes
- ZonemapFragmentPruner.PartitionInfo: add columnTypes field, bump
  serialVersionUID 2L -> 3L, type-aware toSparkValue, preserve types
  through restrictTo / withSoftCapped. forSingleColumn takes a DataType.
- LanceScanBuilder.isSupportedPartitionType: add DateType, TimestampType
  to the whitelist; WARN message updated.
- LanceScanBuilder.detectPartitioning: resolve per-column types from
  fullSchema and thread them into PartitionInfo.

Tests
- PartitionInfoTest grows from 12 to 20: adds rejectsColumnTypesSizeMismatch
  and explicit narrowing assertions for
  Byte/Short/Int/Date/Timestamp/Boolean/String.
- LanceScanBuilderTest.detectPartitioning* switched to real TEST_SCHEMA
  columns (x, y) and asserts the resolved columnTypes.
- Existing callers (LanceScanTest, ZonemapFragmentPrunerTest) updated
  to the new forSingleColumn / constructor signatures.

All 333 tests pass in lance-spark-base_2.12; lance-spark-3.4_2.12
cross-compile green.

Coverage audit exposed three areas with no direct assertions. Close them:

1. SparkVersionUtil.supportsMultiKeySpj had no unit tests — the running
   Spark version was the only input exercised. Split into a pure-function
   overload that takes the version string and cover it with two table-
   driven tests (accept vs reject): 3.5.x (incl. snapshot / rc / vendor
   suffixes) and 4+/5+ majors for accept; 3.4.x and earlier, conservative
   3.6+, and malformed input (null / empty / no-dot / leading-dot /
   non-numeric major) for reject.

2. parsePartitionColumns had no direct assertion of the dedupe path.
   Promoted to package-private and added one test on "y, x , x, b" →
   ["y", "x", "b"] covering dedupe, whitespace trimming, and declaration-
   order preservation in a single shot.

3. PartitionInfo had no test for unsupported-type handling at encode
   time. Added a test that a non-whitelisted type (DoubleType) reaching
   toSparkValue throws IllegalArgumentException — guards against a
   future bypass of detection.

Visibility change: parsePartitionColumns is now package-private (was
private); SparkVersionUtil gets a package-private string-input overload.

Remaining gap (tracked separately): end-to-end SPJ integration across
Spark versions is blocked on the lance-core describeIndices retrain
limitation.

All tests pass in lance-spark-base_2.12.

@github-actions bot added the enhancement (New feature or request) label on Apr 22, 2026
