From de717e8ac91e987d2b62abd8b6bbeb37bbc86e61 Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Fri, 19 Dec 2025 15:37:39 -0500 Subject: [PATCH 1/7] add list_index_size_info endpoint and migration 25 --- .../postgresql/25_add-split-size.down.sql | 3 + .../postgresql/25_add-split-size.up.sql | 3 + .../src/metastore/control_plane_metastore.rs | 22 +- .../file_backed/file_backed_index/mod.rs | 24 +- .../src/metastore/file_backed/mod.rs | 54 +++- .../src/metastore/postgres/metastore.rs | 34 ++- .../quickwit-metastore/src/tests/index.rs | 251 ++++++++++++++++- quickwit/quickwit-metastore/src/tests/mod.rs | 15 + .../protos/quickwit/metastore.proto | 16 ++ .../codegen/quickwit/quickwit.metastore.rs | 263 ++++++++++++++++++ 10 files changed, 660 insertions(+), 25 deletions(-) create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql diff --git a/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql new file mode 100644 index 00000000000..3e1c1746b04 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS idx_splits_stats; + +ALTER TABLE splits DROP COLUMN IF EXISTS split_size_bytes; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql new file mode 100644 index 00000000000..407809cc6b2 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE splits ADD COLUMN IF NOT EXISTS split_size_bytes BIGINT NOT NULL GENERATED ALWAYS AS ((split_metadata_json::json->'footer_offsets'->>'end')::bigint) STORED; + +CREATE INDEX IF NOT EXISTS idx_splits_stats ON splits (index_uid, split_state) INCLUDE (split_size_bytes); diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 9d3fc7bd95f..337e3fb3138 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -26,13 +26,14 @@ use quickwit_proto::metastore::{ GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, - ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, - ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, - ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, - OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + ListDeleteTasksResponse, ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, + ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, + ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, + ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, + MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, + OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -162,6 +163,13 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.list_splits(request).await } + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> MetastoreResult { + self.metastore.list_index_size_info(request).await + } + async fn list_stale_splits( &self, request: ListStaleSplitsRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 15aeeb21a5c..b75aa2bb36d 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,8 +30,9 @@ use quickwit_config::{ }; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, - DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, - MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, + DeleteShardsResponse, DeleteTask, EntityKind, IndexSizeInfo, ListShardsSubrequest, + ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, + OpenShardSubresponse, PruneShardsRequest, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -497,6 +498,25 @@ impl FileBackedIndex { Ok(()) } + /// Gets IndexSizeInfo { index_id, num_splits, total_size } for this index + /// Only counts splits that are in published state + pub(crate) fn get_size(&self) -> MetastoreResult { + let splits: Vec<&Split> = self + .splits + .values() + .filter(|split| split.split_state == SplitState::Published) + .collect(); + let total_size = splits + .iter() + .map(|split| split.split_metadata.footer_offsets.end as i64) + .sum(); + Ok(IndexSizeInfo { + index_id: self.index_id().to_string(), + num_splits: splits.len() as i64, + total_size, + }) + } + /// Adds a source. pub(crate) fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> { let index_uid = self.index_uid().clone(); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 6c2ecbc196f..6aab2119a21 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -49,14 +49,14 @@ use quickwit_proto::metastore::{ IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, - ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, - MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, - OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, - serde_utils, + ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, + ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, + MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, + OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils, }; use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; @@ -809,6 +809,44 @@ impl MetastoreService for FileBackedMetastore { Ok(ServiceStream::new(splits_responses_stream)) } + async fn list_index_size_info( + &self, + _request: ListIndexSizeInfoRequest, + ) -> MetastoreResult { + // searching across all indexes + let index_id_incarnation_id_opts: Vec<(IndexId, Option)> = { + let inner_rlock_guard = self.state.read().await; + inner_rlock_guard + .indexes + .iter() + .filter_map(|(index_id, index_state)| match index_state { + LazyIndexStatus::Active(_) => Some(index_id), + _ => None, + }) + .map(|index_id| (index_id.clone(), None)) + .collect() + }; + + let mut index_sizes = Vec::new(); + for (index_id, incarnation_id_opt) in index_id_incarnation_id_opts { + match self + .read_any(&index_id, incarnation_id_opt, |index| index.get_size()) + .await + { + Ok(index_size) => { + index_sizes.push(index_size); + } + Err(MetastoreError::NotFound(_)) => { + // If the index does not exist, we just skip it. + continue; + } + Err(error) => return Err(error), + } + } + + Ok(ListIndexSizeInfoResponse { index_sizes }) + } + async fn list_stale_splits( &self, request: ListStaleSplitsRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index e63a9400688..26b6560b072 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -34,8 +34,9 @@ use quickwit_proto::metastore::{ FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, - IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, + IndexSizeInfo, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, + LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, + ListDeleteTasksResponse, ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, @@ -904,6 +905,35 @@ impl MetastoreService for PostgresqlMetastore { Ok(service_stream) } + async fn list_index_size_info( + &self, + _request: ListIndexSizeInfoRequest, + ) -> MetastoreResult { + let sql = "SELECT + i.index_uid, + COUNT(split_id) AS num_splits, + COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size + FROM indexes i + LEFT JOIN splits s ON s.index_uid = i.index_uid AND s.split_state = 'Published' + GROUP BY i.index_uid"; + + let rows: Vec<(String, i64, i64)> = + sqlx::query_as(sql).fetch_all(&self.connection_pool).await?; + + let mut index_sizes = Vec::new(); + for (index_uid, num_splits, total_size) in rows { + let delimiter = index_uid.find(':').unwrap_or(0); + let index_id = index_uid[..delimiter].to_string(); + index_sizes.push(IndexSizeInfo { + index_id, + num_splits, + total_size, + }); + } + + Ok(ListIndexSizeInfoResponse { index_sizes }) + } + #[instrument(skip(self))] async fn mark_splits_for_deletion( &self, diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 50e0f695a3c..a5e3b55e7d7 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -21,6 +21,7 @@ // - delete_index use std::num::NonZeroUsize; +use std::time::Duration; use quickwit_common::rand::append_random_suffix; use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; @@ -31,18 +32,19 @@ use quickwit_config::{ use quickwit_doc_mapper::{Cardinality, FieldMappingEntry, FieldMappingType, QuickwitJsonOptions}; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, - IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, - IndexesMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, - StageSplitsRequest, UpdateIndexRequest, + IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, IndexSizeInfo, + IndexesMetadataRequest, ListIndexSizeInfoRequest, ListIndexesMetadataRequest, MetastoreError, + MetastoreService, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid}; +use time::OffsetDateTime; use super::DefaultForTest; -use crate::tests::cleanup_index; +use crate::tests::{cleanup_index, to_btree_set}; use crate::{ CreateIndexRequestExt, IndexMetadataResponseExt, IndexesMetadataResponseExt, - ListIndexesMetadataResponseExt, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, - UpdateIndexRequestExt, + ListIndexesMetadataResponseExt, MetastoreServiceExt, SplitMaturity, SplitMetadata, + StageSplitsRequestExt, UpdateIndexRequestExt, }; pub async fn test_metastore_create_index< @@ -826,3 +828,240 @@ pub async fn test_metastore_delete_index< cleanup_index(&mut metastore, index_uid).await; } + +pub async fn test_metastore_list_index_size_info< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let metastore = MetastoreToTest::default_for_test().await; + + let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + + let index_id_1 = append_random_suffix("test-list-index-sizes"); + let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-index-sizes"); + let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let split_id_1 = format!("{index_id_1}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + time_range: Some(0..=99), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(0), + }, + tags: to_btree_set(&["tag!", "tag:foo", "$tag!", "$tag:bar"]), + delete_opstamp: 3, + footer_offsets: 0..2048, + uncompressed_docs_size_in_bytes: 2048, + num_docs: 100, + ..Default::default() + }; + + let split_id_2 = format!("{index_id_1}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_1.clone(), + time_range: Some(100..=199), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(10), + }, + tags: to_btree_set(&["tag!", "$tag!", "$tag:bar"]), + delete_opstamp: 1, + footer_offsets: 0..2048, + uncompressed_docs_size_in_bytes: 2048, + num_docs: 100, + ..Default::default() + }; + + let split_id_3 = format!("{index_id_1}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_2.clone(), + time_range: Some(200..=299), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(20), + }, + tags: to_btree_set(&["tag!", "tag:foo", "tag:baz", "$tag!"]), + delete_opstamp: 5, + footer_offsets: 0..1000, + uncompressed_docs_size_in_bytes: 1000, + num_docs: 100, + ..Default::default() + }; + + { + // add split-1 and split-2 to index-1 + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + vec![split_metadata_1.clone(), split_metadata_2.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + } + { + // add split-3 to index-2 + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + vec![split_metadata_3.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_id_3.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + } + + let response = metastore + .list_index_size_info(ListIndexSizeInfoRequest {}) + .await + .unwrap(); + + // we use the same postgres db for all tests. we need to filter out the indexes that don't + // belong to this test + let indexes = response + .index_sizes + .iter() + .filter(|index_size| index_size.index_id == index_id_1 || index_size.index_id == index_id_2) + .collect::>(); + + assert_eq!(indexes.len(), 2); + + let index_1 = indexes + .iter() + .find(|index| index.index_id == index_id_1) + .expect("Should find index 1"); + assert_eq!(index_1.num_splits, 2); + assert_eq!(index_1.total_size, 4096); + + let index_2 = indexes + .iter() + .find(|index| index.index_id == index_id_2) + .expect("Should find index 2"); + assert_eq!(index_2.num_splits, 1); + assert_eq!(index_2.total_size, 1000); +} + +pub async fn test_metastore_list_index_size_info_no_publish< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let metastore = MetastoreToTest::default_for_test().await; + + let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + + let index_id = append_random_suffix("test-list-index-sizes"); + let index_uid = IndexUid::new_with_random_ulid(&index_id); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + + let split_id_1 = format!("{index_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid.clone(), + time_range: Some(0..=99), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(0), + }, + tags: to_btree_set(&["tag!", "tag:foo", "$tag!", "$tag:bar"]), + delete_opstamp: 3, + footer_offsets: 0..2048, + uncompressed_docs_size_in_bytes: 2048, + num_docs: 100, + ..Default::default() + }; + + let split_id_2 = format!("{index_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid.clone(), + time_range: Some(100..=199), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(10), + }, + tags: to_btree_set(&["tag!", "$tag!", "$tag:bar"]), + delete_opstamp: 1, + footer_offsets: 0..2048, + uncompressed_docs_size_in_bytes: 2048, + num_docs: 100, + ..Default::default() + }; + + { + // add split-1 and split-2 to index-1 + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + vec![split_metadata_1.clone(), split_metadata_2.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + } + + let response = metastore + .list_index_size_info(ListIndexSizeInfoRequest {}) + .await + .unwrap(); + // we use the same postgres db for all tests. we need to filter out the indexes that don't + // belong to this test + let indexes = response + .index_sizes + .iter() + .filter(|index_size| index_size.index_id == index_id) + .collect::>(); + + assert_eq!(indexes.len(), 1); + assert_eq!(indexes[0].num_splits, 0); + assert_eq!(indexes[0].total_size, 0); +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 37d14a34c69..36926ae2f04 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -178,6 +178,7 @@ macro_rules! metastore_test_suite { // - indexes_metadata // - list_indexes // - delete_index + // - list_index_size_info #[tokio::test] #[serial_test::file_serial] @@ -280,6 +281,20 @@ macro_rules! metastore_test_suite { $crate::tests::index::test_metastore_delete_index::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_index_size_info() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_list_index_size_info::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_index_size_info_no_publish() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_list_index_size_info_no_publish::<$metastore_type>().await; + } + // Split API tests // // - stage_splits diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 4e6a0e717ef..86891813c12 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -107,6 +107,9 @@ service MetastoreService { // Deletes an index rpc DeleteIndex(DeleteIndexRequest) returns (EmptyResponse); + // Returns a list of size info for each index. + rpc ListIndexSizeInfo(ListIndexSizeInfoRequest) returns (ListIndexSizeInfoResponse); + // Streams splits from index. rpc ListSplits(ListSplitsRequest) returns (stream ListSplitsResponse); @@ -286,6 +289,19 @@ enum IndexMetadataFailureReason { INDEX_METADATA_FAILURE_REASON_INTERNAL = 2; } +message ListIndexSizeInfoRequest {} + +message ListIndexSizeInfoResponse { + // list of IndexSizeInfo. each one has the index id, the number of splits and the total size. + repeated IndexSizeInfo index_sizes = 1; +} + +message IndexSizeInfo { + string index_id = 1; + int64 num_splits = 2; + int64 total_size = 3; +} + message ListSplitsRequest { // Predicate used to filter splits. // The predicate is expressed as a JSON serialized diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 1ba6096d031..ffbb708d554 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -117,6 +117,26 @@ pub struct IndexMetadataFailure { pub reason: i32, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListIndexSizeInfoRequest {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListIndexSizeInfoResponse { + /// list of IndexSizeInfo. each one has the index id, the number of splits and the total size. + #[prost(message, repeated, tag = "1")] + pub index_sizes: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct IndexSizeInfo { + #[prost(string, tag = "1")] + pub index_id: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub num_splits: i64, + #[prost(int64, tag = "3")] + pub total_size: i64, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ListSplitsRequest { /// Predicate used to filter splits. @@ -627,6 +647,11 @@ impl RpcName for DeleteIndexRequest { "delete_index" } } +impl RpcName for ListIndexSizeInfoRequest { + fn rpc_name() -> &'static str { + "list_index_size_info" + } +} impl RpcName for ListSplitsRequest { fn rpc_name() -> &'static str { "list_splits" @@ -796,6 +821,11 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { &self, request: DeleteIndexRequest, ) -> crate::metastore::MetastoreResult; + ///Returns a list of size info for each index. + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult; ///Streams splits from index. async fn list_splits( &self, @@ -1085,6 +1115,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.0.delete_index(request).await } + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.list_index_size_info(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -1293,6 +1329,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_index(request).await } + async fn list_index_size_info( + &self, + request: super::ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.list_index_size_info(request).await + } async fn list_splits( &self, request: super::ListSplitsRequest, @@ -1560,6 +1602,22 @@ impl tower::Service for InnerMetastoreServiceClient { Box::pin(fut) } } +impl tower::Service for InnerMetastoreServiceClient { + type Response = ListIndexSizeInfoResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: ListIndexSizeInfoRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.list_index_size_info(request).await }; + Box::pin(fut) + } +} impl tower::Service for InnerMetastoreServiceClient { type Response = MetastoreServiceStream; type Error = crate::metastore::MetastoreError; @@ -2011,6 +2069,11 @@ struct MetastoreServiceTowerServiceStack { EmptyResponse, crate::metastore::MetastoreError, >, + list_index_size_info_svc: quickwit_common::tower::BoxService< + ListIndexSizeInfoRequest, + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, + >, list_splits_svc: quickwit_common::tower::BoxService< ListSplitsRequest, MetastoreServiceStream, @@ -2180,6 +2243,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.delete_index_svc.clone().ready().await?.call(request).await } + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult { + self.list_index_size_info_svc.clone().ready().await?.call(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -2403,6 +2472,16 @@ type DeleteIndexLayer = quickwit_common::tower::BoxLayer< EmptyResponse, crate::metastore::MetastoreError, >; +type ListIndexSizeInfoLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListIndexSizeInfoRequest, + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, + >, + ListIndexSizeInfoRequest, + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, +>; type ListSplitsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< ListSplitsRequest, @@ -2671,6 +2750,7 @@ pub struct MetastoreServiceTowerLayerStack { indexes_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, + list_index_size_info_layers: Vec, list_splits_layers: Vec, stage_splits_layers: Vec, publish_splits_layers: Vec, @@ -2853,6 +2933,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexSizeInfoRequest, + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + ListIndexSizeInfoRequest, + Response = ListIndexSizeInfoResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< ListSplitsRequest, @@ -3532,6 +3637,8 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_index_size_info_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_splits_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.stage_splits_layers @@ -3703,6 +3810,26 @@ impl MetastoreServiceTowerLayerStack { self.delete_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_list_index_size_info_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexSizeInfoRequest, + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + ListIndexSizeInfoRequest, + Response = ListIndexSizeInfoResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.list_index_size_info_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_list_splits_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -4332,6 +4459,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let list_index_size_info_svc = self + .list_index_size_info_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let list_splits_svc = self .list_splits_layers .into_iter() @@ -4548,6 +4683,7 @@ impl MetastoreServiceTowerLayerStack { indexes_metadata_svc, list_indexes_metadata_svc, delete_index_svc, + list_index_size_info_svc, list_splits_svc, stage_splits_svc, publish_splits_svc, @@ -4689,6 +4825,15 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + ListIndexSizeInfoRequest, + Response = ListIndexSizeInfoResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture< + ListIndexSizeInfoResponse, + crate::metastore::MetastoreError, + >, + > + tower::Service< ListSplitsRequest, Response = MetastoreServiceStream, @@ -4903,6 +5048,12 @@ where ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult { + self.clone().call(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -5190,6 +5341,20 @@ where DeleteIndexRequest::rpc_name(), )) } + async fn list_index_size_info( + &self, + request: ListIndexSizeInfoRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .clone() + .list_index_size_info(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListIndexSizeInfoRequest::rpc_name(), + )) + } async fn list_splits( &self, request: ListSplitsRequest, @@ -5661,6 +5826,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn list_index_size_info( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .list_index_size_info(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } type ListSplitsStream = quickwit_common::ServiceStream< tonic::Result, >; @@ -6269,6 +6445,36 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Returns a list of size info for each index. + pub async fn list_index_size_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/ListIndexSizeInfo", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "ListIndexSizeInfo", + ), + ); + self.inner.unary(req, path, codec).await + } /// Streams splits from index. pub async fn list_splits( &mut self, @@ -7068,6 +7274,14 @@ pub mod metastore_service_grpc_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Returns a list of size info for each index. + async fn list_index_size_info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ListSplits method. type ListSplitsStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, @@ -7667,6 +7881,55 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/ListIndexSizeInfo" => { + #[allow(non_camel_case_types)] + struct ListIndexSizeInfoSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for ListIndexSizeInfoSvc { + type Response = super::ListIndexSizeInfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_index_size_info( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListIndexSizeInfoSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/ListSplits" => { #[allow(non_camel_case_types)] struct ListSplitsSvc(pub Arc); From dbb4bdcf6df9d117d00b4f278ddd904bc7b37671 Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Mon, 22 Dec 2025 16:47:25 -0500 Subject: [PATCH 2/7] add filtering, change endpoint name, nits --- .../src/metastore/control_plane_metastore.rs | 10 +- .../file_backed/file_backed_index/mod.rs | 28 +-- .../src/metastore/file_backed/mod.rs | 25 ++- .../src/metastore/postgres/metastore.rs | 61 +++--- .../quickwit-metastore/src/tests/index.rs | 191 +++++++++--------- quickwit/quickwit-metastore/src/tests/mod.rs | 8 +- .../protos/quickwit/metastore.proto | 21 +- .../codegen/quickwit/quickwit.metastore.rs | 190 ++++++++--------- 8 files changed, 279 insertions(+), 255 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 337e3fb3138..bcb07d79020 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -26,7 +26,7 @@ use quickwit_proto::metastore::{ GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, - ListDeleteTasksResponse, ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, + ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, @@ -163,11 +163,11 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.list_splits(request).await } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> MetastoreResult { - self.metastore.list_index_size_info(request).await + request: ListIndexStatsRequest, + ) -> MetastoreResult { + self.metastore.list_index_stats(request).await } async fn list_stale_splits( diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index b75aa2bb36d..ef03c8e7376 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,7 @@ use quickwit_config::{ }; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, - DeleteShardsResponse, DeleteTask, EntityKind, IndexSizeInfo, ListShardsSubrequest, + DeleteShardsResponse, DeleteTask, EntityKind, IndexStats, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, }; @@ -498,22 +498,24 @@ impl FileBackedIndex { Ok(()) } - /// Gets IndexSizeInfo { index_id, num_splits, total_size } for this index + /// Gets IndexStats { index_uid, num_splits, total_size_bytes } for this index /// Only counts splits that are in published state - pub(crate) fn get_size(&self) -> MetastoreResult { - let splits: Vec<&Split> = self + pub(crate) fn get_stats(&self) -> MetastoreResult { + let (num_splits, total_size_bytes) = self .splits .values() .filter(|split| split.split_state == SplitState::Published) - .collect(); - let total_size = splits - .iter() - .map(|split| split.split_metadata.footer_offsets.end as i64) - .sum(); - Ok(IndexSizeInfo { - index_id: self.index_id().to_string(), - num_splits: splits.len() as i64, - total_size, + .fold((0, 0), |(count, size), split| { + ( + count + 1, + size + split.split_metadata.footer_offsets.end as i64, + ) + }); + + Ok(IndexStats { + index_uid: Some(self.index_uid().clone()), + num_splits, + total_size_bytes, }) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 6aab2119a21..f6328445368 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -49,7 +49,7 @@ use quickwit_proto::metastore::{ IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, ListIndexTemplatesRequest, + ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, @@ -809,32 +809,35 @@ impl MetastoreService for FileBackedMetastore { Ok(ServiceStream::new(splits_responses_stream)) } - async fn list_index_size_info( + async fn list_index_stats( &self, - _request: ListIndexSizeInfoRequest, - ) -> MetastoreResult { - // searching across all indexes + request: ListIndexStatsRequest, + ) -> MetastoreResult { + let index_id_matcher = + IndexIdMatcher::try_from_index_id_patterns(&request.index_id_patterns)?; let index_id_incarnation_id_opts: Vec<(IndexId, Option)> = { let inner_rlock_guard = self.state.read().await; inner_rlock_guard .indexes .iter() .filter_map(|(index_id, index_state)| match index_state { - LazyIndexStatus::Active(_) => Some(index_id), + LazyIndexStatus::Active(_) if index_id_matcher.is_match(index_id) => { + Some(index_id) + } _ => None, }) .map(|index_id| (index_id.clone(), None)) .collect() }; - let mut index_sizes = Vec::new(); + let mut index_stats = Vec::new(); for (index_id, incarnation_id_opt) in index_id_incarnation_id_opts { match self - .read_any(&index_id, incarnation_id_opt, |index| index.get_size()) + .read_any(&index_id, incarnation_id_opt, |index| index.get_stats()) .await { - Ok(index_size) => { - index_sizes.push(index_size); + Ok(stats) => { + index_stats.push(stats); } Err(MetastoreError::NotFound(_)) => { // If the index does not exist, we just skip it. @@ -844,7 +847,7 @@ impl MetastoreService for FileBackedMetastore { } } - Ok(ListIndexSizeInfoResponse { index_sizes }) + Ok(ListIndexStatsResponse { index_stats }) } async fn list_stale_splits( diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 26b6560b072..c3fd96ee9bb 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt::{self, Write}; +use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; @@ -34,9 +35,9 @@ use quickwit_proto::metastore::{ FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, - IndexSizeInfo, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, + IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, - ListDeleteTasksResponse, ListIndexSizeInfoRequest, ListIndexSizeInfoResponse, + ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, @@ -905,33 +906,45 @@ impl MetastoreService for PostgresqlMetastore { Ok(service_stream) } - async fn list_index_size_info( + async fn list_index_stats( &self, - _request: ListIndexSizeInfoRequest, - ) -> MetastoreResult { - let sql = "SELECT - i.index_uid, - COUNT(split_id) AS num_splits, - COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size - FROM indexes i - LEFT JOIN splits s ON s.index_uid = i.index_uid AND s.split_state = 'Published' - GROUP BY i.index_uid"; - - let rows: Vec<(String, i64, i64)> = - sqlx::query_as(sql).fetch_all(&self.connection_pool).await?; - - let mut index_sizes = Vec::new(); - for (index_uid, num_splits, total_size) in rows { - let delimiter = index_uid.find(':').unwrap_or(0); - let index_id = index_uid[..delimiter].to_string(); - index_sizes.push(IndexSizeInfo { - index_id, + request: ListIndexStatsRequest, + ) -> MetastoreResult { + let index_pattern_sql = build_index_id_patterns_sql_query(&request.index_id_patterns) + .map_err(|error| MetastoreError::Internal { + message: "failed to build `list_index_stats` SQL query".to_string(), + cause: error.to_string(), + })?; + let sql = format!( + "SELECT + i.index_uid, + COUNT(split_id) AS num_splits, + COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes + FROM ({index_pattern_sql}) i + LEFT JOIN splits s ON s.index_uid = i.index_uid AND s.split_state = 'Published' + GROUP BY i.index_uid" + ); + + let rows: Vec<(String, i64, i64)> = sqlx::query_as(&sql) + .fetch_all(&self.connection_pool) + .await?; + + let mut index_stats = Vec::new(); + for (index_uid_str, num_splits, total_size_bytes) in rows { + let Ok(index_uid) = IndexUid::from_str(&index_uid_str) else { + return Err(MetastoreError::Internal { + message: "failed to parse index_uid".to_string(), + cause: index_uid_str.to_string(), + }); + }; + index_stats.push(IndexStats { + index_uid: Some(index_uid), num_splits, - total_size, + total_size_bytes, }); } - Ok(ListIndexSizeInfoResponse { index_sizes }) + Ok(ListIndexStatsResponse { index_stats }) } #[instrument(skip(self))] diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index a5e3b55e7d7..8361cfb85f8 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -32,8 +32,8 @@ use quickwit_config::{ use quickwit_doc_mapper::{Cardinality, FieldMappingEntry, FieldMappingType, QuickwitJsonOptions}; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, - IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, IndexSizeInfo, - IndexesMetadataRequest, ListIndexSizeInfoRequest, ListIndexesMetadataRequest, MetastoreError, + IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, IndexStats, + IndexesMetadataRequest, ListIndexStatsRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid}; @@ -829,19 +829,19 @@ pub async fn test_metastore_delete_index< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_list_index_size_info< +pub async fn test_metastore_list_index_stats< MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); - let index_id_1 = append_random_suffix("test-list-index-sizes"); + let index_id_1 = append_random_suffix("test-list-index-stats"); let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); let index_uri_1 = format!("ram:///indexes/{index_id_1}"); let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); - let index_id_2 = append_random_suffix("test-list-index-sizes"); + let index_id_2 = append_random_suffix("test-list-index-stats"); let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); let index_uri_2 = format!("ram:///indexes/{index_id_2}"); let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); @@ -897,101 +897,101 @@ pub async fn test_metastore_list_index_size_info< ..Default::default() }; - { - // add split-1 and split-2 to index-1 - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); + // add split-1 and split-2 to index-1 + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); - let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - vec![split_metadata_1.clone(), split_metadata_2.clone()], - ) + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![split_metadata_1.clone(), split_metadata_2.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await .unwrap(); - metastore.stage_splits(stage_splits_request).await.unwrap(); - let publish_splits_request = PublishSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], - ..Default::default() - }; - metastore - .publish_splits(publish_splits_request) - .await - .unwrap(); - } - { - // add split-3 to index-2 - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); + // add split-3 to index-2 + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); - let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - vec![split_metadata_3.clone()], - ) - .unwrap(); - metastore.stage_splits(stage_splits_request).await.unwrap(); + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![split_metadata_3.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); - let publish_splits_request = PublishSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: vec![split_id_3.clone()], - ..Default::default() - }; - metastore - .publish_splits(publish_splits_request) - .await - .unwrap(); - } + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_3.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); let response = metastore - .list_index_size_info(ListIndexSizeInfoRequest {}) + .list_index_stats(ListIndexStatsRequest { + index_id_patterns: vec!["test-list-index-stats*".to_string()], + }) .await .unwrap(); - // we use the same postgres db for all tests. we need to filter out the indexes that don't - // belong to this test + // we use the same postgres db for multiple executions of this test. we need to filter out the + // indexes that don't belong to the current execution let indexes = response - .index_sizes + .index_stats .iter() - .filter(|index_size| index_size.index_id == index_id_1 || index_size.index_id == index_id_2) - .collect::>(); + .filter(|stats| { + stats.index_uid == Some(index_uid_1.clone()) + || stats.index_uid == Some(index_uid_2.clone()) + }) + .collect::>(); assert_eq!(indexes.len(), 2); let index_1 = indexes .iter() - .find(|index| index.index_id == index_id_1) + .find(|index| index.index_uid == Some(index_uid_1.clone())) .expect("Should find index 1"); assert_eq!(index_1.num_splits, 2); - assert_eq!(index_1.total_size, 4096); + assert_eq!(index_1.total_size_bytes, 4096); let index_2 = indexes .iter() - .find(|index| index.index_id == index_id_2) + .find(|index| index.index_uid == Some(index_uid_2.clone())) .expect("Should find index 2"); assert_eq!(index_2.num_splits, 1); - assert_eq!(index_2.total_size, 1000); + assert_eq!(index_2.total_size_bytes, 1000); } -pub async fn test_metastore_list_index_size_info_no_publish< +pub async fn test_metastore_list_index_stats_no_publish< MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); - let index_id = append_random_suffix("test-list-index-sizes"); + let index_id = append_random_suffix("test-list-index-stats"); let index_uid = IndexUid::new_with_random_ulid(&index_id); let index_uri = format!("ram:///indexes/{index_id}"); let index_config = IndexConfig::for_test(&index_id, &index_uri); @@ -1030,38 +1030,37 @@ pub async fn test_metastore_list_index_size_info_no_publish< ..Default::default() }; - { - // add split-1 and split-2 to index-1 - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); + // add split-1 and split-2 to index-1 + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); - let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - vec![split_metadata_1.clone(), split_metadata_2.clone()], - ) - .unwrap(); - metastore.stage_splits(stage_splits_request).await.unwrap(); - } + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + vec![split_metadata_1.clone(), split_metadata_2.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); let response = metastore - .list_index_size_info(ListIndexSizeInfoRequest {}) + .list_index_stats(ListIndexStatsRequest { + index_id_patterns: vec!["test-list-index-stats*".to_string()], + }) .await .unwrap(); - // we use the same postgres db for all tests. we need to filter out the indexes that don't - // belong to this test - let indexes = response - .index_sizes - .iter() - .filter(|index_size| index_size.index_id == index_id) - .collect::>(); - assert_eq!(indexes.len(), 1); - assert_eq!(indexes[0].num_splits, 0); - assert_eq!(indexes[0].total_size, 0); + // we use the same postgres db for multiple executions of this test. we need to filter out the + // indexes that don't belong to the current execution + let index_stats = response + .index_stats + .iter() + .filter(|stats| stats.index_uid == Some(index_uid.clone())) + .collect::>(); + assert_eq!(index_stats.len(), 1); + assert_eq!(index_stats[0].num_splits, 0); + assert_eq!(index_stats[0].total_size_bytes, 0); } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 36926ae2f04..62d87d35a34 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -283,16 +283,16 @@ macro_rules! metastore_test_suite { #[tokio::test] #[serial_test::file_serial] - async fn test_metastore_list_index_size_info() { + async fn test_metastore_list_index_stats() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_list_index_size_info::<$metastore_type>().await; + $crate::tests::index::test_metastore_list_index_stats::<$metastore_type>().await; } #[tokio::test] #[serial_test::file_serial] - async fn test_metastore_list_index_size_info_no_publish() { + async fn test_metastore_list_index_stats_no_publish() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_list_index_size_info_no_publish::<$metastore_type>().await; + $crate::tests::index::test_metastore_list_index_stats_no_publish::<$metastore_type>().await; } // Split API tests diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 86891813c12..9d9fbcc99da 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -108,7 +108,7 @@ service MetastoreService { rpc DeleteIndex(DeleteIndexRequest) returns (EmptyResponse); // Returns a list of size info for each index. - rpc ListIndexSizeInfo(ListIndexSizeInfoRequest) returns (ListIndexSizeInfoResponse); + rpc ListIndexStats(ListIndexStatsRequest) returns (ListIndexStatsResponse); // Streams splits from index. rpc ListSplits(ListSplitsRequest) returns (stream ListSplitsResponse); @@ -289,17 +289,22 @@ enum IndexMetadataFailureReason { INDEX_METADATA_FAILURE_REASON_INTERNAL = 2; } -message ListIndexSizeInfoRequest {} +message ListIndexStatsRequest { + // List of patterns an index should match or not match to get considered + // An index must match at least one positive pattern (a pattern not starting + // with a '-'), and no negative pattern (a pattern starting with a '-'). + repeated string index_id_patterns = 2; +} -message ListIndexSizeInfoResponse { - // list of IndexSizeInfo. each one has the index id, the number of splits and the total size. - repeated IndexSizeInfo index_sizes = 1; +message ListIndexStatsResponse { + // list of IndexStats. each one has the index id, the number of splits and the total size. + repeated IndexStats index_stats = 1; } -message IndexSizeInfo { - string index_id = 1; +message IndexStats { + quickwit.common.IndexUid index_uid = 1; int64 num_splits = 2; - int64 total_size = 3; + int64 total_size_bytes = 3; } message ListSplitsRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index ffbb708d554..3fe8609de1f 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -117,24 +117,30 @@ pub struct IndexMetadataFailure { pub reason: i32, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] -pub struct ListIndexSizeInfoRequest {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListIndexStatsRequest { + /// List of patterns an index should match or not match to get considered + /// An index must match at least one positive pattern (a pattern not starting + /// with a '-'), and no negative pattern (a pattern starting with a '-'). + #[prost(string, repeated, tag = "2")] + pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListIndexSizeInfoResponse { - /// list of IndexSizeInfo. each one has the index id, the number of splits and the total size. +pub struct ListIndexStatsResponse { + /// list of IndexStats. each one has the index id, the number of splits and the total size. #[prost(message, repeated, tag = "1")] - pub index_sizes: ::prost::alloc::vec::Vec, + pub index_stats: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] -pub struct IndexSizeInfo { - #[prost(string, tag = "1")] - pub index_id: ::prost::alloc::string::String, +pub struct IndexStats { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, #[prost(int64, tag = "2")] pub num_splits: i64, #[prost(int64, tag = "3")] - pub total_size: i64, + pub total_size_bytes: i64, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -647,9 +653,9 @@ impl RpcName for DeleteIndexRequest { "delete_index" } } -impl RpcName for ListIndexSizeInfoRequest { +impl RpcName for ListIndexStatsRequest { fn rpc_name() -> &'static str { - "list_index_size_info" + "list_index_stats" } } impl RpcName for ListSplitsRequest { @@ -822,10 +828,10 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { request: DeleteIndexRequest, ) -> crate::metastore::MetastoreResult; ///Returns a list of size info for each index. - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult; + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult; ///Streams splits from index. async fn list_splits( &self, @@ -1115,11 +1121,11 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.0.delete_index(request).await } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult { - self.inner.0.list_index_size_info(request).await + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.list_index_stats(request).await } async fn list_splits( &self, @@ -1329,11 +1335,11 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_index(request).await } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: super::ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult { - self.inner.lock().await.list_index_size_info(request).await + request: super::ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.list_index_stats(request).await } async fn list_splits( &self, @@ -1602,8 +1608,8 @@ impl tower::Service for InnerMetastoreServiceClient { Box::pin(fut) } } -impl tower::Service for InnerMetastoreServiceClient { - type Response = ListIndexSizeInfoResponse; +impl tower::Service for InnerMetastoreServiceClient { + type Response = ListIndexStatsResponse; type Error = crate::metastore::MetastoreError; type Future = BoxFuture; fn poll_ready( @@ -1612,9 +1618,9 @@ impl tower::Service for InnerMetastoreServiceClient { ) -> std::task::Poll> { std::task::Poll::Ready(Ok(())) } - fn call(&mut self, request: ListIndexSizeInfoRequest) -> Self::Future { + fn call(&mut self, request: ListIndexStatsRequest) -> Self::Future { let svc = self.clone(); - let fut = async move { svc.0.list_index_size_info(request).await }; + let fut = async move { svc.0.list_index_stats(request).await }; Box::pin(fut) } } @@ -2069,9 +2075,9 @@ struct MetastoreServiceTowerServiceStack { EmptyResponse, crate::metastore::MetastoreError, >, - list_index_size_info_svc: quickwit_common::tower::BoxService< - ListIndexSizeInfoRequest, - ListIndexSizeInfoResponse, + list_index_stats_svc: quickwit_common::tower::BoxService< + ListIndexStatsRequest, + ListIndexStatsResponse, crate::metastore::MetastoreError, >, list_splits_svc: quickwit_common::tower::BoxService< @@ -2243,11 +2249,11 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.delete_index_svc.clone().ready().await?.call(request).await } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult { - self.list_index_size_info_svc.clone().ready().await?.call(request).await + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.list_index_stats_svc.clone().ready().await?.call(request).await } async fn list_splits( &self, @@ -2472,14 +2478,14 @@ type DeleteIndexLayer = quickwit_common::tower::BoxLayer< EmptyResponse, crate::metastore::MetastoreError, >; -type ListIndexSizeInfoLayer = quickwit_common::tower::BoxLayer< +type ListIndexStatsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< - ListIndexSizeInfoRequest, - ListIndexSizeInfoResponse, + ListIndexStatsRequest, + ListIndexStatsResponse, crate::metastore::MetastoreError, >, - ListIndexSizeInfoRequest, - ListIndexSizeInfoResponse, + ListIndexStatsRequest, + ListIndexStatsResponse, crate::metastore::MetastoreError, >; type ListSplitsLayer = quickwit_common::tower::BoxLayer< @@ -2750,7 +2756,7 @@ pub struct MetastoreServiceTowerLayerStack { indexes_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, - list_index_size_info_layers: Vec, + list_index_stats_layers: Vec, list_splits_layers: Vec, stage_splits_layers: Vec, publish_splits_layers: Vec, @@ -2935,29 +2941,29 @@ impl MetastoreServiceTowerLayerStack { >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< - ListIndexSizeInfoRequest, - ListIndexSizeInfoResponse, + ListIndexStatsRequest, + ListIndexStatsResponse, crate::metastore::MetastoreError, >, > + Clone + Send + Sync + 'static, , >>::Service: tower::Service< - ListIndexSizeInfoRequest, - Response = ListIndexSizeInfoResponse, + ListIndexStatsRequest, + Response = ListIndexStatsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, <, - >>::Service as tower::Service>::Future: Send + 'static, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< ListSplitsRequest, @@ -3637,7 +3643,7 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.list_index_size_info_layers + self.list_index_stats_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_splits_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -3810,24 +3816,23 @@ impl MetastoreServiceTowerLayerStack { self.delete_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn stack_list_index_size_info_layer(mut self, layer: L) -> Self + pub fn stack_list_index_stats_layer(mut self, layer: L) -> Self where L: tower::Layer< quickwit_common::tower::BoxService< - ListIndexSizeInfoRequest, - ListIndexSizeInfoResponse, + ListIndexStatsRequest, + ListIndexStatsResponse, crate::metastore::MetastoreError, >, > + Send + Sync + 'static, L::Service: tower::Service< - ListIndexSizeInfoRequest, - Response = ListIndexSizeInfoResponse, + ListIndexStatsRequest, + Response = ListIndexStatsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + >::Future: Send + 'static, { - self.list_index_size_info_layers - .push(quickwit_common::tower::BoxLayer::new(layer)); + self.list_index_stats_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn stack_list_splits_layer(mut self, layer: L) -> Self @@ -4459,8 +4464,8 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); - let list_index_size_info_svc = self - .list_index_size_info_layers + let list_index_stats_svc = self + .list_index_stats_layers .into_iter() .rev() .fold( @@ -4683,7 +4688,7 @@ impl MetastoreServiceTowerLayerStack { indexes_metadata_svc, list_indexes_metadata_svc, delete_index_svc, - list_index_size_info_svc, + list_index_stats_svc, list_splits_svc, stage_splits_svc, publish_splits_svc, @@ -4826,13 +4831,10 @@ where Future = BoxFuture, > + tower::Service< - ListIndexSizeInfoRequest, - Response = ListIndexSizeInfoResponse, + ListIndexStatsRequest, + Response = ListIndexStatsResponse, Error = crate::metastore::MetastoreError, - Future = BoxFuture< - ListIndexSizeInfoResponse, - crate::metastore::MetastoreError, - >, + Future = BoxFuture, > + tower::Service< ListSplitsRequest, @@ -5048,10 +5050,10 @@ where ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult { + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } async fn list_splits( @@ -5341,18 +5343,18 @@ where DeleteIndexRequest::rpc_name(), )) } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: ListIndexSizeInfoRequest, - ) -> crate::metastore::MetastoreResult { + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { self.inner .clone() - .list_index_size_info(request) + .list_index_stats(request) .await .map(|response| response.into_inner()) .map_err(|status| crate::error::grpc_status_to_service_error( status, - ListIndexSizeInfoRequest::rpc_name(), + ListIndexStatsRequest::rpc_name(), )) } async fn list_splits( @@ -5826,13 +5828,13 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } - async fn list_index_size_info( + async fn list_index_stats( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { self.inner .0 - .list_index_size_info(request.into_inner()) + .list_index_stats(request.into_inner()) .await .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) @@ -6446,11 +6448,11 @@ pub mod metastore_service_grpc_client { self.inner.unary(req, path, codec).await } /// Returns a list of size info for each index. - pub async fn list_index_size_info( + pub async fn list_index_stats( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -6463,14 +6465,14 @@ pub mod metastore_service_grpc_client { })?; let codec = tonic_prost::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit.metastore.MetastoreService/ListIndexSizeInfo", + "/quickwit.metastore.MetastoreService/ListIndexStats", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( "quickwit.metastore.MetastoreService", - "ListIndexSizeInfo", + "ListIndexStats", ), ); self.inner.unary(req, path, codec).await @@ -7275,11 +7277,11 @@ pub mod metastore_service_grpc_server { request: tonic::Request, ) -> std::result::Result, tonic::Status>; /// Returns a list of size info for each index. - async fn list_index_size_info( + async fn list_index_stats( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// Server streaming response type for the ListSplits method. @@ -7881,25 +7883,25 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } - "/quickwit.metastore.MetastoreService/ListIndexSizeInfo" => { + "/quickwit.metastore.MetastoreService/ListIndexStats" => { #[allow(non_camel_case_types)] - struct ListIndexSizeInfoSvc(pub Arc); + struct ListIndexStatsSvc(pub Arc); impl< T: MetastoreServiceGrpc, - > tonic::server::UnaryService - for ListIndexSizeInfoSvc { - type Response = super::ListIndexSizeInfoResponse; + > tonic::server::UnaryService + for ListIndexStatsSvc { + type Response = super::ListIndexStatsResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::list_index_size_info( + ::list_index_stats( &inner, request, ) @@ -7914,7 +7916,7 @@ pub mod metastore_service_grpc_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = ListIndexSizeInfoSvc(inner); + let method = ListIndexStatsSvc(inner); let codec = tonic_prost::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( From 23a623d4db5a70d2bcce3a7b9e39b9ed2839169b Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Wed, 24 Dec 2025 11:47:12 -0500 Subject: [PATCH 3/7] add breakdown by split state --- .../file_backed/file_backed_index/mod.rs | 37 +++-- .../src/metastore/postgres/metastore.rs | 61 ++++++-- .../quickwit-metastore/src/split_metadata.rs | 11 +- .../quickwit-metastore/src/tests/index.rs | 139 ++++++++---------- quickwit/quickwit-metastore/src/tests/mod.rs | 4 +- .../protos/quickwit/metastore.proto | 10 +- .../codegen/quickwit/quickwit.metastore.rs | 18 ++- quickwit/quickwit-proto/src/metastore/mod.rs | 7 + 8 files changed, 166 insertions(+), 121 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index ef03c8e7376..b42049d3833 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -32,7 +32,7 @@ use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind, IndexStats, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, - OpenShardSubresponse, PruneShardsRequest, + OpenShardSubresponse, PruneShardsRequest, SplitStats, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -498,24 +498,31 @@ impl FileBackedIndex { Ok(()) } - /// Gets IndexStats { index_uid, num_splits, total_size_bytes } for this index - /// Only counts splits that are in published state + /// Gets IndexStats for this index pub(crate) fn get_stats(&self) -> MetastoreResult { - let (num_splits, total_size_bytes) = self - .splits - .values() - .filter(|split| split.split_state == SplitState::Published) - .fold((0, 0), |(count, size), split| { - ( - count + 1, - size + split.split_metadata.footer_offsets.end as i64, - ) - }); + let mut staged_stats = SplitStats::default(); + let mut published_stats = SplitStats::default(); + let mut marked_for_deletion_stats = SplitStats::default(); + + for split in self.splits.values() { + match split.split_state { + SplitState::Staged => { + staged_stats.add_split(split.split_metadata.footer_offsets.end) + } + SplitState::Published => { + published_stats.add_split(split.split_metadata.footer_offsets.end) + } + SplitState::MarkedForDeletion => { + marked_for_deletion_stats.add_split(split.split_metadata.footer_offsets.end) + } + } + } Ok(IndexStats { index_uid: Some(self.index_uid().clone()), - num_splits, - total_size_bytes, + staged: Some(staged_stats), + published: Some(published_stats), + marked_for_deletion: Some(marked_for_deletion_stats), }) } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c3fd96ee9bb..44f57f65849 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -43,8 +43,8 @@ use quickwit_proto::metastore::{ ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, - PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, - UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, + PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest, + ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId}; @@ -918,33 +918,68 @@ impl MetastoreService for PostgresqlMetastore { let sql = format!( "SELECT i.index_uid, + s.split_state, COUNT(split_id) AS num_splits, COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes FROM ({index_pattern_sql}) i - LEFT JOIN splits s ON s.index_uid = i.index_uid AND s.split_state = 'Published' - GROUP BY i.index_uid" + LEFT JOIN splits s ON s.index_uid = i.index_uid + GROUP BY i.index_uid, s.split_state" ); - let rows: Vec<(String, i64, i64)> = sqlx::query_as(&sql) + let rows: Vec<(String, Option, i64, i64)> = sqlx::query_as(&sql) .fetch_all(&self.connection_pool) .await?; - let mut index_stats = Vec::new(); - for (index_uid_str, num_splits, total_size_bytes) in rows { + let mut index_stats = HashMap::new(); + for (index_uid_str, split_state, num_splits, total_size_bytes) in rows { let Ok(index_uid) = IndexUid::from_str(&index_uid_str) else { return Err(MetastoreError::Internal { message: "failed to parse index_uid".to_string(), cause: index_uid_str.to_string(), }); }; - index_stats.push(IndexStats { - index_uid: Some(index_uid), - num_splits, - total_size_bytes, - }); + let stats = index_stats + .entry(index_uid_str) + .or_insert_with(|| IndexStats { + index_uid: Some(index_uid), + staged: Some(SplitStats::default()), + published: Some(SplitStats::default()), + marked_for_deletion: Some(SplitStats::default()), + }); + let num_splits = num_splits as u64; + let total_size_bytes = total_size_bytes as u64; + match split_state.as_deref() { + Some("Staged") => { + stats.staged = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + Some("Published") => { + stats.published = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + Some("MarkedForDeletion") => { + stats.marked_for_deletion = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + None => {} // if an index has no splits, split_state is null and we can keep the defaults + Some(split_state) => { + return Err(MetastoreError::Internal { + message: "invalid split state".to_string(), + cause: split_state.to_string(), + }); + } + } } - Ok(ListIndexStatsResponse { index_stats }) + Ok(ListIndexStatsResponse { + index_stats: index_stats.into_values().collect(), + }) } #[instrument(skip(self))] diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..3b16cc7c3aa 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -290,16 +290,19 @@ impl quickwit_config::TestableForRegression for SplitMetadata { } /// A split state. -#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema, prost::Enumeration, +)] +#[repr(i32)] pub enum SplitState { /// The split is almost ready. Some of its files may have been uploaded in the storage. - Staged, + Staged = 0, /// The split is ready and published. - Published, + Published = 1, /// The split is marked for deletion. - MarkedForDeletion, + MarkedForDeletion = 2, } impl fmt::Display for SplitState { diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 8361cfb85f8..97b2725836f 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -34,7 +34,7 @@ use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, IndexStats, IndexesMetadataRequest, ListIndexStatsRequest, ListIndexesMetadataRequest, MetastoreError, - MetastoreService, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, + MetastoreService, PublishSplitsRequest, SplitStats, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid}; use time::OffsetDateTime; @@ -939,15 +939,36 @@ pub async fn test_metastore_list_index_stats< .unwrap(); metastore.stage_splits(stage_splits_request).await.unwrap(); - let publish_splits_request = PublishSplitsRequest { + let expected_stats_1 = IndexStats { + index_uid: Some(index_uid_1.clone()), + staged: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + published: Some(SplitStats { + num_splits: 2, + total_size_bytes: 4096, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + }; + let expected_stats_2 = IndexStats { index_uid: Some(index_uid_2.clone()), - staged_split_ids: vec![split_id_3.clone()], - ..Default::default() + staged: Some(SplitStats { + num_splits: 1, + total_size_bytes: 1000, + }), + published: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), }; - metastore - .publish_splits(publish_splits_request) - .await - .unwrap(); let response = metastore .list_index_stats(ListIndexStatsRequest { @@ -956,82 +977,32 @@ pub async fn test_metastore_list_index_stats< .await .unwrap(); - // we use the same postgres db for multiple executions of this test. we need to filter out the - // indexes that don't belong to the current execution - let indexes = response + let index_stats_1 = response .index_stats - .iter() - .filter(|stats| { - stats.index_uid == Some(index_uid_1.clone()) - || stats.index_uid == Some(index_uid_2.clone()) - }) - .collect::>(); - - assert_eq!(indexes.len(), 2); - - let index_1 = indexes .iter() .find(|index| index.index_uid == Some(index_uid_1.clone())) .expect("Should find index 1"); - assert_eq!(index_1.num_splits, 2); - assert_eq!(index_1.total_size_bytes, 4096); - let index_2 = indexes + assert_eq!(index_stats_1, &expected_stats_1); + + let index_stats_2 = response + .index_stats .iter() .find(|index| index.index_uid == Some(index_uid_2.clone())) .expect("Should find index 2"); - assert_eq!(index_2.num_splits, 1); - assert_eq!(index_2.total_size_bytes, 1000); + assert_eq!(index_stats_2, &expected_stats_2); } -pub async fn test_metastore_list_index_stats_no_publish< +pub async fn test_metastore_list_index_stats_no_splits< MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let metastore = MetastoreToTest::default_for_test().await; - let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); - - let index_id = append_random_suffix("test-list-index-stats"); - let index_uid = IndexUid::new_with_random_ulid(&index_id); + let index_id = append_random_suffix("test-list-index-stats-no-splits"); let index_uri = format!("ram:///indexes/{index_id}"); let index_config = IndexConfig::for_test(&index_id, &index_uri); - - let split_id_1 = format!("{index_id}--split-1"); - let split_metadata_1 = SplitMetadata { - split_id: split_id_1.clone(), - index_uid: index_uid.clone(), - time_range: Some(0..=99), - create_timestamp: current_timestamp, - maturity: SplitMaturity::Immature { - maturation_period: Duration::from_secs(0), - }, - tags: to_btree_set(&["tag!", "tag:foo", "$tag!", "$tag:bar"]), - delete_opstamp: 3, - footer_offsets: 0..2048, - uncompressed_docs_size_in_bytes: 2048, - num_docs: 100, - ..Default::default() - }; - - let split_id_2 = format!("{index_id}--split-2"); - let split_metadata_2 = SplitMetadata { - split_id: split_id_2.clone(), - index_uid: index_uid.clone(), - time_range: Some(100..=199), - create_timestamp: current_timestamp, - maturity: SplitMaturity::Immature { - maturation_period: Duration::from_secs(10), - }, - tags: to_btree_set(&["tag!", "$tag!", "$tag:bar"]), - delete_opstamp: 1, - footer_offsets: 0..2048, - uncompressed_docs_size_in_bytes: 2048, - num_docs: 100, - ..Default::default() - }; - - // add split-1 and split-2 to index-1 let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore .create_index(create_index_request) .await @@ -1039,28 +1010,34 @@ pub async fn test_metastore_list_index_stats_no_publish< .index_uid() .clone(); - let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( - index_uid.clone(), - vec![split_metadata_1.clone(), split_metadata_2.clone()], - ) - .unwrap(); - metastore.stage_splits(stage_splits_request).await.unwrap(); + let expected_stats = IndexStats { + index_uid: Some(index_uid.clone()), + staged: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + published: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + }; let response = metastore .list_index_stats(ListIndexStatsRequest { - index_id_patterns: vec!["test-list-index-stats*".to_string()], + index_id_patterns: vec!["test-list-index-stats-no-splits*".to_string()], }) .await .unwrap(); - // we use the same postgres db for multiple executions of this test. we need to filter out the - // indexes that don't belong to the current execution let index_stats = response .index_stats .iter() - .filter(|stats| stats.index_uid == Some(index_uid.clone())) - .collect::>(); - assert_eq!(index_stats.len(), 1); - assert_eq!(index_stats[0].num_splits, 0); - assert_eq!(index_stats[0].total_size_bytes, 0); + .find(|index| index.index_uid == Some(index_uid.clone())) + .expect("Should find index"); + + assert_eq!(index_stats, &expected_stats); } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 62d87d35a34..fb174d974e6 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -290,9 +290,9 @@ macro_rules! metastore_test_suite { #[tokio::test] #[serial_test::file_serial] - async fn test_metastore_list_index_stats_no_publish() { + async fn test_metastore_list_index_stats_no_splits() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_list_index_stats_no_publish::<$metastore_type>().await; + $crate::tests::index::test_metastore_list_index_stats_no_splits::<$metastore_type>().await; } // Split API tests diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 9d9fbcc99da..9b42c0097ce 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -303,8 +303,14 @@ message ListIndexStatsResponse { message IndexStats { quickwit.common.IndexUid index_uid = 1; - int64 num_splits = 2; - int64 total_size_bytes = 3; + SplitStats staged = 2; + SplitStats published = 3; + SplitStats marked_for_deletion = 4; +} + +message SplitStats { + uint64 num_splits = 1; + uint64 total_size_bytes = 2; } message ListSplitsRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 3fe8609de1f..f0ad21c9b3e 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -137,10 +137,20 @@ pub struct ListIndexStatsResponse { pub struct IndexStats { #[prost(message, optional, tag = "1")] pub index_uid: ::core::option::Option, - #[prost(int64, tag = "2")] - pub num_splits: i64, - #[prost(int64, tag = "3")] - pub total_size_bytes: i64, + #[prost(message, optional, tag = "2")] + pub staged: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub published: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub marked_for_deletion: ::core::option::Option, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SplitStats { + #[prost(uint64, tag = "1")] + pub num_splits: u64, + #[prost(uint64, tag = "2")] + pub total_size_bytes: u64, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index a855ba5a665..a96b581824f 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -387,6 +387,13 @@ impl ListDeleteTasksRequest { } } +impl SplitStats { + pub fn add_split(&mut self, size: u64) { + self.num_splits += 1; + self.total_size_bytes += size; + } +} + pub mod serde_utils { use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; From 705618489c04b1fcf36e445ca1b5c8698896793a Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Wed, 24 Dec 2025 11:54:03 -0500 Subject: [PATCH 4/7] lints --- .../src/metastore/postgres/metastore.rs | 3 ++- quickwit/quickwit-metastore/src/split_metadata.rs | 11 ++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 44f57f65849..4aabe0a9072 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -967,7 +967,8 @@ impl MetastoreService for PostgresqlMetastore { total_size_bytes, }); } - None => {} // if an index has no splits, split_state is null and we can keep the defaults + None => {} /* if an index has no splits, split_state is null and we can keep the + * defaults */ Some(split_state) => { return Err(MetastoreError::Internal { message: "invalid split state".to_string(), diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index 3b16cc7c3aa..fe88fe379d3 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -290,19 +290,16 @@ impl quickwit_config::TestableForRegression for SplitMetadata { } /// A split state. -#[derive( - Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema, prost::Enumeration, -)] -#[repr(i32)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)] pub enum SplitState { /// The split is almost ready. Some of its files may have been uploaded in the storage. - Staged = 0, + Staged, /// The split is ready and published. - Published = 1, + Published, /// The split is marked for deletion. - MarkedForDeletion = 2, + MarkedForDeletion, } impl fmt::Display for SplitState { From 32535bdb6ad2f7a622eb09006a84f4b9f750709c Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Wed, 24 Dec 2025 12:23:56 -0500 Subject: [PATCH 5/7] lints --- .../quickwit-metastore/src/metastore/postgres/metastore.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 4aabe0a9072..7368b5e1e1b 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -967,8 +967,7 @@ impl MetastoreService for PostgresqlMetastore { total_size_bytes, }); } - None => {} /* if an index has no splits, split_state is null and we can keep the - * defaults */ + None => {} // if an index has no splits, we can keep the defaults Some(split_state) => { return Err(MetastoreError::Internal { message: "invalid split state".to_string(), From a8d0ffc93cf43da49445bb8aa10af78ea584998f Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Fri, 26 Dec 2025 12:58:34 -0500 Subject: [PATCH 6/7] test_get_stats and nits --- .../file_backed/file_backed_index/mod.rs | 35 ++++++++++++++++-- .../src/metastore/postgres/metastore.rs | 2 +- .../quickwit-metastore/src/tests/index.rs | 37 ++----------------- quickwit/quickwit-metastore/src/tests/mod.rs | 2 +- .../protos/quickwit/metastore.proto | 2 +- .../codegen/quickwit/quickwit.metastore.rs | 2 +- quickwit/quickwit-proto/src/metastore/mod.rs | 4 +- 7 files changed, 41 insertions(+), 43 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index b42049d3833..8b1c4aaa424 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -793,16 +793,16 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { #[cfg(test)] mod tests { - use std::collections::BTreeSet; + use std::collections::{BTreeSet, HashMap}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::ingest::Shard; - use quickwit_proto::metastore::ListShardsSubrequest; + use quickwit_proto::metastore::{ListShardsSubrequest, SplitStats}; use quickwit_proto::types::{IndexUid, SourceId}; use super::FileBackedIndex; use crate::file_backed::file_backed_index::split_query_predicate; - use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState}; + use crate::{IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState}; impl FileBackedIndex { pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec) { @@ -833,6 +833,7 @@ mod tests { time_range: Some(32..=40), tags: BTreeSet::from(["tag-1".to_string()]), create_timestamp: 12, + footer_offsets: 0..2048, ..Default::default() }, split_state: SplitState::Staged, @@ -846,6 +847,7 @@ mod tests { time_range: None, tags: BTreeSet::from(["tag-2".to_string(), "tag-3".to_string()]), create_timestamp: 5, + footer_offsets: 0..1024, ..Default::default() }, split_state: SplitState::MarkedForDeletion, @@ -859,6 +861,7 @@ mod tests { time_range: Some(0..=90), tags: BTreeSet::from(["tag-2".to_string(), "tag-4".to_string()]), create_timestamp: 64, + footer_offsets: 0..512, ..Default::default() }, split_state: SplitState::Published, @@ -973,4 +976,30 @@ mod tests { assert!(!split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); } + + #[test] + fn test_get_stats() { + let index_id = "test-index"; + let index_metadata = IndexMetadata::for_test(index_id, "file:///qwdata/indexes/test-index"); + let index = + FileBackedIndex::new(index_metadata, make_splits().into(), HashMap::new(), vec![]); + + let expected_staged = Some(SplitStats { + num_splits: 1, + total_size_bytes: 2048, + }); + let expected_published = Some(SplitStats { + num_splits: 1, + total_size_bytes: 512, + }); + let expected_marked_for_deletion = Some(SplitStats { + num_splits: 1, + total_size_bytes: 1024, + }); + let stats = index.get_stats().unwrap(); + + assert_eq!(stats.staged, expected_staged); + assert_eq!(stats.published, expected_published); + assert_eq!(stats.marked_for_deletion, expected_marked_for_deletion); + } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 7368b5e1e1b..c7640bfb9ab 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -919,7 +919,7 @@ impl MetastoreService for PostgresqlMetastore { "SELECT i.index_uid, s.split_state, - COUNT(split_id) AS num_splits, + COUNT(s.split_state) AS num_splits, COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes FROM ({index_pattern_sql}) i LEFT JOIN splits s ON s.index_uid = i.index_uid diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 97b2725836f..c093c7b1a6c 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -21,7 +21,6 @@ // - delete_index use std::num::NonZeroUsize; -use std::time::Duration; use quickwit_common::rand::append_random_suffix; use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; @@ -37,14 +36,13 @@ use quickwit_proto::metastore::{ MetastoreService, PublishSplitsRequest, SplitStats, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid}; -use time::OffsetDateTime; use super::DefaultForTest; -use crate::tests::{cleanup_index, to_btree_set}; +use crate::tests::cleanup_index; use crate::{ CreateIndexRequestExt, IndexMetadataResponseExt, IndexesMetadataResponseExt, - ListIndexesMetadataResponseExt, MetastoreServiceExt, SplitMaturity, SplitMetadata, - StageSplitsRequestExt, UpdateIndexRequestExt, + ListIndexesMetadataResponseExt, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + UpdateIndexRequestExt, }; pub async fn test_metastore_create_index< @@ -834,8 +832,6 @@ pub async fn test_metastore_list_index_stats< >() { let metastore = MetastoreToTest::default_for_test().await; - let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); - let index_id_1 = append_random_suffix("test-list-index-stats"); let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); let index_uri_1 = format!("ram:///indexes/{index_id_1}"); @@ -850,16 +846,7 @@ pub async fn test_metastore_list_index_stats< let split_metadata_1 = SplitMetadata { split_id: split_id_1.clone(), index_uid: index_uid_1.clone(), - time_range: Some(0..=99), - create_timestamp: current_timestamp, - maturity: SplitMaturity::Immature { - maturation_period: Duration::from_secs(0), - }, - tags: to_btree_set(&["tag!", "tag:foo", "$tag!", "$tag:bar"]), - delete_opstamp: 3, footer_offsets: 0..2048, - uncompressed_docs_size_in_bytes: 2048, - num_docs: 100, ..Default::default() }; @@ -867,16 +854,7 @@ pub async fn test_metastore_list_index_stats< let split_metadata_2 = SplitMetadata { split_id: split_id_2.clone(), index_uid: index_uid_1.clone(), - time_range: Some(100..=199), - create_timestamp: current_timestamp, - maturity: SplitMaturity::Immature { - maturation_period: Duration::from_secs(10), - }, - tags: to_btree_set(&["tag!", "$tag!", "$tag:bar"]), - delete_opstamp: 1, footer_offsets: 0..2048, - uncompressed_docs_size_in_bytes: 2048, - num_docs: 100, ..Default::default() }; @@ -884,16 +862,7 @@ pub async fn test_metastore_list_index_stats< let split_metadata_3 = SplitMetadata { split_id: split_id_3.clone(), index_uid: index_uid_2.clone(), - time_range: Some(200..=299), - create_timestamp: current_timestamp, - maturity: SplitMaturity::Immature { - maturation_period: Duration::from_secs(20), - }, - tags: to_btree_set(&["tag!", "tag:foo", "tag:baz", "$tag!"]), - delete_opstamp: 5, footer_offsets: 0..1000, - uncompressed_docs_size_in_bytes: 1000, - num_docs: 100, ..Default::default() }; diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index fb174d974e6..d6e549baf25 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -178,7 +178,7 @@ macro_rules! metastore_test_suite { // - indexes_metadata // - list_indexes // - delete_index - // - list_index_size_info + // - list_index_stats #[tokio::test] #[serial_test::file_serial] diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 9b42c0097ce..00680da02d0 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -293,7 +293,7 @@ message ListIndexStatsRequest { // List of patterns an index should match or not match to get considered // An index must match at least one positive pattern (a pattern not starting // with a '-'), and no negative pattern (a pattern starting with a '-'). - repeated string index_id_patterns = 2; + repeated string index_id_patterns = 1; } message ListIndexStatsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index f0ad21c9b3e..ab6d1ddc236 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -122,7 +122,7 @@ pub struct ListIndexStatsRequest { /// List of patterns an index should match or not match to get considered /// An index must match at least one positive pattern (a pattern not starting /// with a '-'), and no negative pattern (a pattern starting with a '-'). - #[prost(string, repeated, tag = "2")] + #[prost(string, repeated, tag = "1")] pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index a96b581824f..ba371c13d4a 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -388,9 +388,9 @@ impl ListDeleteTasksRequest { } impl SplitStats { - pub fn add_split(&mut self, size: u64) { + pub fn add_split(&mut self, size_bytes: u64) { self.num_splits += 1; - self.total_size_bytes += size; + self.total_size_bytes += size_bytes; } } From 27c4692b97600cb3dc4afc1d7098a45d95c552e3 Mon Sep 17 00:00:00 2001 From: Abdul Andha Date: Sat, 27 Dec 2025 13:10:05 -0500 Subject: [PATCH 7/7] use futuresunordered --- .../src/metastore/file_backed/mod.rs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index f6328445368..fadeb1bc84f 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -815,7 +815,7 @@ impl MetastoreService for FileBackedMetastore { ) -> MetastoreResult { let index_id_matcher = IndexIdMatcher::try_from_index_id_patterns(&request.index_id_patterns)?; - let index_id_incarnation_id_opts: Vec<(IndexId, Option)> = { + let index_ids: Vec = { let inner_rlock_guard = self.state.read().await; inner_rlock_guard .indexes @@ -826,19 +826,23 @@ impl MetastoreService for FileBackedMetastore { } _ => None, }) - .map(|index_id| (index_id.clone(), None)) + .cloned() .collect() }; + let mut index_read_futures = FuturesUnordered::new(); + for index_id in index_ids { + let index_read_future = async move { + self.read_any(&index_id, None, |index| index.get_stats()) + .await + }; + index_read_futures.push(index_read_future); + } + let mut index_stats = Vec::new(); - for (index_id, incarnation_id_opt) in index_id_incarnation_id_opts { - match self - .read_any(&index_id, incarnation_id_opt, |index| index.get_stats()) - .await - { - Ok(stats) => { - index_stats.push(stats); - } + while let Some(index_read_result) = index_read_futures.next().await { + match index_read_result { + Ok(stats) => index_stats.push(stats), Err(MetastoreError::NotFound(_)) => { // If the index does not exist, we just skip it. continue;