Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,15 +918,9 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin {
data: SendableRecordBatchStream,
index_store: &dyn IndexStore,
request: Box<dyn TrainingRequest>,
fragment_ids: Option<Vec<u32>>,
_fragment_ids: Option<Vec<u32>>,
_progress: Arc<dyn crate::progress::IndexBuildProgress>,
) -> Result<CreatedIndex> {
if fragment_ids.is_some() {
return Err(Error::invalid_input_source(
"ZoneMap index does not support fragment training".into(),
));
}

let request = (request as Box<dyn std::any::Any>)
.downcast::<ZoneMapIndexTrainingRequest>()
.map_err(|_| {
Expand Down
138 changes: 128 additions & 10 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use super::Dataset;
use crate::dataset::row_offsets_to_row_addresses;
use crate::dataset::utils::SchemaAdapter;
use crate::index::DatasetIndexInternalExt;
use crate::index::scalar_logical::scalar_index_fragment_bitmap;
use crate::index::vector::utils::{
default_distance_type_for, get_vector_dim, get_vector_type, validate_distance_type_for,
};
Expand Down Expand Up @@ -3810,16 +3811,18 @@ impl Scanner {
ScalarIndexExpr::Or(lhs, rhs) => Ok(self.fragments_covered_by_index_query(lhs).await?
& self.fragments_covered_by_index_query(rhs).await?),
ScalarIndexExpr::Not(expr) => self.fragments_covered_by_index_query(expr).await,
ScalarIndexExpr::Query(search) => {
let idx = self
.dataset
.load_scalar_index(IndexCriteria::default().with_name(&search.index_name))
.await?
.expect("Index not found even though it must have been found earlier");
Ok(idx
.fragment_bitmap
.expect("scalar indices should always have a fragment bitmap"))
}
ScalarIndexExpr::Query(search) => scalar_index_fragment_bitmap(
self.dataset.as_ref(),
&search.column,
&search.index_name,
)
.await?
.ok_or_else(|| {
crate::Error::internal(format!(
"Index not found even though it must have been found earlier: {}",
search.index_name
))
}),
}
}

Expand Down Expand Up @@ -8180,6 +8183,121 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
);
}

