diff --git a/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql new file mode 100644 index 00000000000..3e1c1746b04 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS idx_splits_stats; + +ALTER TABLE splits DROP COLUMN IF EXISTS split_size_bytes; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql new file mode 100644 index 00000000000..407809cc6b2 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/25_add-split-size.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE splits ADD COLUMN IF NOT EXISTS split_size_bytes BIGINT NOT NULL GENERATED ALWAYS AS ((split_metadata_json::json->'footer_offsets'->>'end')::bigint) STORED; + +CREATE INDEX IF NOT EXISTS idx_splits_stats ON splits (index_uid, split_state) INCLUDE (split_size_bytes); diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 9d3fc7bd95f..bcb07d79020 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -26,13 +26,14 @@ use quickwit_proto::metastore::{ GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, - ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, - ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, - ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, - OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, + ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, + ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, + ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, + MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, + OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -162,6 +163,13 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.list_splits(request).await } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> MetastoreResult { + self.metastore.list_index_stats(request).await + } + async fn list_stale_splits( &self, request: ListStaleSplitsRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 15aeeb21a5c..8b1c4aaa424 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,8 +30,9 @@ use quickwit_config::{ }; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, - DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, - MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, + DeleteShardsResponse, DeleteTask, EntityKind, IndexStats, ListShardsSubrequest, + ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, + OpenShardSubresponse, PruneShardsRequest, SplitStats, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -497,6 +498,34 @@ impl FileBackedIndex { Ok(()) } + /// Gets IndexStats for this index + pub(crate) fn get_stats(&self) -> MetastoreResult { + let mut staged_stats = SplitStats::default(); + let mut published_stats = SplitStats::default(); + let mut marked_for_deletion_stats = SplitStats::default(); + + for split in self.splits.values() { + match split.split_state { + SplitState::Staged => { + staged_stats.add_split(split.split_metadata.footer_offsets.end) + } + SplitState::Published => { + published_stats.add_split(split.split_metadata.footer_offsets.end) + } + SplitState::MarkedForDeletion => { + marked_for_deletion_stats.add_split(split.split_metadata.footer_offsets.end) + } + } + } + + Ok(IndexStats { + index_uid: Some(self.index_uid().clone()), + staged: Some(staged_stats), + published: Some(published_stats), + marked_for_deletion: Some(marked_for_deletion_stats), + }) + } + /// Adds a source. pub(crate) fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> { let index_uid = self.index_uid().clone(); @@ -764,16 +793,16 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { #[cfg(test)] mod tests { - use std::collections::BTreeSet; + use std::collections::{BTreeSet, HashMap}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::ingest::Shard; - use quickwit_proto::metastore::ListShardsSubrequest; + use quickwit_proto::metastore::{ListShardsSubrequest, SplitStats}; use quickwit_proto::types::{IndexUid, SourceId}; use super::FileBackedIndex; use crate::file_backed::file_backed_index::split_query_predicate; - use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState}; + use crate::{IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState}; impl FileBackedIndex { pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec) { @@ -804,6 +833,7 @@ mod tests { time_range: Some(32..=40), tags: BTreeSet::from(["tag-1".to_string()]), create_timestamp: 12, + footer_offsets: 0..2048, ..Default::default() }, split_state: SplitState::Staged, @@ -817,6 +847,7 @@ mod tests { time_range: None, tags: BTreeSet::from(["tag-2".to_string(), "tag-3".to_string()]), create_timestamp: 5, + footer_offsets: 0..1024, ..Default::default() }, split_state: SplitState::MarkedForDeletion, @@ -830,6 +861,7 @@ mod tests { time_range: Some(0..=90), tags: BTreeSet::from(["tag-2".to_string(), "tag-4".to_string()]), create_timestamp: 64, + footer_offsets: 0..512, ..Default::default() }, split_state: SplitState::Published, @@ -944,4 +976,30 @@ mod tests { assert!(!split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); } + + #[test] + fn test_get_stats() { + let index_id = "test-index"; + let index_metadata = IndexMetadata::for_test(index_id, "file:///qwdata/indexes/test-index"); + let index = + FileBackedIndex::new(index_metadata, make_splits().into(), HashMap::new(), vec![]); + + let expected_staged = Some(SplitStats { + num_splits: 1, + total_size_bytes: 2048, + }); + let expected_published = Some(SplitStats { + num_splits: 1, + total_size_bytes: 512, + }); + let expected_marked_for_deletion = Some(SplitStats { + num_splits: 1, + total_size_bytes: 1024, + }); + let stats = index.get_stats().unwrap(); + + assert_eq!(stats.staged, expected_staged); + assert_eq!(stats.published, expected_published); + assert_eq!(stats.marked_for_deletion, expected_marked_for_deletion); + } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 8377e6e8b7b..2542f1db36f 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -49,14 +49,14 @@ use quickwit_proto::metastore::{ IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, - ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, - ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, - ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, - MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, - OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, - serde_utils, + ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, + ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, + ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, + ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, + MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, + OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils, }; use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; @@ -809,6 +809,51 @@ impl MetastoreService for FileBackedMetastore { Ok(ServiceStream::new(splits_responses_stream)) } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> MetastoreResult { + let index_id_matcher = + IndexIdMatcher::try_from_index_id_patterns(&request.index_id_patterns)?; + let index_ids: Vec = { + let inner_rlock_guard = self.state.read().await; + inner_rlock_guard + .indexes + .iter() + .filter_map(|(index_id, index_state)| match index_state { + LazyIndexStatus::Active(_) if index_id_matcher.is_match(index_id) => { + Some(index_id) + } + _ => None, + }) + .cloned() + .collect() + }; + + let mut index_read_futures = FuturesUnordered::new(); + for index_id in index_ids { + let index_read_future = async move { + self.read_any(&index_id, None, |index| index.get_stats()) + .await + }; + index_read_futures.push(index_read_future); + } + + let mut index_stats = Vec::new(); + while let Some(index_read_result) = index_read_futures.next().await { + match index_read_result { + Ok(stats) => index_stats.push(stats), + Err(MetastoreError::NotFound(_)) => { + // If the index does not exist, we just skip it. + continue; + } + Err(error) => return Err(error), + } + } + + Ok(ListIndexStatsResponse { index_stats }) + } + async fn list_stale_splits( &self, request: ListStaleSplitsRequest, diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index e63a9400688..c7640bfb9ab 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt::{self, Write}; +use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; @@ -34,15 +35,16 @@ use quickwit_proto::metastore::{ FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse, - IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest, - LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, + IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, + LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest, + ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, - PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, - UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, + PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest, + ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId}; @@ -904,6 +906,82 @@ impl MetastoreService for PostgresqlMetastore { Ok(service_stream) } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> MetastoreResult { + let index_pattern_sql = build_index_id_patterns_sql_query(&request.index_id_patterns) + .map_err(|error| MetastoreError::Internal { + message: "failed to build `list_index_stats` SQL query".to_string(), + cause: error.to_string(), + })?; + let sql = format!( + "SELECT + i.index_uid, + s.split_state, + COUNT(s.split_state) AS num_splits, + COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes + FROM ({index_pattern_sql}) i + LEFT JOIN splits s ON s.index_uid = i.index_uid + GROUP BY i.index_uid, s.split_state" + ); + + let rows: Vec<(String, Option, i64, i64)> = sqlx::query_as(&sql) + .fetch_all(&self.connection_pool) + .await?; + + let mut index_stats = HashMap::new(); + for (index_uid_str, split_state, num_splits, total_size_bytes) in rows { + let Ok(index_uid) = IndexUid::from_str(&index_uid_str) else { + return Err(MetastoreError::Internal { + message: "failed to parse index_uid".to_string(), + cause: index_uid_str.to_string(), + }); + }; + let stats = index_stats + .entry(index_uid_str) + .or_insert_with(|| IndexStats { + index_uid: Some(index_uid), + staged: Some(SplitStats::default()), + published: Some(SplitStats::default()), + marked_for_deletion: Some(SplitStats::default()), + }); + let num_splits = num_splits as u64; + let total_size_bytes = total_size_bytes as u64; + match split_state.as_deref() { + Some("Staged") => { + stats.staged = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + Some("Published") => { + stats.published = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + Some("MarkedForDeletion") => { + stats.marked_for_deletion = Some(SplitStats { + num_splits, + total_size_bytes, + }); + } + None => {} // if an index has no splits, we can keep the defaults + Some(split_state) => { + return Err(MetastoreError::Internal { + message: "invalid split state".to_string(), + cause: split_state.to_string(), + }); + } + } + } + + Ok(ListIndexStatsResponse { + index_stats: index_stats.into_values().collect(), + }) + } + #[instrument(skip(self))] async fn mark_splits_for_deletion( &self, diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 50e0f695a3c..c093c7b1a6c 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -31,9 +31,9 @@ use quickwit_config::{ use quickwit_doc_mapper::{Cardinality, FieldMappingEntry, FieldMappingType, QuickwitJsonOptions}; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, - IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, - IndexesMetadataRequest, ListIndexesMetadataRequest, MetastoreError, MetastoreService, - StageSplitsRequest, UpdateIndexRequest, + IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataSubrequest, IndexStats, + IndexesMetadataRequest, ListIndexStatsRequest, ListIndexesMetadataRequest, MetastoreError, + MetastoreService, PublishSplitsRequest, SplitStats, StageSplitsRequest, UpdateIndexRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid}; @@ -826,3 +826,187 @@ pub async fn test_metastore_delete_index< cleanup_index(&mut metastore, index_uid).await; } + +pub async fn test_metastore_list_index_stats< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let metastore = MetastoreToTest::default_for_test().await; + + let index_id_1 = append_random_suffix("test-list-index-stats"); + let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-index-stats"); + let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let split_id_1 = format!("{index_id_1}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + footer_offsets: 0..2048, + ..Default::default() + }; + + let split_id_2 = format!("{index_id_1}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_1.clone(), + footer_offsets: 0..2048, + ..Default::default() + }; + + let split_id_3 = format!("{index_id_1}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_2.clone(), + footer_offsets: 0..1000, + ..Default::default() + }; + + // add split-1 and split-2 to index-1 + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![split_metadata_1.clone(), split_metadata_2.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone(), split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + // add split-3 to index-2 + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![split_metadata_3.clone()], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let expected_stats_1 = IndexStats { + index_uid: Some(index_uid_1.clone()), + staged: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + published: Some(SplitStats { + num_splits: 2, + total_size_bytes: 4096, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + }; + let expected_stats_2 = IndexStats { + index_uid: Some(index_uid_2.clone()), + staged: Some(SplitStats { + num_splits: 1, + total_size_bytes: 1000, + }), + published: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + }; + + let response = metastore + .list_index_stats(ListIndexStatsRequest { + index_id_patterns: vec!["test-list-index-stats*".to_string()], + }) + .await + .unwrap(); + + let index_stats_1 = response + .index_stats + .iter() + .find(|index| index.index_uid == Some(index_uid_1.clone())) + .expect("Should find index 1"); + + assert_eq!(index_stats_1, &expected_stats_1); + + let index_stats_2 = response + .index_stats + .iter() + .find(|index| index.index_uid == Some(index_uid_2.clone())) + .expect("Should find index 2"); + assert_eq!(index_stats_2, &expected_stats_2); +} + +pub async fn test_metastore_list_index_stats_no_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let metastore = MetastoreToTest::default_for_test().await; + + let index_id = append_random_suffix("test-list-index-stats-no-splits"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let expected_stats = IndexStats { + index_uid: Some(index_uid.clone()), + staged: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + published: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + marked_for_deletion: Some(SplitStats { + num_splits: 0, + total_size_bytes: 0, + }), + }; + + let response = metastore + .list_index_stats(ListIndexStatsRequest { + index_id_patterns: vec!["test-list-index-stats-no-splits*".to_string()], + }) + .await + .unwrap(); + + let index_stats = response + .index_stats + .iter() + .find(|index| index.index_uid == Some(index_uid.clone())) + .expect("Should find index"); + + assert_eq!(index_stats, &expected_stats); +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 37d14a34c69..d6e549baf25 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -178,6 +178,7 @@ macro_rules! metastore_test_suite { // - indexes_metadata // - list_indexes // - delete_index + // - list_index_stats #[tokio::test] #[serial_test::file_serial] @@ -280,6 +281,20 @@ macro_rules! metastore_test_suite { $crate::tests::index::test_metastore_delete_index::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_index_stats() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_list_index_stats::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_index_stats_no_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_list_index_stats_no_splits::<$metastore_type>().await; + } + // Split API tests // // - stage_splits diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 4e6a0e717ef..00680da02d0 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -107,6 +107,9 @@ service MetastoreService { // Deletes an index rpc DeleteIndex(DeleteIndexRequest) returns (EmptyResponse); + // Returns a list of size info for each index. + rpc ListIndexStats(ListIndexStatsRequest) returns (ListIndexStatsResponse); + // Streams splits from index. rpc ListSplits(ListSplitsRequest) returns (stream ListSplitsResponse); @@ -286,6 +289,30 @@ enum IndexMetadataFailureReason { INDEX_METADATA_FAILURE_REASON_INTERNAL = 2; } +message ListIndexStatsRequest { + // List of patterns an index should match or not match to get considered + // An index must match at least one positive pattern (a pattern not starting + // with a '-'), and no negative pattern (a pattern starting with a '-'). + repeated string index_id_patterns = 1; +} + +message ListIndexStatsResponse { + // list of IndexStats. each one has the index id, the number of splits and the total size. + repeated IndexStats index_stats = 1; +} + +message IndexStats { + quickwit.common.IndexUid index_uid = 1; + SplitStats staged = 2; + SplitStats published = 3; + SplitStats marked_for_deletion = 4; +} + +message SplitStats { + uint64 num_splits = 1; + uint64 total_size_bytes = 2; +} + message ListSplitsRequest { // Predicate used to filter splits. // The predicate is expressed as a JSON serialized diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 1ba6096d031..ab6d1ddc236 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -118,6 +118,42 @@ pub struct IndexMetadataFailure { } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListIndexStatsRequest { + /// List of patterns an index should match or not match to get considered + /// An index must match at least one positive pattern (a pattern not starting + /// with a '-'), and no negative pattern (a pattern starting with a '-'). + #[prost(string, repeated, tag = "1")] + pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListIndexStatsResponse { + /// list of IndexStats. each one has the index id, the number of splits and the total size. + #[prost(message, repeated, tag = "1")] + pub index_stats: ::prost::alloc::vec::Vec, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct IndexStats { + #[prost(message, optional, tag = "1")] + pub index_uid: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub staged: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub published: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub marked_for_deletion: ::core::option::Option, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SplitStats { + #[prost(uint64, tag = "1")] + pub num_splits: u64, + #[prost(uint64, tag = "2")] + pub total_size_bytes: u64, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ListSplitsRequest { /// Predicate used to filter splits. /// The predicate is expressed as a JSON serialized @@ -627,6 +663,11 @@ impl RpcName for DeleteIndexRequest { "delete_index" } } +impl RpcName for ListIndexStatsRequest { + fn rpc_name() -> &'static str { + "list_index_stats" + } +} impl RpcName for ListSplitsRequest { fn rpc_name() -> &'static str { "list_splits" @@ -796,6 +837,11 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { &self, request: DeleteIndexRequest, ) -> crate::metastore::MetastoreResult; + ///Returns a list of size info for each index. + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult; ///Streams splits from index. async fn list_splits( &self, @@ -1085,6 +1131,12 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.0.delete_index(request).await } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.0.list_index_stats(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -1293,6 +1345,12 @@ pub mod mock_metastore_service { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_index(request).await } + async fn list_index_stats( + &self, + request: super::ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner.lock().await.list_index_stats(request).await + } async fn list_splits( &self, request: super::ListSplitsRequest, @@ -1560,6 +1618,22 @@ impl tower::Service for InnerMetastoreServiceClient { Box::pin(fut) } } +impl tower::Service for InnerMetastoreServiceClient { + type Response = ListIndexStatsResponse; + type Error = crate::metastore::MetastoreError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: ListIndexStatsRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.list_index_stats(request).await }; + Box::pin(fut) + } +} impl tower::Service for InnerMetastoreServiceClient { type Response = MetastoreServiceStream; type Error = crate::metastore::MetastoreError; @@ -2011,6 +2085,11 @@ struct MetastoreServiceTowerServiceStack { EmptyResponse, crate::metastore::MetastoreError, >, + list_index_stats_svc: quickwit_common::tower::BoxService< + ListIndexStatsRequest, + ListIndexStatsResponse, + crate::metastore::MetastoreError, + >, list_splits_svc: quickwit_common::tower::BoxService< ListSplitsRequest, MetastoreServiceStream, @@ -2180,6 +2259,12 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { ) -> crate::metastore::MetastoreResult { self.delete_index_svc.clone().ready().await?.call(request).await } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.list_index_stats_svc.clone().ready().await?.call(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -2403,6 +2488,16 @@ type DeleteIndexLayer = quickwit_common::tower::BoxLayer< EmptyResponse, crate::metastore::MetastoreError, >; +type ListIndexStatsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListIndexStatsRequest, + ListIndexStatsResponse, + crate::metastore::MetastoreError, + >, + ListIndexStatsRequest, + ListIndexStatsResponse, + crate::metastore::MetastoreError, +>; type ListSplitsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< ListSplitsRequest, @@ -2671,6 +2766,7 @@ pub struct MetastoreServiceTowerLayerStack { indexes_metadata_layers: Vec, list_indexes_metadata_layers: Vec, delete_index_layers: Vec, + list_index_stats_layers: Vec, list_splits_layers: Vec, stage_splits_layers: Vec, publish_splits_layers: Vec, @@ -2853,6 +2949,31 @@ impl MetastoreServiceTowerLayerStack { crate::metastore::MetastoreError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexStatsRequest, + ListIndexStatsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + ListIndexStatsRequest, + Response = ListIndexStatsResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< ListSplitsRequest, @@ -3532,6 +3653,8 @@ impl MetastoreServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.delete_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_index_stats_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.list_splits_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.stage_splits_layers @@ -3703,6 +3826,25 @@ impl MetastoreServiceTowerLayerStack { self.delete_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_list_index_stats_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexStatsRequest, + ListIndexStatsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + ListIndexStatsRequest, + Response = ListIndexStatsResponse, + Error = crate::metastore::MetastoreError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.list_index_stats_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn stack_list_splits_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -4332,6 +4474,14 @@ impl MetastoreServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let list_index_stats_svc = self + .list_index_stats_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let list_splits_svc = self .list_splits_layers .into_iter() @@ -4548,6 +4698,7 @@ impl MetastoreServiceTowerLayerStack { indexes_metadata_svc, list_indexes_metadata_svc, delete_index_svc, + list_index_stats_svc, list_splits_svc, stage_splits_svc, publish_splits_svc, @@ -4689,6 +4840,12 @@ where Error = crate::metastore::MetastoreError, Future = BoxFuture, > + + tower::Service< + ListIndexStatsRequest, + Response = ListIndexStatsResponse, + Error = crate::metastore::MetastoreError, + Future = BoxFuture, + > + tower::Service< ListSplitsRequest, Response = MetastoreServiceStream, @@ -4903,6 +5060,12 @@ where ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.clone().call(request).await + } async fn list_splits( &self, request: ListSplitsRequest, @@ -5190,6 +5353,20 @@ where DeleteIndexRequest::rpc_name(), )) } + async fn list_index_stats( + &self, + request: ListIndexStatsRequest, + ) -> crate::metastore::MetastoreResult { + self.inner + .clone() + .list_index_stats(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + ListIndexStatsRequest::rpc_name(), + )) + } async fn list_splits( &self, request: ListSplitsRequest, @@ -5661,6 +5838,17 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn list_index_stats( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .list_index_stats(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } type ListSplitsStream = quickwit_common::ServiceStream< tonic::Result, >; @@ -6269,6 +6457,36 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Returns a list of size info for each index. + pub async fn list_index_stats( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.metastore.MetastoreService/ListIndexStats", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "ListIndexStats", + ), + ); + self.inner.unary(req, path, codec).await + } /// Streams splits from index. pub async fn list_splits( &mut self, @@ -7068,6 +7286,14 @@ pub mod metastore_service_grpc_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Returns a list of size info for each index. + async fn list_index_stats( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ListSplits method. type ListSplitsStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, @@ -7667,6 +7893,55 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } + "/quickwit.metastore.MetastoreService/ListIndexStats" => { + #[allow(non_camel_case_types)] + struct ListIndexStatsSvc(pub Arc); + impl< + T: MetastoreServiceGrpc, + > tonic::server::UnaryService + for ListIndexStatsSvc { + type Response = super::ListIndexStatsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_index_stats( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListIndexStatsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.metastore.MetastoreService/ListSplits" => { #[allow(non_camel_case_types)] struct ListSplitsSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index a855ba5a665..ba371c13d4a 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -387,6 +387,13 @@ impl ListDeleteTasksRequest { } } +impl SplitStats { + pub fn add_split(&mut self, size_bytes: u64) { + self.num_splits += 1; + self.total_size_bytes += size_bytes; + } +} + pub mod serde_utils { use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize};