From 56e55acef8e00f59748eba169a6e899ef567ef23 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 07:38:13 +0000 Subject: [PATCH 1/6] feat: support zonemap index segments --- rust/lance-index/src/scalar/zonemap.rs | 185 ++++++++++++++++++++++++- rust/lance/src/dataset/scanner.rs | 23 +-- rust/lance/src/index/scalar.rs | 157 ++++++++++++++++++++- rust/lance/src/io/exec/scalar_index.rs | 26 ++-- 4 files changed, 359 insertions(+), 32 deletions(-) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 9156322b1dc..17d06a81434 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -24,7 +24,9 @@ use crate::scalar::{ }; use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_expr::Accumulator; +use futures::future::try_join_all; use lance_core::cache::{LanceCache, WeakLanceCache}; +use lance_core::utils::mask::NullableRowAddrSet; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; @@ -111,6 +113,12 @@ pub struct ZoneMapIndex { index_cache: WeakLanceCache, } +#[derive(Debug)] +pub struct LogicalZoneMapIndex { + segments: Vec>, + rows_per_zone: u64, +} + impl std::fmt::Debug for ZoneMapIndex { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ZoneMapIndex") @@ -130,6 +138,55 @@ impl DeepSizeOf for ZoneMapIndex { } } +impl LogicalZoneMapIndex { + pub async fn load( + stores: Vec>, + fri: Option>, + index_cache: &LanceCache, + ) -> Result> { + if stores.is_empty() { + return Err(Error::invalid_input( + "LogicalZoneMapIndex requires at least one segment".to_string(), + )); + } + + let segments = try_join_all( + stores + .into_iter() + .map(|store| ZoneMapIndex::load(store, fri.clone(), index_cache)), + ) + .await?; + + let data_type = segments[0].data_type.clone(); + let rows_per_zone = segments[0].rows_per_zone; + for segment in segments.iter().skip(1) { + if segment.data_type != data_type { + return Err(Error::invalid_input(format!( + "LogicalZoneMapIndex requires identical data types across segments, found {:?} and {:?}", + data_type, segment.data_type + ))); + } + if segment.rows_per_zone != rows_per_zone { + return Err(Error::invalid_input(format!( + "LogicalZoneMapIndex requires identical rows_per_zone across segments, found {} and {}", + rows_per_zone, segment.rows_per_zone + ))); + } + } + + Ok(Arc::new(Self { + segments, + rows_per_zone, + })) + } +} + +impl DeepSizeOf for LogicalZoneMapIndex { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + self.segments.deep_size_of_children(context) + } +} + impl ZoneMapIndex { /// Evaluates whether a zone could potentially contain values matching the query /// For NaN, total order is used here @@ -621,6 +678,126 @@ impl ScalarIndex for ZoneMapIndex { } } +#[async_trait] +impl Index for LogicalZoneMapIndex { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_index(self: Arc) -> Arc { + self + } + + fn as_vector_index(self: Arc) -> Result> { + Err(Error::invalid_input_source( + "LogicalZoneMapIndex is not a vector index".into(), + )) + } + + async fn prewarm(&self) -> Result<()> { + for segment in &self.segments { + segment.prewarm().await?; + } + Ok(()) + } + + fn statistics(&self) -> Result { + Ok(serde_json::json!({ + "num_segments": self.segments.len(), + "num_zones": self.segments.iter().map(|segment| segment.zones.len()).sum::(), + "rows_per_zone": self.rows_per_zone, + })) + } + + fn index_type(&self) -> IndexType { + IndexType::ZoneMap + } + + async fn calculate_included_frags(&self) -> Result { + let mut frag_ids = RoaringBitmap::new(); + for segment in &self.segments { + frag_ids |= segment.calculate_included_frags().await?; + } + Ok(frag_ids) + } +} + +#[async_trait] +impl ScalarIndex for LogicalZoneMapIndex { + async fn search( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + ) -> Result { + let results = try_join_all( + self.segments + .iter() + .map(|segment| segment.search(query, metrics)), + ) + .await?; + + let mut selections = Vec::with_capacity(results.len()); + let mut all_exact = true; + for result in results { + match result { + SearchResult::Exact(rows) => selections.push(rows), + SearchResult::AtMost(rows) => { + all_exact = false; + selections.push(rows); + } + SearchResult::AtLeast(_) => { + return Err(Error::not_supported( + "LogicalZoneMapIndex does not support AtLeast search results".to_string(), + )); + } + } + } + + let selection = NullableRowAddrSet::union_all(&selections); + Ok(if all_exact { + SearchResult::Exact(selection) + } else { + SearchResult::AtMost(selection) + }) + } + + fn can_remap(&self) -> bool { + false + } + + async fn remap( + &self, + _mapping: &HashMap>, + _dest_store: &dyn IndexStore, + ) -> Result { + Err(Error::invalid_input_source( + "LogicalZoneMapIndex does not support remap".into(), + )) + } + + async fn update( + &self, + _new_data: SendableRecordBatchStream, + _dest_store: &dyn IndexStore, + _old_data_filter: Option, + ) -> Result { + Err(Error::invalid_input_source( + "LogicalZoneMapIndex does not support update".into(), + )) + } + + fn update_criteria(&self) -> UpdateCriteria { + UpdateCriteria::only_new_data( + TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(), + ) + } + + fn derive_index_params(&self) -> Result { + let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; + Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(¶ms)) + } +} + fn default_rows_per_zone() -> u64 { *DEFAULT_ROWS_PER_ZONE } @@ -918,15 +1095,9 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, request: Box, - fragment_ids: Option>, + _fragment_ids: Option>, _progress: Arc, ) -> Result { - if fragment_ids.is_some() { - return Err(Error::invalid_input_source( - "ZoneMap index does not support fragment training".into(), - )); - } - let request = (request as Box) .downcast::() .map_err(|_| { diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 5f40c79b4f9..7800e7fa94c 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -81,6 +81,7 @@ use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; use crate::dataset::utils::SchemaAdapter; use crate::index::DatasetIndexInternalExt; +use crate::index::scalar::scalar_index_fragment_bitmap; use crate::index::vector::utils::{ default_distance_type_for, get_vector_dim, get_vector_type, validate_distance_type_for, }; @@ -3810,16 +3811,18 @@ impl Scanner { ScalarIndexExpr::Or(lhs, rhs) => Ok(self.fragments_covered_by_index_query(lhs).await? & self.fragments_covered_by_index_query(rhs).await?), ScalarIndexExpr::Not(expr) => self.fragments_covered_by_index_query(expr).await, - ScalarIndexExpr::Query(search) => { - let idx = self - .dataset - .load_scalar_index(IndexCriteria::default().with_name(&search.index_name)) - .await? - .expect("Index not found even though it must have been found earlier"); - Ok(idx - .fragment_bitmap - .expect("scalar indices should always have a fragment bitmap")) - } + ScalarIndexExpr::Query(search) => scalar_index_fragment_bitmap( + self.dataset.as_ref(), + &search.column, + &search.index_name, + ) + .await? + .ok_or_else(|| { + crate::Error::internal(format!( + "Index not found even though it must have been found earlier: {}", + search.index_name + )) + }), } } diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 44739454bec..f75672b886d 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -18,6 +18,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::TryStreamExt; use itertools::Itertools; +use lance_core::cache::LanceCache; use lance_core::datatypes::Field; use lance_core::{Error, ROW_ADDR, ROW_ID, Result}; use lance_datafusion::exec::LanceExecutionOptions; @@ -38,11 +39,12 @@ use lance_index::scalar::registry::{ use lance_index::scalar::{CreatedIndex, InvertedIndexParams}; use lance_index::scalar::{ ScalarIndex, ScalarIndexParams, bitmap::BITMAP_LOOKUP_NAME, inverted::INVERT_LIST_FILE, - lance_format::LanceIndexStore, + lance_format::LanceIndexStore, zonemap::LogicalZoneMapIndex, }; use lance_index::{IndexCriteria, IndexType}; use lance_table::format::{Fragment, IndexMetadata}; use log::info; +use roaring::RoaringBitmap; use tracing::instrument; // Log an update every TRAINING_UPDATE_FREQ million rows processed @@ -399,6 +401,104 @@ pub async fn open_scalar_index( .await } +fn index_intersects_dataset(index: &IndexMetadata, dataset: &Dataset) -> bool { + index + .fragment_bitmap + .as_ref() + .is_some_and(|index_bitmap| index_bitmap.intersection_len(&dataset.fragment_bitmap) > 0) +} + +fn union_fragment_bitmaps(indices: &[IndexMetadata], index_name: &str) -> Result { + let mut combined = RoaringBitmap::new(); + for index in indices { + let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or_else(|| { + Error::invalid_input(format!( + "Scalar index '{}' segment {} is missing fragment coverage", + index_name, index.uuid + )) + })?; + combined |= fragment_bitmap.clone(); + } + Ok(combined) +} + +async fn load_named_zonemap_segments( + dataset: &Dataset, + column: &str, + index_name: &str, +) -> Result>> { + let usable_indices = dataset + .load_indices_by_name(index_name) + .await? + .into_iter() + .filter(|index| index_intersects_dataset(index, dataset)) + .collect::>(); + + if usable_indices.len() <= 1 { + return Ok(None); + } + + for index in &usable_indices { + let index_details = fetch_index_details(dataset, column, index).await?; + if !index_details.type_url.ends_with("ZoneMapIndexDetails") { + return Ok(None); + } + } + + Ok(Some(usable_indices)) +} + +pub(crate) async fn scalar_index_fragment_bitmap( + dataset: &Dataset, + column: &str, + index_name: &str, +) -> Result> { + if let Some(indices) = load_named_zonemap_segments(dataset, column, index_name).await? { + return union_fragment_bitmaps(&indices, index_name).map(Some); + } + + Ok(dataset + .load_scalar_index(IndexCriteria::default().with_name(index_name)) + .await? + .and_then(|index| index.fragment_bitmap)) +} + +pub(crate) async fn open_named_scalar_index( + dataset: &Dataset, + column: &str, + index_name: &str, + metrics: &dyn MetricsCollector, +) -> Result> { + if let Some(indices) = load_named_zonemap_segments(dataset, column, index_name).await? { + let frag_reuse_index = dataset.open_frag_reuse_index(metrics).await?; + let stores = indices + .iter() + .map(|index| { + LanceIndexStore::from_dataset_for_existing(dataset, index) + .map(|store| Arc::new(store) as Arc) + }) + .collect::>>()?; + let logical_cache = LanceCache::no_cache(); + return Ok( + LogicalZoneMapIndex::load(stores, frag_reuse_index, &logical_cache).await? + as Arc, + ); + } + + let index = dataset + .load_scalar_index(IndexCriteria::default().with_name(index_name)) + .await? + .ok_or_else(|| { + Error::internal(format!( + "Scanner created plan for index query on index {} for column {} but no usable index exists with that name", + index_name, column + )) + })?; + dataset + .open_scalar_index(column, &index.uuid.to_string(), metrics) + .await +} + pub(crate) async fn infer_scalar_index_details( dataset: &Dataset, column: &str, @@ -594,6 +694,8 @@ mod tests { use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use super::*; + use crate::dataset::Dataset; + use crate::index::create::CreateIndexBuilder; use arrow::{ array::AsArray, datatypes::{Int32Type, UInt64Type}, @@ -603,8 +705,12 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_core::{datatypes::Field, utils::address::RowAddress}; use lance_datagen::array; + use lance_index::metrics::NoOpMetricsCollector; use lance_index::{IndexType, optimize::OptimizeOptions}; - use lance_index::{pbold::NGramIndexDetails, scalar::BuiltinIndexType}; + use lance_index::{ + pbold::NGramIndexDetails, + scalar::{BuiltinIndexType, ScalarIndexParams}, + }; use lance_table::format::pb::VectorIndexDetails; fn make_index_metadata( @@ -817,6 +923,53 @@ mod tests { assert_eq!(max_frag_id_seen, 3); } + #[tokio::test] + async fn test_open_named_scalar_index_uses_all_zonemap_segments() { + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(16)) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + let mut segments = Vec::new(); + + for fragment in &fragments { + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + segments.push(segment); + } + + dataset + .commit_existing_index_segments("value_zonemap", "value", segments) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("value_zonemap").await.unwrap(); + assert_eq!(committed.len(), fragments.len()); + + let logical = + open_named_scalar_index(&dataset, "value", "value_zonemap", &NoOpMetricsCollector) + .await + .unwrap(); + assert_eq!( + logical.calculate_included_frags().await.unwrap(), + dataset.fragment_bitmap.as_ref().clone() + ); + + let combined_bitmap = scalar_index_fragment_bitmap(&dataset, "value", "value_zonemap") + .await + .unwrap() + .unwrap(); + assert_eq!(combined_bitmap, dataset.fragment_bitmap.as_ref().clone()); + } + #[tokio::test] async fn test_initialize_scalar_index_btree() { use crate::dataset::Dataset; diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index f587ec22a91..347c163eac7 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -7,7 +7,11 @@ use super::utils::{IndexMetrics, InstrumentedRecordBatchStreamAdapter}; use crate::{ Dataset, dataset::rowids::load_row_id_sequences, - index::{DatasetIndexExt, DatasetIndexInternalExt, prefilter::DatasetPreFilter}, + index::{ + DatasetIndexExt, + prefilter::DatasetPreFilter, + scalar::{open_named_scalar_index, scalar_index_fragment_bitmap}, + }, }; use arrow_array::{Array, RecordBatch, UInt64Array}; use arrow_schema::{Schema, SchemaRef}; @@ -62,12 +66,7 @@ impl ScalarIndexLoader for Dataset { index_name: &str, metrics: &dyn MetricsCollector, ) -> Result> { - let idx = self - .load_scalar_index(IndexCriteria::default().with_name(index_name)) - .await? - .ok_or_else(|| Error::internal(format!("Scanner created plan for index query on index {} for column {} but no usable index exists with that name", index_name, column)))?; - self.open_scalar_index(column, &idx.uuid.to_string(), metrics) - .await + open_named_scalar_index(self, column, index_name, metrics).await } } @@ -133,13 +132,14 @@ impl ScalarIndexExec { Self::fragments_covered_by_index_query(expr, dataset).await } ScalarIndexExpr::Query(search_key) => { - let idx = dataset - .load_scalar_index(IndexCriteria::default().with_name(&search_key.index_name)) + scalar_index_fragment_bitmap(dataset, &search_key.column, &search_key.index_name) .await? - .expect("Index not found even though it must have been found earlier"); - Ok(idx - .fragment_bitmap - .expect("scalar indices should always have a fragment bitmap")) + .ok_or_else(|| { + Error::internal(format!( + "Index not found even though it must have been found earlier: {}", + search_key.index_name + )) + }) } } } From a7a3081aa463ce0e1cdebf45191856db76b7724c Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 08:15:53 +0000 Subject: [PATCH 2/6] refactor: move scalar segment logic into lance --- rust/lance-index/src/scalar/zonemap.rs | 177 ---------- rust/lance/src/dataset/scanner.rs | 2 +- rust/lance/src/index.rs | 1 + rust/lance/src/index/scalar.rs | 151 +-------- rust/lance/src/index/scalar_logical.rs | 431 +++++++++++++++++++++++++ rust/lance/src/io/exec/scalar_index.rs | 2 +- 6 files changed, 435 insertions(+), 329 deletions(-) create mode 100644 rust/lance/src/index/scalar_logical.rs diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 17d06a81434..2eb7a672824 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -24,9 +24,7 @@ use crate::scalar::{ }; use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_expr::Accumulator; -use futures::future::try_join_all; use lance_core::cache::{LanceCache, WeakLanceCache}; -use lance_core::utils::mask::NullableRowAddrSet; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; @@ -113,12 +111,6 @@ pub struct ZoneMapIndex { index_cache: WeakLanceCache, } -#[derive(Debug)] -pub struct LogicalZoneMapIndex { - segments: Vec>, - rows_per_zone: u64, -} - impl std::fmt::Debug for ZoneMapIndex { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ZoneMapIndex") @@ -138,55 +130,6 @@ impl DeepSizeOf for ZoneMapIndex { } } -impl LogicalZoneMapIndex { - pub async fn load( - stores: Vec>, - fri: Option>, - index_cache: &LanceCache, - ) -> Result> { - if stores.is_empty() { - return Err(Error::invalid_input( - "LogicalZoneMapIndex requires at least one segment".to_string(), - )); - } - - let segments = try_join_all( - stores - .into_iter() - .map(|store| ZoneMapIndex::load(store, fri.clone(), index_cache)), - ) - .await?; - - let data_type = segments[0].data_type.clone(); - let rows_per_zone = segments[0].rows_per_zone; - for segment in segments.iter().skip(1) { - if segment.data_type != data_type { - return Err(Error::invalid_input(format!( - "LogicalZoneMapIndex requires identical data types across segments, found {:?} and {:?}", - data_type, segment.data_type - ))); - } - if segment.rows_per_zone != rows_per_zone { - return Err(Error::invalid_input(format!( - "LogicalZoneMapIndex requires identical rows_per_zone across segments, found {} and {}", - rows_per_zone, segment.rows_per_zone - ))); - } - } - - Ok(Arc::new(Self { - segments, - rows_per_zone, - })) - } -} - -impl DeepSizeOf for LogicalZoneMapIndex { - fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { - self.segments.deep_size_of_children(context) - } -} - impl ZoneMapIndex { /// Evaluates whether a zone could potentially contain values matching the query /// For NaN, total order is used here @@ -678,126 +621,6 @@ impl ScalarIndex for ZoneMapIndex { } } -#[async_trait] -impl Index for LogicalZoneMapIndex { - fn as_any(&self) -> &dyn Any { - self - } - - fn as_index(self: Arc) -> Arc { - self - } - - fn as_vector_index(self: Arc) -> Result> { - Err(Error::invalid_input_source( - "LogicalZoneMapIndex is not a vector index".into(), - )) - } - - async fn prewarm(&self) -> Result<()> { - for segment in &self.segments { - segment.prewarm().await?; - } - Ok(()) - } - - fn statistics(&self) -> Result { - Ok(serde_json::json!({ - "num_segments": self.segments.len(), - "num_zones": self.segments.iter().map(|segment| segment.zones.len()).sum::(), - "rows_per_zone": self.rows_per_zone, - })) - } - - fn index_type(&self) -> IndexType { - IndexType::ZoneMap - } - - async fn calculate_included_frags(&self) -> Result { - let mut frag_ids = RoaringBitmap::new(); - for segment in &self.segments { - frag_ids |= segment.calculate_included_frags().await?; - } - Ok(frag_ids) - } -} - -#[async_trait] -impl ScalarIndex for LogicalZoneMapIndex { - async fn search( - &self, - query: &dyn AnyQuery, - metrics: &dyn MetricsCollector, - ) -> Result { - let results = try_join_all( - self.segments - .iter() - .map(|segment| segment.search(query, metrics)), - ) - .await?; - - let mut selections = Vec::with_capacity(results.len()); - let mut all_exact = true; - for result in results { - match result { - SearchResult::Exact(rows) => selections.push(rows), - SearchResult::AtMost(rows) => { - all_exact = false; - selections.push(rows); - } - SearchResult::AtLeast(_) => { - return Err(Error::not_supported( - "LogicalZoneMapIndex does not support AtLeast search results".to_string(), - )); - } - } - } - - let selection = NullableRowAddrSet::union_all(&selections); - Ok(if all_exact { - SearchResult::Exact(selection) - } else { - SearchResult::AtMost(selection) - }) - } - - fn can_remap(&self) -> bool { - false - } - - async fn remap( - &self, - _mapping: &HashMap>, - _dest_store: &dyn IndexStore, - ) -> Result { - Err(Error::invalid_input_source( - "LogicalZoneMapIndex does not support remap".into(), - )) - } - - async fn update( - &self, - _new_data: SendableRecordBatchStream, - _dest_store: &dyn IndexStore, - _old_data_filter: Option, - ) -> Result { - Err(Error::invalid_input_source( - "LogicalZoneMapIndex does not support update".into(), - )) - } - - fn update_criteria(&self) -> UpdateCriteria { - UpdateCriteria::only_new_data( - TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(), - ) - } - - fn derive_index_params(&self) -> Result { - let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; - Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(¶ms)) - } -} - fn default_rows_per_zone() -> u64 { *DEFAULT_ROWS_PER_ZONE } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 7800e7fa94c..7e95e38c347 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -81,7 +81,7 @@ use super::Dataset; use crate::dataset::row_offsets_to_row_addresses; use crate::dataset::utils::SchemaAdapter; use crate::index::DatasetIndexInternalExt; -use crate::index::scalar::scalar_index_fragment_bitmap; +use crate::index::scalar_logical::scalar_index_fragment_bitmap; use crate::index::vector::utils::{ default_distance_type_for, get_vector_dim, get_vector_type, validate_distance_type_for, }; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..f5f7241f070 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -73,6 +73,7 @@ pub mod frag_reuse; pub mod mem_wal; pub mod prefilter; pub mod scalar; +pub(crate) mod scalar_logical; pub mod vector; use self::append::merge_indices; diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index f75672b886d..5a5d88adfd1 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -18,7 +18,6 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::TryStreamExt; use itertools::Itertools; -use lance_core::cache::LanceCache; use lance_core::datatypes::Field; use lance_core::{Error, ROW_ADDR, ROW_ID, Result}; use lance_datafusion::exec::LanceExecutionOptions; @@ -39,12 +38,11 @@ use lance_index::scalar::registry::{ use lance_index::scalar::{CreatedIndex, InvertedIndexParams}; use lance_index::scalar::{ ScalarIndex, ScalarIndexParams, bitmap::BITMAP_LOOKUP_NAME, inverted::INVERT_LIST_FILE, - lance_format::LanceIndexStore, zonemap::LogicalZoneMapIndex, + lance_format::LanceIndexStore, }; use lance_index::{IndexCriteria, IndexType}; use lance_table::format::{Fragment, IndexMetadata}; use log::info; -use roaring::RoaringBitmap; use tracing::instrument; // Log an update every TRAINING_UPDATE_FREQ million rows processed @@ -401,104 +399,6 @@ pub async fn open_scalar_index( .await } -fn index_intersects_dataset(index: &IndexMetadata, dataset: &Dataset) -> bool { - index - .fragment_bitmap - .as_ref() - .is_some_and(|index_bitmap| index_bitmap.intersection_len(&dataset.fragment_bitmap) > 0) -} - -fn union_fragment_bitmaps(indices: &[IndexMetadata], index_name: &str) -> Result { - let mut combined = RoaringBitmap::new(); - for index in indices { - let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or_else(|| { - Error::invalid_input(format!( - "Scalar index '{}' segment {} is missing fragment coverage", - index_name, index.uuid - )) - })?; - combined |= fragment_bitmap.clone(); - } - Ok(combined) -} - -async fn load_named_zonemap_segments( - dataset: &Dataset, - column: &str, - index_name: &str, -) -> Result>> { - let usable_indices = dataset - .load_indices_by_name(index_name) - .await? - .into_iter() - .filter(|index| index_intersects_dataset(index, dataset)) - .collect::>(); - - if usable_indices.len() <= 1 { - return Ok(None); - } - - for index in &usable_indices { - let index_details = fetch_index_details(dataset, column, index).await?; - if !index_details.type_url.ends_with("ZoneMapIndexDetails") { - return Ok(None); - } - } - - Ok(Some(usable_indices)) -} - -pub(crate) async fn scalar_index_fragment_bitmap( - dataset: &Dataset, - column: &str, - index_name: &str, -) -> Result> { - if let Some(indices) = load_named_zonemap_segments(dataset, column, index_name).await? { - return union_fragment_bitmaps(&indices, index_name).map(Some); - } - - Ok(dataset - .load_scalar_index(IndexCriteria::default().with_name(index_name)) - .await? - .and_then(|index| index.fragment_bitmap)) -} - -pub(crate) async fn open_named_scalar_index( - dataset: &Dataset, - column: &str, - index_name: &str, - metrics: &dyn MetricsCollector, -) -> Result> { - if let Some(indices) = load_named_zonemap_segments(dataset, column, index_name).await? { - let frag_reuse_index = dataset.open_frag_reuse_index(metrics).await?; - let stores = indices - .iter() - .map(|index| { - LanceIndexStore::from_dataset_for_existing(dataset, index) - .map(|store| Arc::new(store) as Arc) - }) - .collect::>>()?; - let logical_cache = LanceCache::no_cache(); - return Ok( - LogicalZoneMapIndex::load(stores, frag_reuse_index, &logical_cache).await? - as Arc, - ); - } - - let index = dataset - .load_scalar_index(IndexCriteria::default().with_name(index_name)) - .await? - .ok_or_else(|| { - Error::internal(format!( - "Scanner created plan for index query on index {} for column {} but no usable index exists with that name", - index_name, column - )) - })?; - dataset - .open_scalar_index(column, &index.uuid.to_string(), metrics) - .await -} - pub(crate) async fn infer_scalar_index_details( dataset: &Dataset, column: &str, @@ -695,7 +595,6 @@ mod tests { use super::*; use crate::dataset::Dataset; - use crate::index::create::CreateIndexBuilder; use arrow::{ array::AsArray, datatypes::{Int32Type, UInt64Type}, @@ -705,7 +604,6 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_core::{datatypes::Field, utils::address::RowAddress}; use lance_datagen::array; - use lance_index::metrics::NoOpMetricsCollector; use lance_index::{IndexType, optimize::OptimizeOptions}; use lance_index::{ pbold::NGramIndexDetails, @@ -923,53 +821,6 @@ mod tests { assert_eq!(max_frag_id_seen, 3); } - #[tokio::test] - async fn test_open_named_scalar_index_uses_all_zonemap_segments() { - let dataset = lance_datagen::gen_batch() - .col("value", array::step::()) - .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(16)) - .await - .unwrap(); - let mut dataset = dataset; - let fragments = dataset.get_fragments(); - let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); - let mut segments = Vec::new(); - - for fragment in &fragments { - let segment = - CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) - .name("value_zonemap".to_string()) - .fragments(vec![fragment.id() as u32]) - .execute_uncommitted() - .await - .unwrap(); - segments.push(segment); - } - - dataset - .commit_existing_index_segments("value_zonemap", "value", segments) - .await - .unwrap(); - - let committed = dataset.load_indices_by_name("value_zonemap").await.unwrap(); - assert_eq!(committed.len(), fragments.len()); - - let logical = - open_named_scalar_index(&dataset, "value", "value_zonemap", &NoOpMetricsCollector) - .await - .unwrap(); - assert_eq!( - logical.calculate_included_frags().await.unwrap(), - dataset.fragment_bitmap.as_ref().clone() - ); - - let combined_bitmap = scalar_index_fragment_bitmap(&dataset, "value", "value_zonemap") - .await - .unwrap() - .unwrap(); - assert_eq!(combined_bitmap, dataset.fragment_bitmap.as_ref().clone()); - } - #[tokio::test] async fn test_initialize_scalar_index_btree() { use crate::dataset::Dataset; diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs new file mode 100644 index 00000000000..b7430d306a9 --- /dev/null +++ b/rust/lance/src/index/scalar_logical.rs @@ -0,0 +1,431 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Query-time logical views over scalar index segments. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use deepsize::{Context, DeepSizeOf}; +use futures::future::try_join_all; +use lance_core::utils::mask::NullableRowAddrSet; +use lance_core::{Error, Result}; +use lance_index::metrics::MetricsCollector; +use lance_index::scalar::{AnyQuery, CreatedIndex, ScalarIndex, SearchResult, UpdateCriteria}; +use lance_index::{Index, IndexType}; +use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; +use serde_json::json; + +use crate::dataset::Dataset; +use crate::index::scalar::fetch_index_details; +use crate::index::{DatasetIndexExt, DatasetIndexInternalExt}; + +#[derive(Debug)] +pub(crate) struct LogicalScalarIndex { + name: String, + column: String, + index_type: IndexType, + segments: Vec>, +} + +impl LogicalScalarIndex { + fn try_new(name: String, column: String, segments: Vec>) -> Result { + let Some(first) = segments.first() else { + return Err(Error::invalid_input(format!( + "LogicalScalarIndex '{}' on column '{}' must contain at least one segment", + name, column + ))); + }; + let index_type = first.index_type(); + if segments + .iter() + .any(|segment| segment.index_type() != index_type) + { + return Err(Error::invalid_input(format!( + "LogicalScalarIndex '{}' on column '{}' mixes scalar index types", + name, column + ))); + } + + Ok(Self { + name, + column, + index_type, + segments, + }) + } +} + +impl DeepSizeOf for LogicalScalarIndex { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + self.name.deep_size_of_children(context) + + self.column.deep_size_of_children(context) + + self.segments.deep_size_of_children(context) + } +} + +#[async_trait] +impl Index for LogicalScalarIndex { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_index(self: Arc) -> Arc { + self + } + + fn as_vector_index(self: Arc) -> Result> { + Err(Error::invalid_input(format!( + "LogicalScalarIndex '{}' is not a vector index", + self.name + ))) + } + + fn statistics(&self) -> Result { + Ok(json!({ + "index_name": self.name, + "column": self.column, + "index_type": self.index_type.to_string(), + "num_segments": self.segments.len(), + })) + } + + async fn prewarm(&self) -> Result<()> { + try_join_all(self.segments.iter().map(|segment| segment.prewarm())).await?; + Ok(()) + } + + fn index_type(&self) -> IndexType { + self.index_type + } + + async fn calculate_included_frags(&self) -> Result { + let fragment_sets = try_join_all( + self.segments + .iter() + .map(|segment| segment.calculate_included_frags()), + ) + .await?; + let mut combined = RoaringBitmap::new(); + for fragment_set in fragment_sets { + combined |= fragment_set; + } + Ok(combined) + } +} + +#[async_trait] +impl ScalarIndex for LogicalScalarIndex { + async fn search( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + ) -> Result { + let results = try_join_all( + self.segments + .iter() + .map(|segment| segment.search(query, metrics)), + ) + .await?; + combine_search_results(results) + } + + fn can_remap(&self) -> bool { + false + } + + async fn remap( + &self, + _mapping: &std::collections::HashMap>, + _dest_store: &dyn lance_index::scalar::IndexStore, + ) -> Result { + Err(Error::invalid_input(format!( + "LogicalScalarIndex '{}' is a query-time wrapper and does not support remap", + self.name + ))) + } + + async fn update( + &self, + _new_data: datafusion::physical_plan::SendableRecordBatchStream, + _dest_store: &dyn lance_index::scalar::IndexStore, + _old_data_filter: Option, + ) -> Result { + Err(Error::invalid_input(format!( + "LogicalScalarIndex '{}' is a query-time wrapper and does not support update", + self.name + ))) + } + + fn update_criteria(&self) -> UpdateCriteria { + self.segments[0].update_criteria() + } + + fn derive_index_params(&self) -> Result { + self.segments[0].derive_index_params() + } +} + +fn combine_search_results(results: Vec) -> Result { + let mut saw_at_most = false; + let mut saw_at_least = false; + let mut sets = Vec::with_capacity(results.len()); + + for result in results { + match result { + SearchResult::Exact(set) => sets.push(set), + SearchResult::AtMost(set) => { + saw_at_most = true; + sets.push(set); + } + SearchResult::AtLeast(set) => { + saw_at_least = true; + sets.push(set); + } + } + } + + if saw_at_most && saw_at_least { + return Err(Error::not_supported( + "Logical scalar index cannot combine mixed AtMost and AtLeast segment results", + )); + } + + let combined = NullableRowAddrSet::union_all(&sets); + Ok(if saw_at_most { + SearchResult::AtMost(combined) + } else if saw_at_least { + SearchResult::AtLeast(combined) + } else { + SearchResult::Exact(combined) + }) +} + +fn index_intersects_dataset(index: &IndexMetadata, dataset: &Dataset) -> bool { + index + .fragment_bitmap + .as_ref() + .is_some_and(|index_bitmap| index_bitmap.intersection_len(&dataset.fragment_bitmap) > 0) +} + +async fn load_named_scalar_segments( + dataset: &Dataset, + column: &str, + index_name: &str, +) -> Result> { + let usable_indices = dataset + .load_indices_by_name(index_name) + .await? + .into_iter() + .filter(|index| index_intersects_dataset(index, dataset)) + .collect::>(); + + let mut index_type_url = None::; + for index in &usable_indices { + let index_details = fetch_index_details(dataset, column, index).await?; + match &index_type_url { + Some(expected) if expected != &index_details.type_url => { + return Err(Error::invalid_input(format!( + "Scalar index '{}' on column '{}' mixes incompatible segment types", + index_name, column + ))); + } + None => index_type_url = Some(index_details.type_url.clone()), + Some(_) => {} + } + } + + Ok(usable_indices) +} + +fn union_fragment_bitmaps(indices: &[IndexMetadata], index_name: &str) -> Result { + let mut combined = RoaringBitmap::new(); + for index in indices { + let fragment_bitmap = index.fragment_bitmap.as_ref().ok_or_else(|| { + Error::invalid_input(format!( + "Scalar index '{}' segment {} is missing fragment coverage", + index_name, index.uuid + )) + })?; + combined |= fragment_bitmap.clone(); + } + Ok(combined) +} + +pub(crate) async fn scalar_index_fragment_bitmap( + dataset: &Dataset, + column: &str, + index_name: &str, +) -> Result> { + let indices = load_named_scalar_segments(dataset, column, index_name).await?; + match indices.len() { + 0 => Ok(None), + 1 => Ok(indices + .into_iter() + .next() + .and_then(|index| index.fragment_bitmap)), + _ => union_fragment_bitmaps(&indices, index_name).map(Some), + } +} + +pub(crate) async fn open_named_scalar_index( + dataset: &Dataset, + column: &str, + index_name: &str, + metrics: &dyn MetricsCollector, +) -> Result> { + let indices = load_named_scalar_segments(dataset, column, index_name).await?; + match indices.len() { + 0 => Err(Error::internal(format!( + "Scanner created plan for index query on index {} for column {} but no usable index exists with that name", + index_name, column + ))), + 1 => { + let uuid = indices[0].uuid.to_string(); + dataset.open_scalar_index(column, &uuid, metrics).await + } + _ => { + let segments = try_join_all(indices.iter().map(|index| { + let uuid = index.uuid.to_string(); + async move { dataset.open_scalar_index(column, &uuid, metrics).await } + })) + .await?; + + Ok(Arc::new(LogicalScalarIndex::try_new( + index_name.to_string(), + column.to_string(), + segments, + )?) as Arc) + } + } +} + +#[cfg(test)] +mod tests { + use std::ops::Bound; + + use arrow::datatypes::Int32Type; + use datafusion::scalar::ScalarValue; + use lance_core::utils::address::RowAddress; + use lance_datagen::array; + use lance_index::IndexType; + use lance_index::metrics::NoOpMetricsCollector; + use lance_index::scalar::{BuiltinIndexType, SargableQuery, ScalarIndexParams}; + + use crate::index::create::CreateIndexBuilder; + use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; + + use super::*; + + #[tokio::test] + async fn test_open_named_scalar_index_uses_all_zonemap_segments() { + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(16)) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + let mut segments = Vec::new(); + + for fragment in &fragments { + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + segments.push(segment); + } + + dataset + .commit_existing_index_segments("value_zonemap", "value", segments) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("value_zonemap").await.unwrap(); + assert_eq!(committed.len(), fragments.len()); + + let logical = + open_named_scalar_index(&dataset, "value", "value_zonemap", &NoOpMetricsCollector) + .await + .unwrap(); + assert_eq!( + logical.calculate_included_frags().await.unwrap(), + dataset.fragment_bitmap.as_ref().clone() + ); + + let combined_bitmap = scalar_index_fragment_bitmap(&dataset, "value", "value_zonemap") + .await + .unwrap() + .unwrap(); + assert_eq!(combined_bitmap, dataset.fragment_bitmap.as_ref().clone()); + } + + #[tokio::test] + async fn test_zonemap_segment_search_keeps_fragment_ids() { + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(16)) + .await + .unwrap(); + let mut dataset = dataset; + let target_fragment = dataset.get_fragments()[2].id() as u32; + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap_single_fragment".to_string()) + .fragments(vec![target_fragment]) + .execute_uncommitted() + .await + .unwrap(); + + dataset + .commit_existing_index_segments("value_zonemap_single_fragment", "value", vec![segment]) + .await + .unwrap(); + + let logical = open_named_scalar_index( + &dataset, + "value", + "value_zonemap_single_fragment", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + assert_eq!( + logical + .calculate_included_frags() + .await + .unwrap() + .iter() + .collect::>(), + vec![target_fragment] + ); + + let query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(0))), + Bound::Included(ScalarValue::Int32(Some(10_000))), + ); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let searched_fragments = result + .row_addrs() + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + assert!(!searched_fragments.is_empty()); + assert!( + searched_fragments + .iter() + .all(|fragment_id| *fragment_id == target_fragment) + ); + } +} diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 347c163eac7..a0cc9ae20f0 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -10,7 +10,7 @@ use crate::{ index::{ DatasetIndexExt, prefilter::DatasetPreFilter, - scalar::{open_named_scalar_index, scalar_index_fragment_bitmap}, + scalar_logical::{open_named_scalar_index, scalar_index_fragment_bitmap}, }, }; use arrow_array::{Array, RecordBatch, UInt64Array}; From 31677a3f805978ec977d9a4cb307b0f34ee04c1f Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 08:22:36 +0000 Subject: [PATCH 3/6] test: cover segmented zonemap end to end --- rust/lance/src/dataset/scanner.rs | 115 ++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 7e95e38c347..6070e7f2fdb 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -8183,6 +8183,121 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") ); } + #[tokio::test] + async fn test_like_prefix_with_segmented_zone_map() { + use lance_index::scalar::BuiltinIndexType; + + let data = gen_batch() + .col( + "name", + array::cycle_utf8_literals(&[ + "apple", + "application", + "app", + "banana", + "band", + "testns1", + "testns2", + "test", + "testing", + "zoo", + ]), + ) + .col("id", array::step::()) + .into_reader_rows(RowCount::from(150), BatchCount::from(6)); + + let write_params = WriteParams { + max_rows_per_file: 25, + max_rows_per_group: 10, + ..Default::default() + }; + + let mut dataset = Dataset::write( + data, + "memory://test_like_segmented_zonemap", + Some(write_params), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!(fragments.len() > 1, "expected multiple fragments"); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + let mut segments = Vec::with_capacity(fragments.len()); + for fragment in &fragments { + let mut builder = dataset.create_index_builder(&["name"], IndexType::Scalar, ¶ms); + builder = builder + .name("name_zonemap".to_string()) + .fragments(vec![fragment.id() as u32]); + segments.push(builder.execute_uncommitted().await.unwrap()); + } + + dataset + .commit_existing_index_segments("name_zonemap", "name", segments) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("name_zonemap").await.unwrap(); + assert_eq!(committed.len(), fragments.len()); + + let mut scanner = dataset.scan(); + scanner.filter("name LIKE 'app%'").unwrap(); + let plan = scanner.create_plan().await.unwrap(); + let plan_str = format!("{:?}", plan); + assert!( + plan_str.contains("ScalarIndexExec") && plan_str.contains("LikePrefix"), + "segmented zonemap should use LikePrefix pruning, but got: {}", + plan_str + ); + + let with_index = dataset + .scan() + .filter("name LIKE 'app%'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let without_index = dataset + .scan() + .use_scalar_index(false) + .filter("name LIKE 'app%'") + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let with_index_ids = with_index + .column_by_name("id") + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect::>(); + let without_index_ids = without_index + .column_by_name("id") + .unwrap() + .as_primitive::() + .values() + .iter() + .copied() + .collect::>(); + assert_eq!(with_index_ids, without_index_ids); + assert!(!with_index_ids.is_empty()); + + let names = with_index + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|value| value.unwrap()) + .collect::>(); + assert!(names.iter().all(|name| name.starts_with("app"))); + } + #[tokio::test] async fn test_like_prefix_correctness_with_zone_map() { use lance_index::scalar::BuiltinIndexType; From 52a0398ebb8fa5377af719957961cb37be7bb4c4 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 16:57:55 +0000 Subject: [PATCH 4/6] ci: fix zonemap PR clippy and compat checks --- python/python/tests/compat/venv_manager.py | 52 ++++++++++++++++++++-- rust/lance/src/index/scalar_logical.rs | 6 +-- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/python/python/tests/compat/venv_manager.py b/python/python/tests/compat/venv_manager.py index a5e52bbc3fd..3f16b6812a4 100644 --- a/python/python/tests/compat/venv_manager.py +++ b/python/python/tests/compat/venv_manager.py @@ -16,6 +16,28 @@ from pathlib import Path from typing import Any, Optional +from packaging.version import Version + + +LEGACY_NAMESPACE_CUTOFF = Version("2.1.0") +LEGACY_NAMESPACE_VERSION = "0.5.2" + + +def uses_legacy_namespace(version: str) -> bool: + return Version(version) < LEGACY_NAMESPACE_CUTOFF + + +def compat_packages(version: str) -> list[str]: + packages = [f"pylance=={version}", "pytest"] + if uses_legacy_namespace(version): + packages.extend( + [ + f"lance-namespace=={LEGACY_NAMESPACE_VERSION}", + f"lance-namespace-urllib3-client=={LEGACY_NAMESPACE_VERSION}", + ] + ) + return packages + class VenvExecutor: """Manages a virtual environment with a specific Lance version.""" @@ -68,7 +90,29 @@ def _validate_venv(self) -> bool: for line in result.stdout.splitlines(): if line.startswith("Version:"): installed_version = line.split(":", 1)[1].strip() - return installed_version == self.version + if installed_version != self.version: + return False + break + else: + return False + + if uses_legacy_namespace(self.version): + namespace_result = subprocess.run( + [str(self.python_path), "-m", "pip", "show", "lance-namespace"], + capture_output=True, + text=True, + timeout=5, + ) + if namespace_result.returncode != 0: + return False + + for line in namespace_result.stdout.splitlines(): + if line.startswith("Version:"): + namespace_version = line.split(":", 1)[1].strip() + return namespace_version == LEGACY_NAMESPACE_VERSION + return False + + return True except Exception: return False @@ -85,6 +129,9 @@ def create(self): self._created = True return + if self.persistent and self.venv_path.exists(): + self.cleanup() + # Create virtual environment subprocess.run( [sys.executable, "-m", "venv", str(self.venv_path)], @@ -105,8 +152,7 @@ def create(self): "https://pypi.fury.io/lance-format/", "--extra-index-url", "https://pypi.fury.io/lancedb/", - f"pylance=={self.version}", - "pytest", + *compat_packages(self.version), ], check=True, capture_output=True, diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index b7430d306a9..b4b02b45e27 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -23,7 +23,7 @@ use crate::index::scalar::fetch_index_details; use crate::index::{DatasetIndexExt, DatasetIndexInternalExt}; #[derive(Debug)] -pub(crate) struct LogicalScalarIndex { +pub struct LogicalScalarIndex { name: String, column: String, index_type: IndexType, @@ -254,7 +254,7 @@ fn union_fragment_bitmaps(indices: &[IndexMetadata], index_name: &str) -> Result Ok(combined) } -pub(crate) async fn scalar_index_fragment_bitmap( +pub async fn scalar_index_fragment_bitmap( dataset: &Dataset, column: &str, index_name: &str, @@ -270,7 +270,7 @@ pub(crate) async fn scalar_index_fragment_bitmap( } } -pub(crate) async fn open_named_scalar_index( +pub async fn open_named_scalar_index( dataset: &Dataset, column: &str, index_name: &str, From b3b292463ce0eb0ee95f3612a707aa23763abc6b Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 17:39:24 +0000 Subject: [PATCH 5/6] fix: validate scalar index segments at commit time --- rust/lance/src/index.rs | 78 ++++++++++++++++++++++++++ rust/lance/src/index/scalar_logical.rs | 26 +++++++-- 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index f5f7241f070..de1d65d8eb5 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -127,6 +127,30 @@ fn validate_segment_metadata(index_name: &str, segments: &[IndexMetadata]) -> Re Ok(()) } +fn validate_segment_index_details(index_name: &str, segments: &[IndexMetadata]) -> Result<()> { + let mut type_url = None::<&str>; + for segment in segments { + let segment_type_url = segment.index_details.as_ref().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing index details", + segment.uuid + )) + })?; + match type_url { + Some(expected) if expected != segment_type_url.type_url => { + return Err(Error::invalid_input(format!( + "CreateIndex: segment set for index '{}' mixes incompatible index detail types", + index_name + ))); + } + None => type_url = Some(segment_type_url.type_url.as_str()), + Some(_) => {} + } + } + + Ok(()) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub struct ScalarIndexCacheKey<'a> { @@ -968,6 +992,7 @@ impl DatasetIndexExt for Dataset { }; validate_segment_metadata(index_name, &segments)?; + validate_segment_index_details(index_name, &segments)?; let mut new_indices = Vec::with_capacity(segments.len()); for mut segment in segments { @@ -2254,6 +2279,7 @@ mod tests { use lance_core::utils::tempfile::TempStrDir; use lance_datagen::gen_batch; use lance_datagen::{BatchCount, ByteCount, Dimension, RowCount, array}; + use lance_index::pbold::BTreeIndexDetails; use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME; use lance_index::scalar::{ BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams, @@ -6029,6 +6055,58 @@ mod tests { assert!(err.to_string().contains("overlapping fragment coverage")); } + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_mixed_index_detail_types() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(20), BatchCount::from(2)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let field_id = dataset.schema().field("vector").unwrap().id; + let seg0 = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, + Uuid::new_v4(), + [0_u32], + b"seg0", + ) + .await; + let seg1 = IndexMetadata { + uuid: Uuid::new_v4(), + name: "vector_idx".to_string(), + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(std::iter::once(1_u32).collect()), + index_details: Some(Arc::new( + prost_types::Any::from_msg(&BTreeIndexDetails::default()).unwrap(), + )), + index_version: IndexType::BTree.version(), + created_at: Some(chrono::Utc::now()), + base_id: None, + files: seg0.files.clone(), + }; + + let err = dataset + .commit_existing_index_segments("vector_idx", "vector", vec![seg0, seg1]) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("mixes incompatible index detail types") + ); + } + #[tokio::test] async fn test_resolve_index_column_error_cases() { use lance_datagen::{BatchCount, RowCount, array}; diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index b4b02b45e27..99cdd6fd0b8 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -142,7 +142,7 @@ impl ScalarIndex for LogicalScalarIndex { _dest_store: &dyn lance_index::scalar::IndexStore, ) -> Result { Err(Error::invalid_input(format!( - "LogicalScalarIndex '{}' is a query-time wrapper and does not support remap", + "LogicalScalarIndex '{}' is a query-time wrapper and does not support remap; rebuild the index to consolidate segments before remapping", self.name ))) } @@ -154,7 +154,7 @@ impl ScalarIndex for LogicalScalarIndex { _old_data_filter: Option, ) -> Result { Err(Error::invalid_input(format!( - "LogicalScalarIndex '{}' is a query-time wrapper and does not support update", + "LogicalScalarIndex '{}' is a query-time wrapper and does not support update; rebuild the index to consolidate segments before updating", self.name ))) } @@ -222,17 +222,33 @@ async fn load_named_scalar_segments( .filter(|index| index_intersects_dataset(index, dataset)) .collect::>(); + let needs_fallback_fetch = usable_indices + .iter() + .any(|index| index.index_details.is_none()); + let mut index_type_url = None::; for index in &usable_indices { - let index_details = fetch_index_details(dataset, column, index).await?; + let segment_type_url = if needs_fallback_fetch { + fetch_index_details(dataset, column, index) + .await? + .type_url + .clone() + } else { + index + .index_details + .as_ref() + .expect("checked above") + .type_url + .clone() + }; match &index_type_url { - Some(expected) if expected != &index_details.type_url => { + Some(expected) if expected != &segment_type_url => { return Err(Error::invalid_input(format!( "Scalar index '{}' on column '{}' mixes incompatible segment types", index_name, column ))); } - None => index_type_url = Some(index_details.type_url.clone()), + None => index_type_url = Some(segment_type_url), Some(_) => {} } } From 1ede7a1dfddfca6e8cf3224d332bbe70ebdc0252 Mon Sep 17 00:00:00 2001 From: beinan Date: Wed, 22 Apr 2026 18:25:52 +0000 Subject: [PATCH 6/6] chore: drop unrelated compat CI change --- python/python/tests/compat/venv_manager.py | 52 ++-------------------- 1 file changed, 3 insertions(+), 49 deletions(-) diff --git a/python/python/tests/compat/venv_manager.py b/python/python/tests/compat/venv_manager.py index 3f16b6812a4..a5e52bbc3fd 100644 --- a/python/python/tests/compat/venv_manager.py +++ b/python/python/tests/compat/venv_manager.py @@ -16,28 +16,6 @@ from pathlib import Path from typing import Any, Optional -from packaging.version import Version - - -LEGACY_NAMESPACE_CUTOFF = Version("2.1.0") -LEGACY_NAMESPACE_VERSION = "0.5.2" - - -def uses_legacy_namespace(version: str) -> bool: - return Version(version) < LEGACY_NAMESPACE_CUTOFF - - -def compat_packages(version: str) -> list[str]: - packages = [f"pylance=={version}", "pytest"] - if uses_legacy_namespace(version): - packages.extend( - [ - f"lance-namespace=={LEGACY_NAMESPACE_VERSION}", - f"lance-namespace-urllib3-client=={LEGACY_NAMESPACE_VERSION}", - ] - ) - return packages - class VenvExecutor: """Manages a virtual environment with a specific Lance version.""" @@ -90,29 +68,7 @@ def _validate_venv(self) -> bool: for line in result.stdout.splitlines(): if line.startswith("Version:"): installed_version = line.split(":", 1)[1].strip() - if installed_version != self.version: - return False - break - else: - return False - - if uses_legacy_namespace(self.version): - namespace_result = subprocess.run( - [str(self.python_path), "-m", "pip", "show", "lance-namespace"], - capture_output=True, - text=True, - timeout=5, - ) - if namespace_result.returncode != 0: - return False - - for line in namespace_result.stdout.splitlines(): - if line.startswith("Version:"): - namespace_version = line.split(":", 1)[1].strip() - return namespace_version == LEGACY_NAMESPACE_VERSION - return False - - return True + return installed_version == self.version except Exception: return False @@ -129,9 +85,6 @@ def create(self): self._created = True return - if self.persistent and self.venv_path.exists(): - self.cleanup() - # Create virtual environment subprocess.run( [sys.executable, "-m", "venv", str(self.venv_path)], @@ -152,7 +105,8 @@ def create(self): "https://pypi.fury.io/lance-format/", "--extra-index-url", "https://pypi.fury.io/lancedb/", - *compat_packages(self.version), + f"pylance=={self.version}", + "pytest", ], check=True, capture_output=True,