// Verifies that a zonemap index committed as one uncommitted segment per
// fragment (via `execute_uncommitted` + `commit_existing_index_segments`)
// still participates in LIKE-prefix pruning and produces the same results
// as an unindexed scan.
#[tokio::test]
async fn test_like_prefix_with_segmented_zone_map() {
    use lance_index::scalar::BuiltinIndexType;

    // 900 rows (150 x 6 batches) cycling through a fixed set of strings so
    // several values share the "app"/"test" prefixes probed below.
    let data = gen_batch()
        .col(
            "name",
            array::cycle_utf8_literals(&[
                "apple",
                "application",
                "app",
                "banana",
                "band",
                "testns1",
                "testns2",
                "test",
                "testing",
                "zoo",
            ]),
        )
        .col("id", array::step::<Int32Type>())
        .into_reader_rows(RowCount::from(150), BatchCount::from(6));

    // Small files force many fragments so per-fragment index segments are
    // actually exercised.
    let write_params = WriteParams {
        max_rows_per_file: 25,
        max_rows_per_group: 10,
        ..Default::default()
    };

    let mut dataset = Dataset::write(
        data,
        "memory://test_like_segmented_zonemap",
        Some(write_params),
    )
    .await
    .unwrap();

    let fragments = dataset.get_fragments();
    assert!(fragments.len() > 1, "expected multiple fragments");

    // Build one uncommitted zonemap segment per fragment, all under the same
    // index name, then commit them together as a single segmented index.
    let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap);
    let mut segments = Vec::with_capacity(fragments.len());
    for fragment in &fragments {
        let mut builder = dataset.create_index_builder(&["name"], IndexType::Scalar, &params);
        builder = builder
            .name("name_zonemap".to_string())
            .fragments(vec![fragment.id() as u32]);
        segments.push(builder.execute_uncommitted().await.unwrap());
    }

    dataset
        .commit_existing_index_segments("name_zonemap", "name", segments)
        .await
        .unwrap();

    // Each fragment should have produced exactly one committed segment.
    let committed = dataset.load_indices_by_name("name_zonemap").await.unwrap();
    assert_eq!(committed.len(), fragments.len());

    // The plan must route the LIKE 'app%' predicate through the scalar index
    // with LikePrefix-based zone pruning.
    let mut scanner = dataset.scan();
    scanner.filter("name LIKE 'app%'").unwrap();
    let plan = scanner.create_plan().await.unwrap();
    let plan_str = format!("{:?}", plan);
    assert!(
        plan_str.contains("ScalarIndexExec") && plan_str.contains("LikePrefix"),
        "segmented zonemap should use LikePrefix pruning, but got: {}",
        plan_str
    );

    // Run the same filter with and without the scalar index so the indexed
    // path can be checked against the brute-force scan.
    let with_index = dataset
        .scan()
        .filter("name LIKE 'app%'")
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    let without_index = dataset
        .scan()
        .use_scalar_index(false)
        .filter("name LIKE 'app%'")
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();

    // Compare row sets by id (BTreeSet: output order may differ between the
    // two plans).
    let with_index_ids = with_index
        .column_by_name("id")
        .unwrap()
        .as_primitive::<Int32Type>()
        .values()
        .iter()
        .copied()
        .collect::<BTreeSet<_>>();
    let without_index_ids = without_index
        .column_by_name("id")
        .unwrap()
        .as_primitive::<Int32Type>()
        .values()
        .iter()
        .copied()
        .collect::<BTreeSet<_>>();
    assert_eq!(with_index_ids, without_index_ids);
    assert!(!with_index_ids.is_empty());

    // Sanity check: every returned name really matches the prefix predicate.
    let names = with_index
        .column_by_name("name")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .iter()
        .map(|value| value.unwrap())
        .collect::<Vec<_>>();
    assert!(names.iter().all(|name| name.starts_with("app")));
}

#[tokio::test]
async fn test_like_prefix_correctness_with_zone_map() {
use lance_index::scalar::BuiltinIndexType;
Expand Down
79 changes: 79 additions & 0 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub mod frag_reuse;
pub mod mem_wal;
pub mod prefilter;
pub mod scalar;
pub(crate) mod scalar_logical;
pub mod vector;

use self::append::merge_indices;
Expand Down Expand Up @@ -126,6 +127,30 @@ fn validate_segment_metadata(index_name: &str, segments: &[IndexMetadata]) -> Re
Ok(())
}

/// Checks that every segment carries `index_details` and that all segments
/// agree on the detail `type_url` (i.e. they are all the same kind of index).
///
/// Returns an invalid-input error for the first segment missing its details,
/// or as soon as a segment's detail type differs from the first one seen.
fn validate_segment_index_details(index_name: &str, segments: &[IndexMetadata]) -> Result<()> {
    // type_url of the first segment; later segments must match it exactly.
    let mut first_type_url: Option<&str> = None;

    for segment in segments {
        let details = match segment.index_details.as_ref() {
            Some(details) => details,
            None => {
                return Err(Error::invalid_input(format!(
                    "CreateIndex: segment {} is missing index details",
                    segment.uuid
                )));
            }
        };
        let current = details.type_url.as_str();
        match first_type_url {
            // First segment establishes the expected type.
            None => first_type_url = Some(current),
            // Subsequent segments that match are fine.
            Some(expected) if expected == current => {}
            // Any mismatch rejects the whole segment set.
            Some(_) => {
                return Err(Error::invalid_input(format!(
                    "CreateIndex: segment set for index '{}' mixes incompatible index detail types",
                    index_name
                )));
            }
        }
    }

    Ok(())
}

