diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index f3dbb0ffd0a..dd672da2928 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -205,5 +205,9 @@ harness = false name = "mem_wal_read" harness = false +[[bench]] +name = "index_stats" +harness = false + [lints] workspace = true diff --git a/rust/lance/benches/index_stats.rs b/rust/lance/benches/index_stats.rs new file mode 100644 index 00000000000..7008ccf57e7 --- /dev/null +++ b/rust/lance/benches/index_stats.rs @@ -0,0 +1,224 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark for `Dataset::index_statistics`, comparing the current +//! implementation against the pre-P1/P2 baseline. +//! +//! Two optimizations are measured: +//! +//! * **P1** — derive the total row count from the manifest instead of calling +//! `Dataset::count_rows(None)`, which fans out per-fragment and can read +//! fragment data files on datasets written by older Lance versions. +//! * **P2** — cache `index_statistics` JSON keyed by manifest version in the +//! dataset's `DSIndexCache`, so repeat calls at the same version are served +//! from memory. +//! +//! The benchmark runs four functions on the same fixture in the same process: +//! +//! * `index_stats/legacy_cold` — pre-P1/P2 behavior. Always calls +//! `count_rows(None)`, never hits the cache. Exercised via the +//! `#[doc(hidden)]` `bench_legacy_index_statistics` entry point. +//! * `index_stats/cold` — current behavior with a cold dataset cache +//! (session reopened every iteration). Isolates the P1 win against +//! `legacy_cold`. +//! * `index_stats/cached` — current behavior after the first call. Isolates +//! the P2 win against `cold`. +//! * `index_stats/count_rows_baseline` — wall time of the `count_rows(None)` +//! fan-out alone, for context on what P1 removes from the hot path. 
+ +use std::sync::{Arc, OnceLock}; + +use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator}; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use lance::Dataset; +use lance::dataset::WriteParams; +use lance::dataset::builder::DatasetBuilder; +use lance::index::DatasetIndexExt; +use lance::index::bench_legacy_index_statistics; +use lance_core::utils::tempfile::TempStrDir; +use lance_index::IndexType; +use lance_index::scalar::ScalarIndexParams; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +/// Fixture shape. 10 M rows split into 10 000 fragments of 1 000 rows each — +/// large enough that `count_rows`'s per-fragment fan-out dominates, at a +/// fragment count closer to what's seen in production on tables that haven't +/// been compacted recently. +const NUM_FRAGMENTS: usize = 10_000; +const ROWS_PER_FRAGMENT: usize = 1_000; +const TOTAL_ROWS: usize = NUM_FRAGMENTS * ROWS_PER_FRAGMENT; +const INDEX_NAME: &str = "id_idx"; + +struct Fixture { + // Kept alive for the lifetime of the benchmark so the on-disk data stays valid. + _tempdir: TempStrDir, + uri: String, +} + +async fn build_fixture() -> Fixture { + let tempdir = TempStrDir::default(); + let uri = tempdir.as_str().to_string(); + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + // One `Dataset::write` call produces multiple fragments when the input + // exceeds `max_rows_per_file` — vastly cheaper than committing per fragment. + // Feed the data in batches of `ROWS_PER_FRAGMENT` so memory stays bounded. 
+    let batches: Vec<RecordBatch> = (0..NUM_FRAGMENTS)
+        .map(|f| {
+            let start = (f * ROWS_PER_FRAGMENT) as i32;
+            let values: Vec<i32> = (start..start + ROWS_PER_FRAGMENT as i32).collect();
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap()
+        })
+        .collect();
+    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
+    let params = WriteParams {
+        max_rows_per_file: ROWS_PER_FRAGMENT,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &uri, Some(params)).await.unwrap();
+    assert_eq!(dataset.fragments().len(), NUM_FRAGMENTS);
+    assert_eq!(dataset.count_rows(None).await.unwrap(), TOTAL_ROWS);
+
+    dataset
+        .create_index(
+            &["id"],
+            IndexType::BTree,
+            Some(INDEX_NAME.into()),
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    Fixture {
+        _tempdir: tempdir,
+        uri,
+    }
+}
+
+/// Reopen the dataset so we get a fresh session (and therefore a fresh
+/// `DSIndexCache`). This lets each iteration measure the cold path.
+async fn open_cold(uri: &str) -> Dataset {
+    DatasetBuilder::from_uri(uri).load().await.unwrap()
+}
+
+/// Process-wide runtime + fixture. Built lazily on first access so we pay the
+/// 10 M-row dataset write once, not four times.
+struct BenchEnv {
+    rt: tokio::runtime::Runtime,
+    fixture: Fixture,
+}
+
+fn env() -> &'static BenchEnv {
+    static ENV: OnceLock<BenchEnv> = OnceLock::new();
+    ENV.get_or_init(|| {
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        let fixture = rt.block_on(build_fixture());
+        // Parity check against the fresh fixture: legacy and current paths must
+        // agree on every counter in the JSON payload before measurement begins.
+ rt.block_on(async { + let ds = open_cold(&fixture.uri).await; + let legacy: serde_json::Value = serde_json::from_str( + &bench_legacy_index_statistics(&ds, INDEX_NAME).await.unwrap(), + ) + .unwrap(); + let current: serde_json::Value = + serde_json::from_str(&ds.index_statistics(INDEX_NAME).await.unwrap()).unwrap(); + for key in [ + "num_indexed_rows", + "num_unindexed_rows", + "num_indexed_fragments", + "num_unindexed_fragments", + "num_indices", + ] { + assert_eq!( + legacy[key], current[key], + "legacy and current paths disagree on {key}", + ); + } + }); + BenchEnv { rt, fixture } + }) +} + +fn bench_count_rows(c: &mut Criterion) { + let env = env(); + let dataset = env.rt.block_on(open_cold(&env.fixture.uri)); + + c.bench_function("index_stats/count_rows_baseline", |b| { + b.iter(|| { + env.rt.block_on(async { + let _ = dataset.count_rows(None).await.unwrap(); + }) + }); + }); +} + +fn bench_legacy_cold(c: &mut Criterion) { + let env = env(); + + // Pre-P1/P2 behavior: no cache, `count_rows(None)` every call. + c.bench_function("index_stats/legacy_cold", |b| { + b.iter(|| { + env.rt.block_on(async { + let ds = open_cold(&env.fixture.uri).await; + let _ = bench_legacy_index_statistics(&ds, INDEX_NAME) + .await + .unwrap(); + }) + }); + }); +} + +fn bench_cold(c: &mut Criterion) { + let env = env(); + + // Current behavior, cold cache. Difference vs `legacy_cold` is the P1 win. + c.bench_function("index_stats/cold", |b| { + b.iter(|| { + env.rt.block_on(async { + let ds = open_cold(&env.fixture.uri).await; + let _ = ds.index_statistics(INDEX_NAME).await.unwrap(); + }) + }); + }); +} + +fn bench_cached(c: &mut Criterion) { + let env = env(); + let dataset = env.rt.block_on(open_cold(&env.fixture.uri)); + + // Prime the cache. + env.rt.block_on(async { + let _ = dataset.index_statistics(INDEX_NAME).await.unwrap(); + }); + + // Current behavior, warm cache. Difference vs `cold` is the P2 win. 
+ c.bench_function("index_stats/cached", |b| { + b.iter(|| { + env.rt.block_on(async { + let _ = dataset.index_statistics(INDEX_NAME).await.unwrap(); + }) + }); + }); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name = benches; + config = Criterion::default().significance_level(0.1).sample_size(30) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_count_rows, bench_legacy_cold, bench_cold, bench_cached +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name = benches; + config = Criterion::default().significance_level(0.1).sample_size(30); + targets = bench_count_rows, bench_legacy_cold, bench_cold, bench_cached +); + +criterion_main!(benches); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 40788265172..64aab64d3e4 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -87,7 +87,7 @@ use crate::index::mem_wal::open_mem_wal_index; pub use crate::index::prefilter::{FilterLoader, PreFilter}; use crate::index::scalar::{IndexDetails, fetch_index_details, load_training_data}; pub use crate::index::vector::{LogicalIvfView, LogicalVectorIndex}; -use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey}; +use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey, IndexStatisticsKey}; use crate::{Error, Result, dataset::Dataset}; pub use create::CreateIndexBuilder; pub use lance_index::IndexDescription; @@ -1131,17 +1131,32 @@ impl DatasetIndexExt for Dataset { return Err(Error::index_not_found(format!("name={}", index_name))); } - if index_name == FRAG_REUSE_INDEX_NAME { - return index_statistics_frag_reuse(self).boxed().await; + // The result is a pure function of (dataset URI, manifest version, index name): + // the cache is already prefixed by dataset URI, and any mutation that can change + // the answer (append, delete, compaction, index create/optimize/drop) bumps + // `manifest.version`, so we get automatic invalidation. 
+        let cache_key = IndexStatisticsKey {
+            version: self.manifest.version,
+            index_name,
+        };
+        if let Some(cached) = self.index_cache.get_with_key(&cache_key).await {
+            return Ok(cached.as_ref().clone());
         }
-        if index_name == MEM_WAL_INDEX_NAME {
-            return index_statistics_mem_wal(self).boxed().await;
-        }
+        let stats = if index_name == FRAG_REUSE_INDEX_NAME {
+            index_statistics_frag_reuse(self).boxed().await?
+        } else if index_name == MEM_WAL_INDEX_NAME {
+            index_statistics_mem_wal(self).boxed().await?
+        } else {
+            index_statistics_scalar(self, index_name, metadatas)
+                .boxed()
+                .await?
+        };
 
-        index_statistics_scalar(self, index_name, metadatas)
-            .boxed()
-            .await
+        self.index_cache
+            .insert_with_key(&cache_key, Arc::new(stats.clone()))
+            .await;
+        Ok(stats)
     }
 
     async fn read_index_partition(
@@ -1183,6 +1198,82 @@ fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec<Fragment>]) ->
     Ok(rows_per_delta)
 }
 
