feat(partitioning): support multiple partition columns #468
Open
LuciferYang wants to merge 3 commits into lance-format:main
Conversation
Extend `lance.partition.columns` TBLPROPERTY from a single column name to a comma-separated list of columns. When every declared column is partition-constant per fragment with identical coverage, the connector reports a multi-key `KeyGroupedPartitioning` so Spark's SPJ can join on the full key tuple without a shuffle.

Core changes
- `PartitionInfo` refactored from `(columnName, Map<Integer, Comparable<?>>)` to `(List<String>, Map<Integer, Comparable<?>[]>)` with constructor-enforced width invariants, defensive tuple clones, `restrictTo`, `withSoftCapped`, and a `forSingleColumn` factory. `serialVersionUID` bumped to 2L for the shape change.
- `LanceScanBuilder.parsePartitionColumns` tokenizes on `,`, trims, drops empties, dedupes with a WARN (preserving the first occurrence), and rejects nested paths.
- Type whitelist restricted to types that pass through Spark's InternalRow encoding without a converter (Boolean/Byte/Short/Int/Long/String). Date/Timestamp deferred pending JNI-returned-class pinning; Float/Double/Decimal/complex rejected for NaN/scale/equality reasons.
- `detectPartitioning` requires identical per-column fragment coverage; a strict-subset intersection is rejected to avoid phantom null-key SPJ groups. Iterates in declaration order for deterministic diagnostics.
- Filter-pushdown path restricts `PartitionInfo` to the surviving fragment set; the soft cap is re-evaluated against the restricted size.
- `LanceScan.outputPartitioning` falls back to `UnknownPartitioning` when soft-capped, when disabled via `spark.lance.partition.reporting.enabled`, or when gated by `SparkVersionUtil.supportsMultiKeySpj()` (allowlist: 3.5.x, 4+).
- `LanceScanBuilder.build()` wrapped in try/finally so the lazily-opened dataset is closed on exception paths.

Configs
- `spark.lance.partition.reporting.maxPartitions` (default 10_000): soft cap; above this the scan reports `UnknownPartitioning`.
- `spark.lance.partition.reporting.enabled` (default true): global escape hatch for SPJ reporting.
Tests
- New `PartitionInfoTest` (12 tests): invariants, defensive copies, factory equivalence, `restrictTo`, `withSoftCapped`, Java serialization round-trip, immutability.
- New `SparkVersionUtil` helper for the 3.4-vs-3.5+ multi-key gate.
- `LanceScanTest`: multi-column `KeyGroupedPartitioning`, soft-cap `Unknown` fallback.
- `LanceScanBuilderTest`: unknown column, nested path, whitespace, empty, delimiter-only, unsupported type all fall back cleanly. Mismatched per-column coverage rejected; identical coverage accepted with correct tuple values.
- `ZonemapFragmentPrunerTest` migrated to the new API (`forSingleColumn`, `getColumnNames`, `getFragmentPartitionKeys`).

316 tests pass on `lance-spark-base_2.12` (up from 311).
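The tokenize/trim/dedupe rules described above can be sketched as a small pure function. This is an illustrative standalone version, not the actual `LanceScanBuilder` code; the real method also emits a WARN on duplicates and validates column names against the schema:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Sketch of the parsing rules: split on ',', trim, drop empties,
// dedupe keeping the first occurrence, reject nested paths.
final class PartitionColumnParser {
  static List<String> parsePartitionColumns(String raw) {
    LinkedHashSet<String> seen = new LinkedHashSet<>();
    for (String token : raw.split(",")) {
      String col = token.trim();
      if (col.isEmpty()) {
        continue; // drop empty tokens, e.g. trailing or doubled commas
      }
      if (col.contains(".")) {
        // nested paths are rejected until a quoting syntax is chosen
        throw new IllegalArgumentException("Nested path not supported: " + col);
      }
      // LinkedHashSet dedupes while preserving declaration order;
      // the real code logs a WARN on duplicates instead of silently dropping.
      seen.add(col);
    }
    return new ArrayList<>(seen);
  }
}
```

On the test input from a later commit, `"y, x , x, b"` parses to `["y", "x", "b"]`: dedupe, trimming, and declaration-order preservation in one pass.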
`PartitionInfo.toSparkValue` previously passed ZoneStats values through unchanged, assuming the returned `Comparable<?>` already matched Spark's InternalRow encoding. Verified against lance-core 6.0.0-beta.1 that `ZoneStats.getMin/getMax` returns `java.lang.Long` for every integral Arrow width (int8/16/32 included) and for Date (epoch-days) / Timestamp (epoch-micros), so the pass-through would hand a Long to Spark's `InternalRow.getByte/getShort/getInt` on Byte/Short/Int columns and throw ClassCastException at scan time.

Make `toSparkValue` type-dispatched: carry a parallel `List<DataType>` alongside `columnNames` and narrow Long -> byte/short/int for Byte/Short/Int/Date slots, pass through for Long/Timestamp, wrap Strings in `UTF8String`, pass Booleans through. This also closes the Date/Timestamp deferral, since narrowing handles both cleanly.

Changes
- `ZonemapFragmentPruner.PartitionInfo`: add `columnTypes` field, bump `serialVersionUID` 2L -> 3L, type-aware `toSparkValue`, preserve types through `restrictTo` / `withSoftCapped`. `forSingleColumn` takes a `DataType`.
- `LanceScanBuilder.isSupportedPartitionType`: add `DateType`, `TimestampType` to the whitelist; WARN message updated.
- `LanceScanBuilder.detectPartitioning`: resolve per-column types from `fullSchema` and thread them into `PartitionInfo`.

Tests
- `PartitionInfoTest` grows from 12 to 20: adds `rejectsColumnTypesSizeMismatch` and explicit narrowing assertions for Byte/Short/Int/Date/Timestamp/Boolean/String.
- `LanceScanBuilderTest.detectPartitioning*` switched to real `TEST_SCHEMA` columns (x, y) and asserts the resolved `columnTypes`.
- Existing callers (`LanceScanTest`, `ZonemapFragmentPrunerTest`) updated to the new `forSingleColumn` / constructor signatures.

All 333 tests pass in `lance-spark-base_2.12`; `lance-spark-3.4_2.12` cross-compile green.
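The Long-narrowing dispatch can be sketched as follows. The enum and the plain-`String` return are stand-ins (the real code dispatches on Spark `DataType` instances and wraps strings in `org.apache.spark.unsafe.types.UTF8String`), so this is a self-contained illustration rather than the actual `PartitionInfo` method:

```java
// Sketch of the type-dispatched narrowing: ZoneStats hands back
// java.lang.Long for every integral width, so each slot narrows
// to the width Spark's InternalRow accessor will request.
final class SparkValueNarrowing {
  enum SlotType { BOOLEAN, BYTE, SHORT, INT, LONG, STRING, DATE, TIMESTAMP }

  static Object toSparkValue(Comparable<?> zoneStatsValue, SlotType type) {
    switch (type) {
      case BYTE:       // int8 column, but ZoneStats returned a Long
        return (byte) ((Long) zoneStatsValue).longValue();
      case SHORT:      // int16 column
        return (short) ((Long) zoneStatsValue).longValue();
      case INT:
      case DATE:       // Date is epoch-days, stored as int in InternalRow
        return (int) ((Long) zoneStatsValue).longValue();
      case LONG:
      case TIMESTAMP:  // Timestamp is epoch-micros, already a long
        return zoneStatsValue;
      case STRING:
        // real code: UTF8String.fromString(...); plain String here
        return zoneStatsValue.toString();
      case BOOLEAN:
        return zoneStatsValue; // Boolean passes through unchanged
      default:
        // detection should never let an unsupported type reach this point
        throw new IllegalArgumentException("Unsupported partition type: " + type);
    }
  }
}
```

The key point the commit makes is visible in the BYTE/SHORT/INT/DATE arms: without the narrowing, the boxed `Long` would reach `InternalRow.getByte/getShort/getInt` and fail with ClassCastException at scan time.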
Coverage audit exposed three areas with no direct assertions. Close them:

1. `SparkVersionUtil.supportsMultiKeySpj` had no unit tests; the running Spark version was the only input exercised. Split into a pure-function overload that takes the version string and cover it with two table-driven tests (accept vs reject): 3.5.x (incl. snapshot / rc / vendor suffixes) and 4+/5+ for accept; 3.4.x-and-earlier, conservative 3.6+ reject, and malformed input (null / empty / no-dot / leading-dot / non-numeric major) for reject.
2. `parsePartitionColumns` had no direct assertion of the dedupe path. Promoted to package-private and added one test on `"y, x , x, b"` → `["y", "x", "b"]` covering dedupe, whitespace trimming, and declaration-order preservation in a single shot.
3. `PartitionInfo` had no test for unsupported-type handling at encode time. Added a test that a non-whitelisted type (`DoubleType`) reaching `toSparkValue` throws `IllegalArgumentException`, guarding against a future bypass of detection.

Visibility change: `parsePartitionColumns` is now package-private (was private); `SparkVersionUtil` gets a package-private string-input overload.

Remaining gap (tracked separately): end-to-end SPJ integration across Spark versions is blocked on the lance-core `describeIndices` retrain limitation.

All tests pass in `lance-spark-base_2.12`.
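A plausible shape for the pure-function overload described in item 1; the class and method names follow the commit, but the exact parsing inside `SparkVersionUtil` may differ:

```java
// Sketch of the version-string gate: accept 3.5.x (including suffixed
// builds such as "3.5.1-SNAPSHOT") and any major >= 4; reject everything
// else, including malformed input. Hypothetical standalone version.
final class VersionGate {
  static boolean supportsMultiKeySpj(String version) {
    if (version == null || version.isEmpty()) {
      return false;
    }
    int dot = version.indexOf('.');
    if (dot <= 0) {
      return false; // no dot, or leading dot
    }
    int major;
    int minor;
    try {
      major = Integer.parseInt(version.substring(0, dot));
      // take digits after the dot up to the next non-digit ("3.5.1-rc1" -> 5)
      int end = dot + 1;
      while (end < version.length() && Character.isDigit(version.charAt(end))) {
        end++;
      }
      minor = Integer.parseInt(version.substring(dot + 1, end));
    } catch (NumberFormatException e) {
      return false; // non-numeric major, or nothing parseable after the dot
    }
    if (major >= 4) {
      return true; // Spark 4.x and 5.x pass without code changes
    }
    // conservative on the 3.x line: only 3.5.x; 3.6+ is unverified
    return major == 3 && minor == 5;
  }
}
```

This mirrors the accept/reject table the tests cover: suffixed 3.5.x builds pass because minor-digit scanning stops at the first non-digit, while the catch-all `return false` handles forks and unparseable strings.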
Summary
Extends `lance.partition.columns` from a single column to a comma-separated list. When every declared column is partition-constant per fragment with matching coverage, the connector now reports a multi-key `KeyGroupedPartitioning` so SPJ can join on the full tuple without a shuffle. This is the read-side half of the work that #445 started on the write side: a table declaring `lance.partition.columns='region,year'` now writes fragments partitioned by both columns, and reads report a 2-key grouping. Closes #429.
Changes
`ZonemapFragmentPruner.PartitionInfo` now holds a `List<String>` of column names, a parallel `List<DataType>` of column types, and a `Map<Integer, Comparable<?>[]>` of per-fragment tuples instead of a single column plus scalar map. The constructor checks both the names/types width invariant and the tuple-width invariant, and copies tuples into immutable storage. A few helpers were added: `forSingleColumn` is a factory so the existing single-column call site stays ergonomic; `restrictTo` narrows the info to a surviving fragment set after filter pushdown; `withSoftCapped` marks the cap branch. `serialVersionUID` was bumped from `1L` (the single-column shape on upstream main) to `2L` because both the field set and the map value type changed. `TABLE_OPT_PARTITION_COLUMNS` is reused from `LanceConstant` (added by #445) instead of being redeclared here.

Parsing lives in `LanceScanBuilder.parsePartitionColumns`. It tokenizes on `,`, trims, drops empties, and dedupes with a WARN; the first occurrence is kept rather than hard-failing, so a table accidentally carrying `region,region` today doesn't regress. Nested paths are rejected until we pick a quoting syntax. The type whitelist is Boolean, Byte, Short, Int, Long, String, Date, and Timestamp.
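A minimal sketch of the whitelist predicate. The real `isSupportedPartitionType` dispatches on Spark `DataType` instances; simple type-name strings stand in here so the sketch is self-contained:

```java
import java.util.Set;

// Sketch of the partition-type whitelist: only types that survive
// Spark's InternalRow encoding with value-based equality are allowed.
final class PartitionTypeWhitelist {
  private static final Set<String> SUPPORTED = Set.of(
      "boolean", "byte", "short", "int", "long", "string",
      "date", "timestamp");

  static boolean isSupportedPartitionType(String typeName) {
    // Float/Double (NaN), Decimal (scale drift), and complex types are
    // rejected because they break tuple equality for SPJ grouping.
    return SUPPORTED.contains(typeName);
  }
}
```

A declared column whose type falls outside this set doesn't fail the scan; detection just declines to report partitioning, matching the "all fall back cleanly" behavior the tests assert.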
`PartitionInfo.toSparkValue` is type-aware: I verified against lance-core 6.0.0-beta.1 that `ZoneStats.getMin/getMax` returns `java.lang.Long` for every integral Arrow width (int8/16/32 included) and for Date (epoch-days) / Timestamp (epoch-micros). The converter narrows `Long → byte/short/int` for Byte/Short/Int/Date slots, passes through for Long/Timestamp, and wraps strings in `UTF8String`; anything else throws because detection never lets it reach this code path. This also fixes a latent ClassCastException in the single-column path: the old pass-through would hand a `Long` to Spark's `InternalRow.getInt/getShort/getByte` for Byte/Short/Int columns. Float, Double, Decimal, and complex types are rejected outright because NaN and scale drift break tuple equality. Lookup uses `fullSchema` (not the pruned `schema`) with a `fieldIndex` try/catch so column pruning and typos fall back cleanly instead of throwing.

`detectPartitioning` runs the existing per-column zone-constancy check, then requires all per-column fragment sets to be identical. If a column's coverage differed, any uncovered fragment would end up with a null-filled tuple at scan time, which Spark's SPJ would mis-group, so the whole detection rejects. The mismatch WARN iterates in declaration order so diagnostics are deterministic. Per-column Spark types are resolved from `fullSchema` at detection time and threaded into `PartitionInfo` alongside the tuples.

Filter pushdown narrows `PartitionInfo` via `restrictTo` before the scan is built. `restrictTo` drops the soft-cap flag since the cap is size-dependent, and the builder re-applies it if the restricted set is still over the threshold. `build()` is wrapped in a try/finally so the lazily-opened dataset closes when any helper throws.

`LanceScan.outputPartitioning` emits an `Expression[]` of `FieldReference` in declaration order. It returns `UnknownPartitioning` when the info is soft-capped, when `spark.lance.partition.reporting.enabled=false`, or when `SparkVersionUtil.supportsMultiKeySpj()` gates off N>1 keys. The gate is version-string based: it returns true for 3.5.x and for any major >= 4, and false otherwise. So Spark 4.x and 5.x both pass the gate without code changes; we've only actually exercised it on 3.5.x in CI. Everything else (3.4.x, custom forks, unparseable version strings) falls back to the safe single-key-or-Unknown path, and single-key SPJ is unchanged everywhere.

Configs
`spark.lance.partition.reporting.maxPartitions` (default `10000`): scans with more partitions than this report `UnknownPartitioning`. The threshold is drawn from Spark's own planner scale: `spark.sql.shuffle.partitions` defaults to 200 and AQE coalescing targets low thousands, so above ~10k the driver-side planning cost (`O(partitions × columns)` for key-array materialization and matching) starts to eat the SPJ win.

`spark.lance.partition.reporting.enabled` (default `true`) is a session-level escape hatch. Flip it to disable SPJ reporting entirely without editing table metadata.

Tests
- `PartitionInfoTest` covers constructor invariants (including the new names-vs-types width check), defensive copies, `forSingleColumn` equivalence to the list form, `restrictTo` and `withSoftCapped` (both preserve the types list), multi-column key-row width, a Java-serialization round-trip, a per-type encoding suite (asserting `Long → byte/short/int` narrowing for Byte/Short/Int/Date, pass-through for Boolean/Long/Timestamp, and `UTF8String` wrapping for String), and a guard that a non-whitelisted type reaching `toSparkValue` throws `IllegalArgumentException`.
- `LanceScanBuilderTest` picks up six parser cases (unknown column, nested path, whitespace, empty, pure-delimiter, unsupported type), a combined dedupe + whitespace-trim + declaration-order assertion, and two `detectPartitioning` coverage checks (identical vs mismatched, with tuple-value and column-type assertions).
- `LanceScanTest` adds multi-column `KeyGroupedPartitioning` reporting and the soft-cap `Unknown` fallback.
- `ZonemapFragmentPrunerTest` was migrated to the new API.
- `SparkVersionUtilTest` is new: two table-driven tests cover the version-string allowlist (3.5.x incl. snapshot/rc/vendor suffixes, 4+/5+ major) and the denylist (3.4.x and earlier, conservative 3.6+, plus null / empty / no-dot / leading-dot / non-numeric major).

End-to-end SPJ validation across Spark versions isn't exercised here. It needs a working btree-zonemap index setup, which is currently blocked by a `describeIndices` retrain limitation in lance-core 6.0.0-beta.1; tracked separately.
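The restrict-then-recap flow described under Changes can be sketched as follows. Field names, `restrictTo`, and the cap constant follow the description, but this is a simplified stand-in for the real `PartitionInfo` (which also carries column names and types):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of restrictTo + soft-cap re-evaluation: restrictTo keeps only
// surviving fragments and drops the cap flag (the cap is size-dependent);
// the builder then re-applies it if the restricted set is still too large.
final class PartitionInfoSketch {
  static final int DEFAULT_MAX_PARTITIONS = 10_000;

  final Map<Integer, Comparable<?>[]> fragmentKeys; // fragmentId -> key tuple
  final boolean softCapped;

  PartitionInfoSketch(Map<Integer, Comparable<?>[]> fragmentKeys, boolean softCapped) {
    this.fragmentKeys = fragmentKeys;
    this.softCapped = softCapped;
  }

  PartitionInfoSketch restrictTo(Set<Integer> survivingFragments) {
    Map<Integer, Comparable<?>[]> kept = new HashMap<>();
    for (Map.Entry<Integer, Comparable<?>[]> e : fragmentKeys.entrySet()) {
      if (survivingFragments.contains(e.getKey())) {
        kept.put(e.getKey(), e.getValue());
      }
    }
    // cap flag intentionally dropped: it must be re-evaluated on the new size
    return new PartitionInfoSketch(kept, false);
  }

  PartitionInfoSketch withSoftCapIfNeeded(int maxPartitions) {
    return fragmentKeys.size() > maxPartitions
        ? new PartitionInfoSketch(fragmentKeys, true)
        : this;
  }
}
```

A soft-capped result is what makes `outputPartitioning` fall back to `UnknownPartitioning` instead of reporting an oversized `KeyGroupedPartitioning`.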