// Cache keys for different index types
#[derive(Debug, Clone)]
pub struct ScalarIndexCacheKey<'a> {
Expand Down Expand Up @@ -967,6 +992,7 @@ impl DatasetIndexExt for Dataset {
};

validate_segment_metadata(index_name, &segments)?;
validate_segment_index_details(index_name, &segments)?;

let mut new_indices = Vec::with_capacity(segments.len());
for mut segment in segments {
Expand Down Expand Up @@ -2253,6 +2279,7 @@ mod tests {
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::gen_batch;
use lance_datagen::{BatchCount, ByteCount, Dimension, RowCount, array};
use lance_index::pbold::BTreeIndexDetails;
use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME;
use lance_index::scalar::{
BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams,
Expand Down Expand Up @@ -6028,6 +6055,58 @@ mod tests {
assert!(err.to_string().contains("overlapping fragment coverage"));
}

// Verifies that `commit_existing_index_segments` rejects a segment set whose
// members declare different `index_details` type URLs (here: a vector-index
// segment mixed with a BTree-details segment).
#[tokio::test]
async fn test_commit_existing_index_segments_rejects_mixed_index_detail_types() {
    use lance_datagen::{BatchCount, RowCount, array};

    let test_dir = tempfile::tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // Minimal dataset: sequential ids plus an 8-dim random float vector
    // column to hang the index metadata off of.
    let reader = lance_datagen::gen_batch()
        .col("id", array::step::<arrow_array::types::Int32Type>())
        .col(
            "vector",
            array::rand_vec::<arrow_array::types::Float32Type>(8.into()),
        )
        .into_reader_rows(RowCount::from(20), BatchCount::from(2));

    let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();

    let field_id = dataset.schema().field("vector").unwrap().id;
    // First segment: built by the sibling test helper — presumably carries
    // vector-index details for fragment 0 (see write_vector_segment_metadata).
    let seg0 = write_vector_segment_metadata(
        &dataset,
        "vector_idx",
        field_id,
        Uuid::new_v4(),
        [0_u32],
        b"seg0",
    )
    .await;
    // Second segment: hand-built metadata for fragment 1 that deliberately
    // uses BTreeIndexDetails, giving it a different detail type_url than seg0.
    let seg1 = IndexMetadata {
        uuid: Uuid::new_v4(),
        name: "vector_idx".to_string(),
        fields: vec![field_id],
        dataset_version: dataset.manifest.version,
        fragment_bitmap: Some(std::iter::once(1_u32).collect()),
        index_details: Some(Arc::new(
            prost_types::Any::from_msg(&BTreeIndexDetails::default()).unwrap(),
        )),
        index_version: IndexType::BTree.version(),
        created_at: Some(chrono::Utc::now()),
        base_id: None,
        files: seg0.files.clone(),
    };

    // Committing the mixed pair must fail with the mixed-detail-types error.
    let err = dataset
        .commit_existing_index_segments("vector_idx", "vector", vec![seg0, seg1])
        .await
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("mixes incompatible index detail types")
    );
}

#[tokio::test]
async fn test_resolve_index_column_error_cases() {
use lance_datagen::{BatchCount, RowCount, array};
Expand Down
6 changes: 5 additions & 1 deletion rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ mod tests {
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};

use super::*;
use crate::dataset::Dataset;
use arrow::{
array::AsArray,
datatypes::{Int32Type, UInt64Type},
Expand All @@ -604,7 +605,10 @@ mod tests {
use lance_core::{datatypes::Field, utils::address::RowAddress};
use lance_datagen::array;
use lance_index::{IndexType, optimize::OptimizeOptions};
use lance_index::{pbold::NGramIndexDetails, scalar::BuiltinIndexType};
use lance_index::{
pbold::NGramIndexDetails,
scalar::{BuiltinIndexType, ScalarIndexParams},
};
use lance_table::format::pb::VectorIndexDetails;

fn make_index_metadata(
Expand Down
Loading
Loading