+/// Sum row counts across every fragment using only the manifest.
+///
+/// Returns `Some(total)` when every fragment has `physical_rows` and any deletion
+/// count resolved in-memory. Returns `None` if any fragment is missing this
+/// metadata — callers should fall back to [`Dataset::count_rows`], which can
+/// read fragment data files to recover the count for legacy datasets written
+/// before these fields were always populated.
+fn manifest_total_rows(ds: &Dataset) -> Option<usize> {
+    let mut total = 0usize;
+    for frag in ds.fragments().iter() {
+        total = total.checked_add(frag.num_rows()?)?;
+    }
+    Some(total)
+}
+
+/// Benchmark-only entry point that reproduces the pre-P1/P2 behavior of
+/// [`DatasetIndexExt::index_statistics`]: skips the `DSIndexCache` lookup and
+/// computes the total row count via [`Dataset::count_rows`] rather than the
+/// in-memory manifest sum. Exists solely so `benches/index_stats.rs` can
+/// measure the wall-time delta against the current implementation within a
+/// single process. Not part of the stable API.
+#[doc(hidden)]
+pub async fn bench_legacy_index_statistics(ds: &Dataset, index_name: &str) -> Result<String> {
+    let metadatas = ds.load_indices_by_name(index_name).await?;
+    if metadatas.is_empty() {
+        return Err(Error::index_not_found(format!("name={}", index_name)));
+    }
+
+    if index_name == FRAG_REUSE_INDEX_NAME {
+        return index_statistics_frag_reuse(ds).boxed().await;
+    }
+    if index_name == MEM_WAL_INDEX_NAME {
+        return index_statistics_mem_wal(ds).boxed().await;
+    }
+
+    let field_id = metadatas[0].fields[0];
+    let field_path = ds.schema().field_path(field_id)?;
+
+    let (indices_stats, index_uri, num_indices, updated_at) =
+        collect_regular_indices_statistics(ds, metadatas, &field_path).await?;
+
+    let index_type_hint = indices_stats
+        .first()
+        .and_then(|stats| stats.get("index_type"))
+        .and_then(|v| v.as_str());
+    let index_type = legacy_type_name(&index_uri, index_type_hint);
+
+    let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?;
+    let num_indexed_rows_per_delta = sum_indexed_rows_per_delta(&indexed_fragments_per_delta)?;
+    let num_indexed_fragments = unique_indexed_fragment_count(&indexed_fragments_per_delta)
+        .ok_or_else(|| Error::internal("overlap in indexed fragments".to_string()))?;
+    let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments;
+    let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum();
+    drop(indexed_fragments_per_delta);
+
+    // Pre-P1: unconditional count_rows fan-out across every fragment.
+    let total_rows = ds.count_rows(None).await?;
+    let num_unindexed_rows = total_rows - num_indexed_rows;
+
+    let stats = json!({
+        "index_type": index_type,
+        "name": index_name,
+        "num_indices": num_indices,
+        "num_segments": num_indices,
+        "indices": indices_stats.clone(),
+        "segments": indices_stats,
+        "num_indexed_fragments": num_indexed_fragments,
+        "num_indexed_rows": num_indexed_rows,
+        "num_unindexed_fragments": num_unindexed_fragments,
+        "num_unindexed_rows": num_unindexed_rows,
+        "num_indexed_rows_per_delta": num_indexed_rows_per_delta,
+        "updated_at_timestamp_ms": updated_at,
+    });
+    serialize_index_statistics(&stats)
+}
+
 fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec<Fragment>]) -> Option<usize> {
     let mut fragment_ids = HashSet::new();
     for frags in indexed_fragments_per_delta {
@@ -1358,8 +1449,15 @@ async fn gather_fragment_statistics(
     let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum();
     drop(indexed_fragments_per_delta);
 
-    let total_rows = ds.count_rows(None).await?;
-    let num_unindexed_rows = total_rows - num_indexed_rows;
+    // Prefer the in-memory sum of per-fragment row counts. `count_rows(None)`
+    // falls back to opening the first data file of each fragment when the
+    // manifest lacks `physical_rows` / `writer_version`, which dominates
+    // `index_stats` latency on tables with many fragments or slow storage.
+    let total_rows = match manifest_total_rows(ds) {
+        Some(n) => n,
+        None => ds.count_rows(None).await?,
+    };
+    let num_unindexed_rows = total_rows.saturating_sub(num_indexed_rows);
 
     Ok(Some((
         num_indexed_rows_per_delta,
@@ -2939,6 +3037,276 @@ mod tests {
         assert_eq!(stats["num_indexed_rows"], 512);
     }
 
+    /// P1: `gather_fragment_statistics` should derive the total row count from
+    /// the manifest when every fragment carries `physical_rows`, matching the
+    /// answer that `Dataset::count_rows(None)` would produce.
+    #[tokio::test]
+    async fn test_index_statistics_row_counts_match_count_rows() {
+        let test_dir = TempStrDir::default();
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+
+        // Five separate write batches so the dataset has five fragments — the
+        // case where `count_rows` would fan out per fragment.
+        let rows_per_frag: i32 = 100;
+        let num_frags = 5;
+        let batches: Vec<RecordBatch> = (0..num_frags)
+            .map(|f| {
+                let start = f * rows_per_frag;
+                let values: Vec<i32> = (start..start + rows_per_frag).collect();
+                RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))])
+                    .unwrap()
+            })
+            .collect();
+
+        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
+        let mut dataset = Dataset::write(
+            reader,
+            test_dir.as_str(),
+            Some(WriteParams {
+                max_rows_per_file: rows_per_frag as usize,
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+        assert_eq!(dataset.fragments().len(), num_frags as usize);
+
+        dataset
+            .create_index(
+                &["id"],
+                IndexType::BTree,
+                Some("id_idx".into()),
+                &ScalarIndexParams::default(),
+                true,
+            )
+            .await
+            .unwrap();
+
+        // Straddle fragment boundaries with the delete to exercise the
+        // deletion-count path in `Fragment::num_rows()`.
+        dataset
+            .delete("id < 25 OR (id >= 150 AND id < 175)")
+            .await
+            .unwrap();
+
+        // Append a new (unindexed) fragment.
+        let extra: Vec<i32> = (500..600).collect();
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(extra))]).unwrap();
+        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
+        dataset.append(reader, None).await.unwrap();
+
+        // Ground truth via count_rows — must equal indexed + unindexed from stats.
+        let total_from_count_rows = dataset.count_rows(None).await.unwrap();
+        let manifest_total = super::manifest_total_rows(&dataset).unwrap();
+        assert_eq!(manifest_total, total_from_count_rows);
+
+        let stats: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap();
+        let indexed = stats["num_indexed_rows"].as_u64().unwrap() as usize;
+        let unindexed = stats["num_unindexed_rows"].as_u64().unwrap() as usize;
+        assert_eq!(indexed + unindexed, total_from_count_rows);
+        // 500 rows initially, 50 deleted (all from indexed fragments), 100 appended.
+        assert_eq!(indexed, 450);
+        assert_eq!(unindexed, 100);
+    }
+
+    /// P1: `manifest_total_rows` returns `None` when any fragment lacks
+    /// `physical_rows`, so the caller falls back to `count_rows(None)`.
+    #[tokio::test]
+    async fn test_manifest_total_rows_missing_metadata_returns_none() {
+        use lance_table::format::Fragment as FragmentFmt;
+
+        let test_dir = TempStrDir::default();
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
+        let mut dataset = Dataset::write(reader, test_dir.as_str(), None)
+            .await
+            .unwrap();
+
+        // Sanity: happy path returns a value.
+        assert!(super::manifest_total_rows(&dataset).is_some());
+
+        // Corrupt one fragment's metadata in-memory so the helper must bail.
+        let mut manifest = (*dataset.manifest).clone();
+        let mut fragments: Vec<FragmentFmt> = manifest.fragments.as_ref().clone();
+        fragments[0].physical_rows = None;
+        manifest.fragments = Arc::new(fragments);
+        dataset.manifest = Arc::new(manifest);
+
+        assert!(super::manifest_total_rows(&dataset).is_none());
+    }
+
+    /// P2: repeat `index_statistics` calls at the same manifest version are
+    /// served from the `DSIndexCache` without reopening the index file.
+    #[tokio::test]
+    async fn test_index_statistics_cache_hit_avoids_io() {
+        let test_dir = TempStrDir::default();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "status",
+            DataType::Int32,
+            false,
+        )]));
+        let values: Vec<i32> = (0..10_000).collect();
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(values))]).unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let mut dataset = Dataset::write(reader, test_dir.as_str(), None)
+            .await
+            .unwrap();
+
+        // BTree lacks `load_statistics` today, so the uncached path opens the
+        // index file — exactly the cost we want the cache to skip.
+        dataset
+            .create_index(
+                &["status"],
+                IndexType::BTree,
+                Some("status_idx".into()),
+                &ScalarIndexParams::default(),
+                true,
+            )
+            .await
+            .unwrap();
+
+        let io_tracker = dataset.object_store().io_tracker().clone();
+
+        // Prime the cache.
+        let first = dataset.index_statistics("status_idx").await.unwrap();
+
+        // Reset I/O stats and call again — the second call must be served
+        // entirely from the session index cache.
+        io_tracker.incremental_stats();
+        let second = dataset.index_statistics("status_idx").await.unwrap();
+        let stats = io_tracker.incremental_stats();
+
+        assert_eq!(first, second, "cache must return identical JSON");
+        assert_io_eq!(
+            stats,
+            read_iops,
+            0,
+            "cached index_statistics should perform no reads; got {} ops",
+            stats.read_iops
+        );
+        assert_io_eq!(
+            stats,
+            read_bytes,
+            0,
+            "cached index_statistics should read zero bytes; got {} bytes",
+            stats.read_bytes
+        );
+    }
+
+    /// P2: mutations (append, delete, optimize) bump the manifest version and
+    /// therefore implicitly invalidate the cache — stale numbers must never
+    /// leak through.
+    #[tokio::test]
+    async fn test_index_statistics_cache_invalidates_on_manifest_bump() {
+        let test_dir = TempStrDir::default();
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from((0..200).collect::<Vec<i32>>()))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let mut dataset = Dataset::write(reader, test_dir.as_str(), None)
+            .await
+            .unwrap();
+        dataset
+            .create_index(
+                &["id"],
+                IndexType::BTree,
+                Some("id_idx".into()),
+                &ScalarIndexParams::default(),
+                true,
+            )
+            .await
+            .unwrap();
+
+        let before: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap();
+        assert_eq!(before["num_indexed_rows"], 200);
+        assert_eq!(before["num_unindexed_rows"], 0);
+
+        // Append bumps manifest.version — the next call must see 50 unindexed rows.
+        let extra = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from((200..250).collect::<Vec<i32>>()))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(extra)], schema.clone());
+        dataset.append(reader, None).await.unwrap();
+
+        let after_append: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap();
+        assert_eq!(after_append["num_indexed_rows"], 200);
+        assert_eq!(after_append["num_unindexed_rows"], 50);
+
+        // `optimize_indices(append)` rolls the new fragment into a delta; the
+        // old manifest version's entry must not shadow the new counts.
+        dataset
+            .optimize_indices(&OptimizeOptions::append())
+            .await
+            .unwrap();
+        let after_optimize: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("id_idx").await.unwrap()).unwrap();
+        assert_eq!(after_optimize["num_indexed_rows"], 250);
+        assert_eq!(after_optimize["num_unindexed_rows"], 0);
+    }
+
+    /// P2: two indices on the same dataset at the same version must get
+    /// independent cache entries — the name-based suffix in the key prevents
+    /// collisions.
+    #[tokio::test]
+    async fn test_index_statistics_cache_distinguishes_index_names() {
+        let test_dir = TempStrDir::default();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        let a_values: Vec<i32> = (0..100).collect();
+        let b_values: Vec<i32> = (1000..1100).collect();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(a_values)),
+                Arc::new(Int32Array::from(b_values)),
+            ],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let mut dataset = Dataset::write(reader, test_dir.as_str(), None)
+            .await
+            .unwrap();
+        for col in ["a", "b"] {
+            dataset
+                .create_index(
+                    &[col],
+                    IndexType::BTree,
+                    Some(format!("{col}_idx")),
+                    &ScalarIndexParams::default(),
+                    true,
+                )
+                .await
+                .unwrap();
+        }
+
+        let a1 = dataset.index_statistics("a_idx").await.unwrap();
+        let b1 = dataset.index_statistics("b_idx").await.unwrap();
+        assert_ne!(a1, b1, "different indices must yield different stats");
+
+        // Cached reads preserve the per-name distinction.
+        let a2 = dataset.index_statistics("a_idx").await.unwrap();
+        let b2 = dataset.index_statistics("b_idx").await.unwrap();
+        assert_eq!(a1, a2);
+        assert_eq!(b1, b2);
+    }
+
     #[tokio::test]
     async fn test_optimize_delta_indices() {
         let dimensions = 16;
diff --git a/rust/lance/src/session/index_caches.rs b/rust/lance/src/session/index_caches.rs
index 3ae777880aa..694c4af81b2 100644
--- a/rust/lance/src/session/index_caches.rs
+++ b/rust/lance/src/session/index_caches.rs
@@ -145,3 +145,29 @@ impl CacheKey for ScalarIndexDetailsKey<'_> {
         "ScalarIndexDetails"
     }
 }
+
+/// Cache key for the serialized `index_statistics` JSON for a single named index
+/// at a specific manifest version.
+/// +/// `index_statistics` is a pure function of `(dataset URI, manifest version, index name)`: +/// the `DSIndexCache` already prefixes entries by dataset URI, and any mutation that can +/// change the answer (append, delete, compaction, index create/optimize/drop) bumps the +/// manifest version. Keying on the version therefore gives automatic invalidation — we +/// never need to touch this cache from a write path. +#[derive(Debug)] +pub struct IndexStatisticsKey<'a> { + pub version: u64, + pub index_name: &'a str, +} + +impl CacheKey for IndexStatisticsKey<'_> { + type ValueType = String; + + fn key(&self) -> Cow<'_, str> { + Cow::Owned(format!("stats/{}/{}", self.version, self.index_name)) + } + + fn type_name() -> &'static str { + "IndexStatisticsJson" + } +}