From 6409bc693bd146bd90c8e29e6318e3f968a21677 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 05:08:44 +0000 Subject: [PATCH 1/3] feat: add partitioned zonemap index --- java/lance-jni/src/blocking_dataset.rs | 1 + java/lance-jni/src/index.rs | 2 + .../main/java/org/lance/index/IndexType.java | 1 + protos/index.proto | 4 +- python/python/lance/dataset.py | 15 +- python/python/lance/indices/__init__.py | 1 + python/src/dataset.rs | 5 + rust/lance-index/src/lib.rs | 11 +- rust/lance-index/src/registry.rs | 22 +- rust/lance-index/src/scalar.rs | 3 + rust/lance-index/src/scalar/zonemap.rs | 431 ++++++++++++++++-- rust/lance-namespace-impls/src/dir.rs | 5 + rust/lance/src/dataset.rs | 8 + rust/lance/src/index.rs | 1 + rust/lance/src/index/create.rs | 89 ++++ rust/lance/src/index/scalar.rs | 27 +- rust/lance/tests/utils/mod.rs | 1 + 17 files changed, 568 insertions(+), 59 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index ab2333266da..13659870a4c 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -942,6 +942,7 @@ fn inner_create_index<'local>( | IndexType::Inverted | IndexType::NGram | IndexType::ZoneMap + | IndexType::PartitionedZoneMap | IndexType::BloomFilter | IndexType::RTree => { // For scalar indices, create a scalar IndexParams diff --git a/java/lance-jni/src/index.rs b/java/lance-jni/src/index.rs index 1e533eed9fc..17d9a8db2e6 100644 --- a/java/lance-jni/src/index.rs +++ b/java/lance-jni/src/index.rs @@ -169,6 +169,8 @@ fn determine_index_type<'local>( Some("INVERTED") } else if lower.contains("ngram") { Some("NGRAM") + } else if lower.contains("partitionedzonemap") { + Some("PARTITIONED_ZONEMAP") } else if lower.contains("zonemap") { Some("ZONEMAP") } else if lower.contains("bloomfilter") { diff --git a/java/src/main/java/org/lance/index/IndexType.java b/java/src/main/java/org/lance/index/IndexType.java index 3a03934effd..45ead44af42 100644 
--- a/java/src/main/java/org/lance/index/IndexType.java +++ b/java/src/main/java/org/lance/index/IndexType.java @@ -24,6 +24,7 @@ public enum IndexType { MEM_WAL(7), ZONEMAP(8), BLOOM_FILTER(9), + PARTITIONED_ZONEMAP(11), VECTOR(100), IVF_FLAT(101), IVF_SQ(102), diff --git a/protos/index.proto b/protos/index.proto index 1fb51f3291c..1c06de130bf 100644 --- a/protos/index.proto +++ b/protos/index.proto @@ -190,4 +190,6 @@ message JsonIndexDetails { } message BloomFilterIndexDetails {} -message RTreeIndexDetails {} \ No newline at end of file +message PartitionedZoneMapIndexDetails {} + +message RTreeIndexDetails {} diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 79641ab4438..c6cb0676dce 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2789,6 +2789,7 @@ def create_scalar_index( Literal["FTS"], Literal["NGRAM"], Literal["ZONEMAP"], + Literal["PARTITIONED_ZONEMAP"], Literal["BLOOMFILTER"], Literal["RTREE"], IndexConfig, @@ -2834,7 +2835,7 @@ def create_scalar_index( ) - There are 5 types of scalar indices available today. + There are several types of scalar indices available today. * ``BTREE``. The most common type is ``BTREE``. This index is inspired by the btree data structure although only the first few layers of the btree @@ -2858,6 +2859,9 @@ def create_scalar_index( called zones and stores summary statistics for each zone (min, max, null_count, nan_count, fragment_id, local_row_offset). It's very small but only effective if the column is at least approximately in sorted order. + * ``PARTITIONED_ZONEMAP``. This is a distributed variant of zonemap that stores + one shard per fragment build. It is intended for executor-parallel index + creation and uses the same pruning semantics as ``ZONEMAP`` at query time. * ``INVERTED`` (alias: ``FTS``). It is used to index document columns. This index can conduct full-text searches. 
For example, a column that contains any word @@ -2879,7 +2883,8 @@ def create_scalar_index( or string column. index_type : str The type of the index. One of ``"BTREE"``, ``"BITMAP"``, - ``"LABEL_LIST"``, ``"NGRAM"``, ``"ZONEMAP"``, ``"INVERTED"``, + ``"LABEL_LIST"``, ``"NGRAM"``, ``"ZONEMAP"``, + ``"PARTITIONED_ZONEMAP"``, ``"INVERTED"``, ``"FTS"``, ``"BLOOMFILTER"``, ``"RTREE"``. name : str, optional The index name. If not provided, it will be generated from the @@ -3012,6 +3017,7 @@ def create_scalar_index( "BITMAP", "NGRAM", "ZONEMAP", + "PARTITIONED_ZONEMAP", "LABEL_LIST", "INVERTED", "FTS", @@ -3020,8 +3026,9 @@ def create_scalar_index( ]: raise NotImplementedError( ( - 'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", ' - '"INVERTED", "BLOOMFILTER" or "RTREE" are supported for ' + 'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", ' + '"PARTITIONED_ZONEMAP", "LABEL_LIST", "INVERTED", ' + '"BLOOMFILTER" or "RTREE" are supported for ' f"scalar columns. Received {index_type}", ) ) diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index edf9e5091ff..7695a4dcad5 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -29,6 +29,7 @@ class SupportedDistributedIndices(str, Enum): # Scalar index types BTREE = "BTREE" INVERTED = "INVERTED" + PARTITIONED_ZONEMAP = "PARTITIONED_ZONEMAP" # Precise vector index types supported by distributed merge IVF_FLAT = "IVF_FLAT" diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 1885b78bd64..230a6ebae37 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1972,6 +1972,7 @@ impl Dataset { "BITMAP" => IndexType::Bitmap, "NGRAM" => IndexType::NGram, "ZONEMAP" => IndexType::ZoneMap, + "PARTITIONED_ZONEMAP" => IndexType::PartitionedZoneMap, "BLOOMFILTER" => IndexType::BloomFilter, "LABEL_LIST" => IndexType::LabelList, "RTREE" => IndexType::RTree, @@ -2003,6 +2004,10 @@ impl Dataset { index_type: "zonemap".to_string(), 
params: None, }), + "PARTITIONED_ZONEMAP" => Box::new(ScalarIndexParams { + index_type: "partitioned_zonemap".to_string(), + params: None, + }), "LABEL_LIST" => Box::new(ScalarIndexParams { index_type: "label_list".to_string(), params: None, diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 62ae68414a6..89a5935f318 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -125,6 +125,8 @@ pub enum IndexType { RTree = 10, // RTree + PartitionedZoneMap = 11, // Distributed zonemap stored as partition shards + // 100+ and up for vector index. /// Flat vector index. Vector = 100, // Legacy vector index, alias to IvfPq @@ -150,6 +152,7 @@ impl std::fmt::Display for IndexType { Self::ZoneMap => write!(f, "ZoneMap"), Self::BloomFilter => write!(f, "BloomFilter"), Self::RTree => write!(f, "RTree"), + Self::PartitionedZoneMap => write!(f, "PartitionedZoneMap"), Self::Vector | Self::IvfPq => write!(f, "IVF_PQ"), Self::IvfFlat => write!(f, "IVF_FLAT"), Self::IvfSq => write!(f, "IVF_SQ"), @@ -176,6 +179,7 @@ impl TryFrom for IndexType { v if v == Self::MemWal as i32 => Ok(Self::MemWal), v if v == Self::ZoneMap as i32 => Ok(Self::ZoneMap), v if v == Self::BloomFilter as i32 => Ok(Self::BloomFilter), + v if v == Self::PartitionedZoneMap as i32 => Ok(Self::PartitionedZoneMap), v if v == Self::Vector as i32 => Ok(Self::Vector), v if v == Self::IvfFlat as i32 => Ok(Self::IvfFlat), v if v == Self::IvfSq as i32 => Ok(Self::IvfSq), @@ -202,6 +206,9 @@ impl TryFrom<&str> for IndexType { "Inverted" | "INVERTED" => Ok(Self::Inverted), "NGram" | "NGRAM" => Ok(Self::NGram), "ZoneMap" | "ZONEMAP" => Ok(Self::ZoneMap), + "PartitionedZoneMap" | "PARTITIONED_ZONEMAP" | "PARTITIONED_ZONE_MAP" => { + Ok(Self::PartitionedZoneMap) + } "Vector" | "VECTOR" => Ok(Self::Vector), "IVF_FLAT" => Ok(Self::IvfFlat), "IVF_SQ" => Ok(Self::IvfSq), @@ -232,7 +239,8 @@ impl IndexType { | Self::NGram | Self::ZoneMap | Self::BloomFilter - | Self::RTree, + | 
Self::RTree + | Self::PartitionedZoneMap, ) } @@ -272,6 +280,7 @@ impl IndexType { Self::ZoneMap => 0, Self::BloomFilter => 0, Self::RTree => 0, + Self::PartitionedZoneMap => 0, // IMPORTANT: if any vector index subtype needs a format bump that is // not backward compatible, its new version must be set to diff --git a/rust/lance-index/src/registry.rs b/rust/lance-index/src/registry.rs index 6b3d89c2dac..849ef47d854 100644 --- a/rust/lance-index/src/registry.rs +++ b/rust/lance-index/src/registry.rs @@ -9,9 +9,15 @@ use crate::scalar::rtree::RTreeIndexPlugin; use crate::{ pb, pbold, scalar::{ - bitmap::BitmapIndexPlugin, bloomfilter::BloomFilterIndexPlugin, btree::BTreeIndexPlugin, - inverted::InvertedIndexPlugin, json::JsonIndexPlugin, label_list::LabelListIndexPlugin, - ngram::NGramIndexPlugin, registry::ScalarIndexPlugin, zonemap::ZoneMapIndexPlugin, + bitmap::BitmapIndexPlugin, + bloomfilter::BloomFilterIndexPlugin, + btree::BTreeIndexPlugin, + inverted::InvertedIndexPlugin, + json::JsonIndexPlugin, + label_list::LabelListIndexPlugin, + ngram::NGramIndexPlugin, + registry::ScalarIndexPlugin, + zonemap::{PartitionedZoneMapIndexPlugin, ZoneMapIndexPlugin}, }, }; @@ -49,6 +55,14 @@ impl IndexPluginRegistry { .insert(plugin_name, Box::new(PluginType::default())); } + pub fn add_alias( + &mut self, + alias: &str, + ) { + self.plugins + .insert(alias.to_string(), Box::new(PluginType::default())); + } + /// Create a registry with the default plugins pub fn with_default_plugins() -> Arc { let mut registry = Self { @@ -59,6 +73,8 @@ impl IndexPluginRegistry { registry.add_plugin::(); registry.add_plugin::(); registry.add_plugin::(); + registry.add_plugin::(); + registry.add_alias::("partitioned_zonemap"); registry.add_plugin::(); registry.add_plugin::(); registry.add_plugin::(); diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index dbce2ec1aa7..b24ec6914d6 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ 
-63,6 +63,7 @@ pub enum BuiltinIndexType { LabelList, NGram, ZoneMap, + PartitionedZoneMap, BloomFilter, RTree, Inverted, @@ -76,6 +77,7 @@ impl BuiltinIndexType { Self::LabelList => "labellist", Self::NGram => "ngram", Self::ZoneMap => "zonemap", + Self::PartitionedZoneMap => "partitioned_zonemap", Self::Inverted => "inverted", Self::BloomFilter => "bloomfilter", Self::RTree => "rtree", @@ -93,6 +95,7 @@ impl TryFrom for BuiltinIndexType { IndexType::LabelList => Ok(Self::LabelList), IndexType::NGram => Ok(Self::NGram), IndexType::ZoneMap => Ok(Self::ZoneMap), + IndexType::PartitionedZoneMap => Ok(Self::PartitionedZoneMap), IndexType::Inverted => Ok(Self::Inverted), IndexType::BloomFilter => Ok(Self::BloomFilter), IndexType::RTree => Ok(Self::RTree), diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 9156322b1dc..b6bb5f3cc30 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -19,8 +19,7 @@ use crate::scalar::registry::{ ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, }; use crate::scalar::{ - BuiltinIndexType, CreatedIndex, SargableQuery, ScalarIndexParams, UpdateCriteria, - compute_next_prefix, + CreatedIndex, SargableQuery, ScalarIndexParams, UpdateCriteria, compute_next_prefix, }; use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_expr::Accumulator; @@ -42,15 +41,46 @@ use async_trait::async_trait; use deepsize::DeepSizeOf; use lance_core::Error; use lance_core::Result; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; use roaring::RoaringBitmap; use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones}; const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches const ZONEMAP_FILENAME: &str = "zonemap.lance"; +const PARTITIONED_ZONEMAP_FILENAME_SUFFIX: &str = "_zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; const 
ZONEMAP_INDEX_VERSION: u32 = 0; +fn partitioned_zonemap_filename(fragment_id: u32) -> String { + format!("part_{fragment_id}{PARTITIONED_ZONEMAP_FILENAME_SUFFIX}") +} + +fn is_partitioned_zonemap_filename(name: &str) -> bool { + name.starts_with("part_") && name.ends_with(PARTITIONED_ZONEMAP_FILENAME_SUFFIX) +} + +fn zonemap_index_details(index_type: IndexType) -> Result { + match index_type { + IndexType::ZoneMap => { + prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()).map_err(Into::into) + } + IndexType::PartitionedZoneMap => { + prost_types::Any::from_msg(&crate::pb::PartitionedZoneMapIndexDetails::default()) + .map_err(Into::into) + } + _ => Err(Error::invalid_input(format!( + "unsupported zonemap index type: {index_type}" + ))), + } +} + +fn zonemap_index_params(index_type: IndexType, rows_per_zone: u64) -> Result { + let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(rows_per_zone))?; + Ok(ScalarIndexParams::for_builtin(index_type.try_into()?).with_params(&params)) +} + /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] struct ZoneMapStatistics { @@ -106,6 +136,7 @@ pub struct ZoneMapIndex { data_type: DataType, // The maximum rows per zone provided by user rows_per_zone: u64, + index_type: IndexType, store: Arc, fri: Option>, index_cache: WeakLanceCache, @@ -117,6 +148,7 @@ impl std::fmt::Debug for ZoneMapIndex { .field("zones", &self.zones) .field("data_type", &self.data_type) .field("rows_per_zone", &self.rows_per_zone) + .field("index_type", &self.index_type) .field("store", &self.store) .field("fri", &self.fri) .field("index_cache", &self.index_cache) @@ -382,6 +414,18 @@ impl ZoneMapIndex { fri: Option>, index_cache: &LanceCache, ) -> Result> + where + Self: Sized, + { + Self::load_single(store, fri, index_cache, IndexType::ZoneMap).await + } + + async fn load_single( + store: Arc, + fri: Option>, + index_cache: &LanceCache, + index_type: IndexType, + ) -> Result> where Self: Sized, { @@ -402,17 +446,80 @@
impl ZoneMapIndex { fri, index_cache, rows_per_zone, + index_type, )?)) } - fn try_from_serialized( - data: RecordBatch, + async fn load_partitioned( store: Arc, fri: Option>, index_cache: &LanceCache, - rows_per_zone: u64, - ) -> Result { - // The RecordBatch should have columns: min, max, null_count + ) -> Result> + where + Self: Sized, + { + let mut files = store.list_files_with_sizes().await?; + files.sort_by(|a, b| a.path.cmp(&b.path)); + let partition_files = files + .into_iter() + .filter(|file| is_partitioned_zonemap_filename(&file.path)) + .map(|file| file.path) + .collect::>(); + + if partition_files.is_empty() { + return Self::load_single(store, fri, index_cache, IndexType::PartitionedZoneMap).await; + } + + let mut rows_per_zone = None; + let mut data_type = None; + let mut zones = Vec::new(); + + for file in partition_files { + let index_file = store.open_index_file(&file).await?; + let zone_maps = index_file + .read_range(0..index_file.num_rows(), None) + .await?; + let this_rows_per_zone = index_file + .schema() + .metadata + .get(ZONEMAP_SIZE_META_KEY) + .and_then(|value| value.parse().ok()) + .unwrap_or(ROWS_PER_ZONE_DEFAULT); + if let Some(existing) = rows_per_zone { + if existing != this_rows_per_zone { + return Err(Error::invalid_input(format!( + "partitioned zonemap shard {file} has rows_per_zone={this_rows_per_zone}, expected {existing}" + ))); + } + } else { + rows_per_zone = Some(this_rows_per_zone); + } + + let (mut file_zones, file_data_type) = Self::deserialize_zones(&zone_maps)?; + if let Some(existing) = &data_type { + if existing != &file_data_type { + return Err(Error::invalid_input(format!( + "partitioned zonemap shard {file} has mismatched data type {file_data_type:?}, expected {existing:?}" + ))); + } + } else { + data_type = Some(file_data_type); + } + zones.append(&mut file_zones); + } + + Ok(Arc::new(Self { + zones, + data_type: data_type.unwrap_or(DataType::Null), + rows_per_zone: rows_per_zone.unwrap_or(ROWS_PER_ZONE_DEFAULT), 
+ index_type: IndexType::PartitionedZoneMap, + store, + fri, + index_cache: WeakLanceCache::from(index_cache), + })) + } + + fn deserialize_zones(data: &RecordBatch) -> Result<(Vec, DataType)> { let min_col = data .column_by_name("min") .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'min' column"))?; @@ -443,7 +550,6 @@ impl ZoneMapIndex { .ok_or_else(|| { Error::invalid_input("ZoneMapIndex: 'zone_length' column is not UInt64") })?; - let fragment_id_col = data .column_by_name("fragment_id") .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'fragment_id' column"))? @@ -452,7 +558,6 @@ impl ZoneMapIndex { .ok_or_else(|| { Error::invalid_input("ZoneMapIndex: 'fragment_id' column is not UInt64") })?; - let zone_start_col = data .column_by_name("zone_start") .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'zone_start' column"))? @@ -462,44 +567,56 @@ impl ZoneMapIndex { Error::invalid_input("ZoneMapIndex: 'zone_start' column is not UInt64") })?; - let data_type = min_col.data_type().clone(); + let mut zones = Vec::with_capacity(data.num_rows()); + for i in 0..data.num_rows() { + zones.push(ZoneMapStatistics { + min: ScalarValue::try_from_array(min_col, i)?, + max: ScalarValue::try_from_array(max_col, i)?, + null_count: null_count_col.value(i), + nan_count: nan_count_col.value(i), + bound: ZoneBound { + fragment_id: fragment_id_col.value(i), + start: zone_start_col.value(i), + length: zone_length.value(i) as usize, + }, + }); + } + Ok((zones, min_col.data_type().clone())) + } + + fn try_from_serialized( + data: RecordBatch, + store: Arc, + fri: Option>, + index_cache: &LanceCache, + rows_per_zone: u64, + index_type: IndexType, + ) -> Result { + let data_type = data + .column_by_name("min") + .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'min' column"))? 
+ .data_type() + .clone(); if data.num_rows() == 0 { return Ok(Self { zones: Vec::new(), data_type, rows_per_zone, + index_type, store, fri, index_cache: WeakLanceCache::from(index_cache), }); } - let num_zones = data.num_rows(); - let mut zones = Vec::with_capacity(num_zones); - - for i in 0..num_zones { - let min = ScalarValue::try_from_array(min_col, i)?; - let max = ScalarValue::try_from_array(max_col, i)?; - let null_count = null_count_col.value(i); - let nan_count = nan_count_col.value(i); - zones.push(ZoneMapStatistics { - min, - max, - null_count, - nan_count, - bound: ZoneBound { - fragment_id: fragment_id_col.value(i), - start: zone_start_col.value(i), - length: zone_length.value(i) as usize, - }, - }); - } + let (zones, _) = Self::deserialize_zones(&data)?; Ok(Self { zones, data_type, rows_per_zone, + index_type, store, fri, index_cache: WeakLanceCache::from(index_cache), @@ -530,13 +647,17 @@ impl Index for ZoneMapIndex { fn statistics(&self) -> Result { Ok(serde_json::json!({ + "index_type": match self.index_type { + IndexType::PartitionedZoneMap => "partitioned_zonemap", + _ => "zonemap", + }, "num_zones": self.zones.len(), "rows_per_zone": self.rows_per_zone, })) } fn index_type(&self) -> IndexType { - IndexType::ZoneMap + self.index_type } async fn calculate_included_frags(&self) -> Result { @@ -599,11 +720,10 @@ impl ScalarIndex for ZoneMapIndex { let mut builder = ZoneMapIndexBuilder::try_new(options, self.data_type.clone())?; builder.options.rows_per_zone = self.rows_per_zone; builder.maps = updated_zones; - builder.write_index(dest_store).await?; + builder.write_index(dest_store, ZONEMAP_FILENAME).await?; Ok(CreatedIndex { - index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) - .unwrap(), + index_details: zonemap_index_details(self.index_type)?, index_version: ZONEMAP_INDEX_VERSION, files: Some(dest_store.list_files_with_sizes().await?), }) @@ -616,8 +736,7 @@ impl ScalarIndex for ZoneMapIndex { } fn 
derive_index_params(&self) -> Result { - let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; - Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(&params)) + zonemap_index_params(self.index_type, self.rows_per_zone) } } @@ -732,7 +851,7 @@ impl ZoneMapIndexBuilder { Ok(RecordBatch::try_new(schema, columns)?) } - pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<()> { + pub async fn write_index(self, index_store: &dyn IndexStore, file_name: &str) -> Result<()> { let record_batch = self.zonemap_stats_as_batch()?; let mut file_schema = record_batch.schema().as_ref().clone(); @@ -742,7 +861,7 @@ impl ZoneMapIndexBuilder { ); let mut index_file = index_store - .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) + .new_index_file(file_name, Arc::new(file_schema)) .await?; index_file.write_record_batch(record_batch).await?; index_file.finish().await?; @@ -839,6 +958,7 @@ impl ZoneMapIndexPlugin { batches_source: SendableRecordBatchStream, index_store: &dyn IndexStore, options: Option, + file_name: &str, ) -> Result<()> { // train_zonemap_index: calling scan_aligned_chunks let value_type = batches_source.schema().field(0).data_type().clone(); @@ -847,7 +967,7 @@ impl ZoneMapIndexPlugin { builder.train(batches_source).await?; - builder.write_index(index_store).await?; + builder.write_index(index_store, file_name).await?; Ok(()) } } @@ -934,10 +1054,108 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { "must provide training request created by new_training_request".into(), ) })?; - Self::train_zonemap_index(data, index_store, Some(request.params)).await?; + Self::train_zonemap_index(data, index_store, Some(request.params), ZONEMAP_FILENAME) + .await?; + Ok(CreatedIndex { + index_details: zonemap_index_details(IndexType::ZoneMap)?, + index_version: ZONEMAP_INDEX_VERSION, + files: Some(index_store.list_files_with_sizes().await?), + }) + } + + async fn load_index( + &self, + index_store: Arc, +
_index_details: &prost_types::Any, + frag_reuse_index: Option>, + cache: &LanceCache, + ) -> Result> { + Ok( + ZoneMapIndex::load_single(index_store, frag_reuse_index, cache, IndexType::ZoneMap) + .await? as Arc, + ) + } +} + +#[derive(Debug, Default)] +pub struct PartitionedZoneMapIndexPlugin; + +#[async_trait] +impl ScalarIndexPlugin for PartitionedZoneMapIndexPlugin { + fn name(&self) -> &str { + "partitioned_zonemap" + } + + fn new_training_request( + &self, + params: &str, + field: &Field, + ) -> Result> { + if field.data_type().is_nested() { + return Err(Error::invalid_input_source( + "A zone map index can only be created on a non-nested field.".into(), + )); + } + + let params = serde_json::from_str::(params)?; + Ok(Box::new(ZoneMapIndexTrainingRequest::new(params))) + } + + fn provides_exact_answer(&self) -> bool { + false + } + + fn version(&self) -> u32 { + ZONEMAP_INDEX_VERSION + } + + fn new_query_parser( + &self, + index_name: String, + _index_details: &prost_types::Any, + ) -> Option> { + Some(Box::new(SargableQueryParser::new(index_name, true))) + } + + async fn train_index( + &self, + data: SendableRecordBatchStream, + index_store: &dyn IndexStore, + request: Box, + fragment_ids: Option>, + _progress: Arc, + ) -> Result { + let request = (request as Box) + .downcast::() + .map_err(|_| { + Error::invalid_input_source( + "must provide training request created by new_training_request".into(), + ) + })?; + let file_name = match fragment_ids.as_deref() { + Some([fragment_id]) => partitioned_zonemap_filename(*fragment_id), + Some(fragment_ids) => { + return Err(Error::invalid_input_source( + format!( + "partitioned zonemap expects exactly one fragment per shard, got {}", + fragment_ids.len() + ) + .into(), + )); + } + None => ZONEMAP_FILENAME.to_string(), + }; + + ZoneMapIndexPlugin::train_zonemap_index( + data, + index_store, + Some(request.params), + &file_name, + ) + .await?; + Ok(CreatedIndex { - index_details: 
prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) - .unwrap(), + index_details: zonemap_index_details(IndexType::PartitionedZoneMap)?, index_version: ZONEMAP_INDEX_VERSION, files: Some(index_store.list_files_with_sizes().await?), }) @@ -950,10 +1168,53 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { - Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) + Ok( + ZoneMapIndex::load_partitioned(index_store, frag_reuse_index, cache).await? + as Arc, + ) + } + + fn details_as_json(&self, _details: &prost_types::Any) -> Result { + Ok(serde_json::json!({ + "layout": "partitioned" + })) } } +pub async fn merge_index_files( + object_store: &ObjectStore, + index_dir: &Path, + progress: Arc, +) -> Result<()> { + use futures::StreamExt; + + progress + .stage_start("validate_partition_files", None, "files") + .await?; + + let mut list_stream = object_store.list(Some(index_dir.clone())); + let mut matched = 0u64; + while let Some(item) = list_stream.next().await { + let meta = item?; + let file_name = meta.location.filename().unwrap_or_default(); + if file_name == ZONEMAP_FILENAME || is_partitioned_zonemap_filename(file_name) { + matched += 1; + progress + .stage_progress("validate_partition_files", matched) + .await?; + } + } + progress.stage_complete("validate_partition_files").await?; + + if matched == 0 { + return Err(Error::invalid_input(format!( + "no zonemap shard files found in index directory: {index_dir}" + ))); + } + + Ok(()) +} + #[cfg(test)] mod tests { use crate::scalar::registry::VALUE_COLUMN_NAME; @@ -990,8 +1251,8 @@ mod tests { }; // Add missing imports for the tests - use crate::Index; // Import Index trait to access calculate_included_frags use crate::metrics::NoOpMetricsCollector; + use crate::{Index, IndexType}; // Import Index trait to access calculate_included_frags use roaring::RoaringBitmap; // Import RoaringBitmap for the test use 
std::collections::Bound; @@ -1016,6 +1277,65 @@ mod tests { Box::pin(RecordBatchStreamAdapter::new(schema, stream)) } + #[tokio::test] + async fn test_partitioned_zonemap_loads_multiple_shards() { + let tmpdir = TempObjDir::default(); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + let schema = Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Int32, false), + Field::new(ROW_ADDR, DataType::UInt64, false), + ])); + + for fragment_id in 0..2u64 { + let values = arrow_array::Int32Array::from_iter_values( + (0..4).map(|offset| (fragment_id as i32) * 10 + offset), + ); + let row_ids = + UInt64Array::from_iter_values((0..4).map(|offset| (fragment_id << 32) | offset)); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(values), Arc::new(row_ids)]) + .unwrap(); + let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + stream::once(std::future::ready(Ok(batch))), + )); + ZoneMapIndexPlugin::train_zonemap_index( + stream, + test_store.as_ref(), + Some(ZoneMapIndexBuilderParams::new(4)), + &partitioned_zonemap_filename(fragment_id as u32), + ) + .await + .unwrap(); + } + + let index = ZoneMapIndex::load_partitioned(test_store, None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.index_type(), IndexType::PartitionedZoneMap); + assert_eq!(index.rows_per_zone, 4); + assert_eq!( + index.calculate_included_frags().await.unwrap(), + RoaringBitmap::from_iter(0..2) + ); + + let result = index + .search( + &SargableQuery::Equals(ScalarValue::Int32(Some(11))), + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let mut expected = RowAddrTreeMap::new(); + expected.insert_range(1u64 << 32..((1u64 << 32) + 4)); + assert_eq!(result, SearchResult::at_most(expected)); + } + #[tokio::test] async fn test_empty_zonemap_index() { let tmpdir = TempObjDir::default(); @@ -1039,9 +1359,14 @@ mod tests { 
stream::once(std::future::ready(Ok(data))), )); - ZoneMapIndexPlugin::train_zonemap_index(data_stream, test_store.as_ref(), None) - .await - .unwrap(); + ZoneMapIndexPlugin::train_zonemap_index( + data_stream, + test_store.as_ref(), + None, + ZONEMAP_FILENAME, + ) + .await + .unwrap(); log::debug!("Successfully wrote the index file"); @@ -1083,6 +1408,7 @@ mod tests { stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::new(5000)), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -1208,7 +1534,7 @@ mod tests { let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); // Train and write the zonemap index - ZoneMapIndexPlugin::train_zonemap_index(stream, store.as_ref(), None) + ZoneMapIndexPlugin::train_zonemap_index(stream, store.as_ref(), None, ZONEMAP_FILENAME) .await .unwrap(); @@ -1311,6 +1637,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::new(100)), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -1495,6 +1822,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::new(100)), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -1696,6 +2024,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::default()), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -1858,6 +2187,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::new(5000)), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -2066,6 +2396,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::default()), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -2141,6 +2472,7 @@ mod tests { data_stream, test_store.as_ref(), Some(ZoneMapIndexBuilderParams::new(ROWS_PER_ZONE_DEFAULT * 3)), + ZONEMAP_FILENAME, ) .await .unwrap(); @@ -2342,6 +2674,7 @@ mod tests { zones, data_type: DataType::Utf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + index_type: IndexType::ZoneMap, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), @@ -2414,6 +2747,7 @@ mod tests { 
zones, data_type: DataType::Utf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + index_type: IndexType::ZoneMap, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), @@ -2481,6 +2815,7 @@ mod tests { zones, data_type: DataType::LargeUtf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + index_type: IndexType::ZoneMap, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index afd9267dffc..69b75f6de51 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -1200,6 +1200,7 @@ impl DirectoryNamespace { "INVERTED" | "FTS" => Ok(IndexType::Inverted), "NGRAM" => Ok(IndexType::NGram), "ZONEMAP" | "ZONE_MAP" => Ok(IndexType::ZoneMap), + "PARTITIONED_ZONEMAP" | "PARTITIONED_ZONE_MAP" => Ok(IndexType::PartitionedZoneMap), "BLOOMFILTER" | "BLOOM_FILTER" => Ok(IndexType::BloomFilter), "RTREE" | "R_TREE" => Ok(IndexType::RTree), "VECTOR" | "IVF_PQ" => Ok(IndexType::IvfPq), @@ -1251,6 +1252,10 @@ impl DirectoryNamespace { index_type, params: ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap), }, + IndexType::PartitionedZoneMap => DirectoryIndexParams::Scalar { + index_type, + params: ScalarIndexParams::for_builtin(BuiltinIndexType::PartitionedZoneMap), + }, IndexType::BloomFilter => DirectoryIndexParams::Scalar { index_type, params: ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter), diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8a7a9cf3636..c2b287fe371 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2959,6 +2959,14 @@ impl Dataset { ) .await } + IndexType::PartitionedZoneMap => { + lance_index::scalar::zonemap::merge_index_files( + self.object_store(), + &index_dir, + progress, + ) + .await + } IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { Err(Error::invalid_input( "Vector distributed 
indexing no longer supports merge_index_metadata; \ diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..79b5f1c7ede 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -317,6 +317,7 @@ fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String { "LabelList" => IndexType::LabelList.to_string(), "NGram" => IndexType::NGram.to_string(), "ZoneMap" => IndexType::ZoneMap.to_string(), + "PartitionedZoneMap" => IndexType::PartitionedZoneMap.to_string(), "BloomFilter" => IndexType::BloomFilter.to_string(), "Inverted" => IndexType::Inverted.to_string(), "Json" => IndexType::Scalar.to_string(), diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 8aed939787a..9340fa6b845 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -216,6 +216,7 @@ impl<'a> CreateIndexBuilder<'a> { | IndexType::Inverted | IndexType::NGram | IndexType::ZoneMap + | IndexType::PartitionedZoneMap | IndexType::BloomFilter | IndexType::LabelList | IndexType::RTree, @@ -1138,6 +1139,94 @@ mod tests { ); } + #[tokio::test] + async fn test_merge_index_metadata_partitioned_zonemap_reports_progress() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .into_reader_rows( + lance_datagen::RowCount::from(256), + lance_datagen::BatchCount::from(4), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = ScalarIndexParams::for_builtin( + lance_index::scalar::BuiltinIndexType::PartitionedZoneMap, + ); + let fragment_ids: Vec = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + let shared_uuid = Uuid::new_v4().to_string(); + + for &fragment_id in &fragment_ids { + let index_metadata = 
CreateIndexBuilder::new( + &mut dataset, + &["id"], + IndexType::PartitionedZoneMap, + ¶ms, + ) + .name("distributed_partitioned_zonemap".to_string()) + .fragments(vec![fragment_id]) + .index_uuid(shared_uuid.clone()) + .execute_uncommitted() + .await + .unwrap(); + + let fragment_bitmap = index_metadata.fragment_bitmap.as_ref().unwrap(); + let indexed_fragments: Vec = fragment_bitmap.iter().collect(); + assert_eq!(indexed_fragments, vec![fragment_id]); + } + + let merge_progress = Arc::new(RecordingProgress::default()); + dataset + .merge_index_metadata( + &shared_uuid, + IndexType::PartitionedZoneMap, + None, + merge_progress.clone(), + ) + .await + .unwrap(); + + let merge_tags = merge_progress + .recorded_events() + .iter() + .map(|(kind, stage, _)| format!("{kind}:{stage}")) + .collect::>(); + assert!( + merge_tags + .iter() + .any(|e| e == "start:validate_partition_files"), + "expected validate_partition_files start during public merge" + ); + assert!( + merge_tags + .iter() + .any(|e| e == "progress:validate_partition_files"), + "expected validate_partition_files progress during public merge" + ); + assert!( + merge_tags + .iter() + .any(|e| e == "complete:validate_partition_files"), + "expected validate_partition_files complete during public merge" + ); + } + #[tokio::test] async fn test_vector_execute_uncommitted_segments_commit_without_staging() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 44739454bec..f562a213a0a 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -16,7 +16,7 @@ use crate::{ use arrow_schema::DataType; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use lance_core::datatypes::Field; use lance_core::{Error, ROW_ADDR, ROW_ID, Result}; @@ -24,6 +24,7 @@ use 
lance_datafusion::exec::LanceExecutionOptions; use lance_index::metrics::{MetricsCollector, NoOpMetricsCollector}; use lance_index::pbold::{ BTreeIndexDetails, BitmapIndexDetails, InvertedIndexDetails, LabelListIndexDetails, + ZoneMapIndexDetails, }; use lance_index::progress::IndexBuildProgress; use lance_index::registry::IndexPluginRegistry; @@ -420,12 +421,15 @@ pub(crate) async fn infer_scalar_index_details( )))?; let bitmap_page_lookup = index_dir.child(BITMAP_LOOKUP_NAME); + let zonemap_lookup = index_dir.child("zonemap.lance"); let inverted_list_lookup = index_dir.child(METADATA_FILE); let legacy_inverted_list_lookup = index_dir.child(INVERT_LIST_FILE); let index_details = if let DataType::List(_) = col.data_type() { prost_types::Any::from_msg(&LabelListIndexDetails::default()).unwrap() } else if dataset.object_store.exists(&bitmap_page_lookup).await? { prost_types::Any::from_msg(&BitmapIndexDetails::default()).unwrap() + } else if dataset.object_store.exists(&zonemap_lookup).await? { + prost_types::Any::from_msg(&ZoneMapIndexDetails::default()).unwrap() } else if dataset.object_store.exists(&inverted_list_lookup).await? 
{ // Try to infer inverted index details from metadata file to capture with_position and other params // Fall back to defaults if anything goes wrong @@ -446,7 +450,22 @@ pub(crate) async fn infer_scalar_index_details( { prost_types::Any::from_msg(&InvertedIndexDetails::default()).unwrap() } else { - prost_types::Any::from_msg(&BTreeIndexDetails::default()).unwrap() + let mut list_stream = dataset.object_store.list(Some(index_dir.clone())); + let mut found_partitioned_zonemap = false; + while let Some(item) = list_stream.next().await { + let meta = item?; + let file_name = meta.location.filename().unwrap_or_default(); + if file_name.starts_with("part_") && file_name.ends_with("_zonemap.lance") { + found_partitioned_zonemap = true; + break; + } + } + if found_partitioned_zonemap { + prost_types::Any::from_msg(&lance_index::pb::PartitionedZoneMapIndexDetails::default()) + .unwrap() + } else { + prost_types::Any::from_msg(&BTreeIndexDetails::default()).unwrap() + } }; let index_details = Arc::new(index_details); @@ -623,6 +642,10 @@ mod tests { IndexType::NGram => { prost_types::Any::from_msg(&NGramIndexDetails::default()).unwrap() } + IndexType::PartitionedZoneMap => prost_types::Any::from_msg( + &lance_index::pb::PartitionedZoneMapIndexDetails::default(), + ) + .unwrap(), IndexType::Vector => { prost_types::Any::from_msg(&VectorIndexDetails::default()).unwrap() } diff --git a/rust/lance/tests/utils/mod.rs b/rust/lance/tests/utils/mod.rs index b8a034a50b0..52bae543a2c 100644 --- a/rust/lance/tests/utils/mod.rs +++ b/rust/lance/tests/utils/mod.rs @@ -195,6 +195,7 @@ async fn build_dataset( | IndexType::LabelList | IndexType::NGram | IndexType::ZoneMap + | IndexType::PartitionedZoneMap | IndexType::BloomFilter => Box::new(ScalarIndexParams::for_builtin( (*index_type).try_into().unwrap(), )), From a670357f92181dad5ae0194cfc69232f74b49721 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 05:29:31 +0000 Subject: [PATCH 2/3] fix: address partitioned zonemap CI 
failures --- rust/lance-index/src/scalar/zonemap.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index b6bb5f3cc30..80219cdb2b0 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -1070,10 +1070,7 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { - Ok( - ZoneMapIndex::load_single(index_store, frag_reuse_index, cache, IndexType::ZoneMap) - .await? as Arc, - ) + Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) } } @@ -1222,7 +1219,9 @@ mod tests { use std::sync::Arc; use crate::scalar::zoned::ZoneBound; - use crate::scalar::zonemap::{ZoneMapIndexPlugin, ZoneMapStatistics}; + use crate::scalar::zonemap::{ + ZoneMapIndexPlugin, ZoneMapStatistics, partitioned_zonemap_filename, + }; use arrow::datatypes::Float32Type; use arrow_array::{Array, RecordBatch, UInt64Array, record_batch}; use arrow_schema::{DataType, Field, Schema}; From 92ecb39002fa10328ac28c25e26e66a442c666b7 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 06:13:54 +0000 Subject: [PATCH 3/3] fix: tighten partitioned zonemap validation --- rust/lance-index/src/scalar/zonemap.rs | 180 +++++++++++++++++++++---- rust/lance/src/index/scalar.rs | 3 + 2 files changed, 160 insertions(+), 23 deletions(-) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 80219cdb2b0..89b90802942 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -467,7 +467,9 @@ impl ZoneMapIndex { .collect::>(); if partition_files.is_empty() { - return Self::load_single(store, fri, index_cache, IndexType::PartitionedZoneMap).await; + return Err(Error::invalid_input( + "partitioned zonemap index is missing part_*_zonemap.lance shards", + )); } let mut rows_per_zone = None; @@ -1140,7 +1142,11 @@ impl 
ScalarIndexPlugin for PartitionedZoneMapIndexPlugin { .into(), )); } - None => ZONEMAP_FILENAME.to_string(), + None => { + return Err(Error::invalid_input_source( + "partitioned zonemap requires exactly one fragment id per shard".into(), + )); + } }; ZoneMapIndexPlugin::train_zonemap_index( @@ -1191,21 +1197,30 @@ pub async fn merge_index_files( let mut list_stream = object_store.list(Some(index_dir.clone())); let mut matched = 0u64; + let mut found_single_file_layout = false; while let Some(item) = list_stream.next().await { let meta = item?; let file_name = meta.location.filename().unwrap_or_default(); - if file_name == ZONEMAP_FILENAME || is_partitioned_zonemap_filename(file_name) { + if is_partitioned_zonemap_filename(file_name) { matched += 1; progress .stage_progress("validate_partition_files", matched) .await?; + } else if file_name == ZONEMAP_FILENAME { + found_single_file_layout = true; } } progress.stage_complete("validate_partition_files").await?; + if found_single_file_layout { + return Err(Error::invalid_input(format!( + "unexpected zonemap.lance found in partitioned zonemap index directory: {index_dir}" + ))); + } + if matched == 0 { return Err(Error::invalid_input(format!( - "no zonemap shard files found in index directory: {index_dir}" + "no partitioned zonemap shard files found in index directory: {index_dir}" ))); } @@ -1214,13 +1229,20 @@ pub async fn merge_index_files( #[cfg(test)] mod tests { + use crate::Result; use crate::scalar::registry::VALUE_COLUMN_NAME; + use crate::scalar::registry::ScalarIndexPlugin; use crate::scalar::{IndexStore, zonemap::ROWS_PER_ZONE_DEFAULT}; + use crate::{ + Index, IndexType, + progress::{IndexBuildProgress, NoopIndexBuildProgress}, + }; use std::sync::Arc; use crate::scalar::zoned::ZoneBound; use crate::scalar::zonemap::{ - ZoneMapIndexPlugin, ZoneMapStatistics, partitioned_zonemap_filename, + PartitionedZoneMapIndexPlugin, ZoneMapIndexPlugin, ZoneMapStatistics, merge_index_files, + partitioned_zonemap_filename, 
}; use arrow::datatypes::Float32Type; use arrow_array::{Array, RecordBatch, UInt64Array, record_batch}; @@ -1249,12 +1271,12 @@ mod tests { }, }; - // Add missing imports for the tests use crate::metrics::NoOpMetricsCollector; - use crate::{Index, IndexType}; // Import Index trait to access calculate_included_frags - use roaring::RoaringBitmap; // Import RoaringBitmap for the test + use roaring::RoaringBitmap; use std::collections::Bound; + lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>); + // Adds a _rowaddr column emulating each batch as a new fragment fn add_row_addr(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { let schema = stream.schema(); @@ -1276,6 +1298,26 @@ mod tests { Box::pin(RecordBatchStreamAdapter::new(schema, stream)) } + fn zonemap_batch_stream( + values: impl IntoIterator, + fragment_id: u64, + ) -> SendableRecordBatchStream { + let values = arrow_array::Int32Array::from_iter_values(values); + let row_ids = UInt64Array::from_iter_values( + (0..values.len() as u64).map(|offset| (fragment_id << 32) | offset), + ); + let schema = Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Int32, false), + Field::new(ROW_ADDR, DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values), Arc::new(row_ids)]) + .unwrap(); + Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(batch))), + )) + } + #[tokio::test] async fn test_partitioned_zonemap_loads_multiple_shards() { let tmpdir = TempObjDir::default(); @@ -1285,24 +1327,11 @@ mod tests { Arc::new(LanceCache::no_cache()), )); - let schema = Arc::new(Schema::new(vec![ - Field::new(VALUE_COLUMN_NAME, DataType::Int32, false), - Field::new(ROW_ADDR, DataType::UInt64, false), - ])); - for fragment_id in 0..2u64 { - let values = arrow_array::Int32Array::from_iter_values( + let stream = zonemap_batch_stream( (0..4).map(|offset| (fragment_id as i32) * 10 + 
offset), + fragment_id, ); - let row_ids = - UInt64Array::from_iter_values((0..4).map(|offset| (fragment_id << 32) | offset)); - let batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(values), Arc::new(row_ids)]) - .unwrap(); - let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( - schema.clone(), - stream::once(std::future::ready(Ok(batch))), - )); ZoneMapIndexPlugin::train_zonemap_index( stream, test_store.as_ref(), @@ -1335,6 +1364,111 @@ mod tests { assert_eq!(result, SearchResult::at_most(expected)); } + #[tokio::test] + async fn test_partitioned_zonemap_rejects_missing_fragment_id() { + let plugin = PartitionedZoneMapIndexPlugin; + let tmpdir = TempObjDir::default(); + let store = LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.as_ref().clone(), + Arc::new(LanceCache::no_cache()), + ); + let request = plugin + .new_training_request("{}", &Field::new(VALUE_COLUMN_NAME, DataType::Int32, false)) + .unwrap(); + let err = match plugin + .train_index( + zonemap_batch_stream(0..4, 0), + &store, + request, + None, + Arc::new(NoopIndexBuildProgress), + ) + .await + { + Ok(_) => panic!("partitioned zonemap should reject training without a fragment id"), + Err(err) => err, + }; + assert!( + err.to_string() + .contains("partitioned zonemap requires exactly one fragment id per shard"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_zonemap_load_rejects_single_file_layout() { + let tmpdir = TempObjDir::default(); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + ZoneMapIndexPlugin::train_zonemap_index( + zonemap_batch_stream(0..4, 0), + test_store.as_ref(), + Some(ZoneMapIndexBuilderParams::new(4)), + ZONEMAP_FILENAME, + ) + .await + .unwrap(); + + let err = ZoneMapIndex::load_partitioned(test_store, None, &LanceCache::no_cache()) + .await + .unwrap_err(); + assert!( + err.to_string() + 
.contains("partitioned zonemap index is missing part_*_zonemap.lance shards"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_zonemap_merge_rejects_mixed_layout() { + let tmpdir = TempObjDir::default(); + let object_store = Arc::new(ObjectStore::local()); + let test_store = Arc::new(LanceIndexStore::new( + object_store.clone(), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + ZoneMapIndexPlugin::train_zonemap_index( + zonemap_batch_stream(0..4, 0), + test_store.as_ref(), + Some(ZoneMapIndexBuilderParams::new(4)), + &partitioned_zonemap_filename(0), + ) + .await + .unwrap(); + ZoneMapIndexPlugin::train_zonemap_index( + zonemap_batch_stream(10..14, 1), + test_store.as_ref(), + Some(ZoneMapIndexBuilderParams::new(4)), + ZONEMAP_FILENAME, + ) + .await + .unwrap(); + + let progress = Arc::new(RecordingProgress::default()); + let err = merge_index_files(object_store.as_ref(), tmpdir.as_ref(), progress.clone()) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("unexpected zonemap.lance found in partitioned zonemap index directory"), + "unexpected error: {err}" + ); + assert!( + progress + .recorded_events() + .iter() + .any(|(kind, stage, _)| kind == "complete" && stage == "validate_partition_files"), + "expected validate_partition_files completion before reporting the mixed-layout error" + ); + } + #[tokio::test] async fn test_empty_zonemap_index() { let tmpdir = TempObjDir::default(); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index f562a213a0a..42ba389a71b 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -450,6 +450,9 @@ pub(crate) async fn infer_scalar_index_details( { prost_types::Any::from_msg(&InvertedIndexDetails::default()).unwrap() } else { + // Missing details can come from older/manual commits, so fall back to a directory scan. 
+ // This is intentionally only in the inference path to avoid adding a list operation to + // normal indexed reads. let mut list_stream = dataset.object_store.list(Some(index_dir.clone())); let mut found_partitioned_zonemap = false; while let Some(item) = list_stream.next().await {