Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Removes the `idx_splits_stats` index and the `split_size_bytes` column from `splits`.
-- `IF EXISTS` turns each statement into a no-op when the object is already absent.
DROP INDEX IF EXISTS idx_splits_stats;

ALTER TABLE splits DROP COLUMN IF EXISTS split_size_bytes;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Adds `split_size_bytes` to `splits` as a stored generated column: its value is the
-- `footer_offsets.end` field extracted from `split_metadata_json`, cast to BIGINT.
-- NOTE(review): the column is NOT NULL, so this presumably relies on every row's JSON
-- containing `footer_offsets.end` -- confirm against existing data.
ALTER TABLE splits ADD COLUMN IF NOT EXISTS split_size_bytes BIGINT NOT NULL GENERATED ALWAYS AS ((split_metadata_json::json->'footer_offsets'->>'end')::bigint) STORED;

-- Index keyed on (index_uid, split_state) that also carries `split_size_bytes` via INCLUDE.
CREATE INDEX IF NOT EXISTS idx_splits_stats ON splits (index_uid, split_state) INCLUDE (split_size_bytes);
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ use quickwit_proto::metastore::{
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse,
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse,
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream,
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest,
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
};

/// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can
Expand Down Expand Up @@ -162,6 +163,13 @@ impl MetastoreService for ControlPlaneMetastore {
self.metastore.list_splits(request).await
}

/// Lists index stats by forwarding `request` unchanged to `self.metastore` and returning
/// its result as-is.
async fn list_index_stats(
&self,
request: ListIndexStatsRequest,
) -> MetastoreResult<ListIndexStatsResponse> {
self.metastore.list_index_stats(request).await
}

async fn list_stale_splits(
&self,
request: ListStaleSplitsRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use quickwit_config::{
};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
DeleteShardsResponse, DeleteTask, EntityKind, IndexStats, ListShardsSubrequest,
ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest,
OpenShardSubresponse, PruneShardsRequest, SplitStats,
};
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -497,6 +498,34 @@ impl FileBackedIndex {
Ok(())
}

/// Computes the [`IndexStats`] of this index.
///
/// Every split in `self.splits` is accounted for in the [`SplitStats`] bucket matching its
/// current [`SplitState`]. The size recorded for a split is the end of its footer offsets
/// range (`split_metadata.footer_offsets.end`).
///
/// All three buckets are always populated (`Some`), even when no split is in that state.
pub(crate) fn get_stats(&self) -> MetastoreResult<IndexStats> {
    let mut staged = SplitStats::default();
    let mut published = SplitStats::default();
    let mut marked_for_deletion = SplitStats::default();

    for split in self.splits.values() {
        let split_size_bytes = split.split_metadata.footer_offsets.end;
        // Select the accumulator for the split's state, then record the split once.
        let bucket = match split.split_state {
            SplitState::Staged => &mut staged,
            SplitState::Published => &mut published,
            SplitState::MarkedForDeletion => &mut marked_for_deletion,
        };
        bucket.add_split(split_size_bytes);
    }

    let index_stats = IndexStats {
        index_uid: Some(self.index_uid().clone()),
        staged: Some(staged),
        published: Some(published),
        marked_for_deletion: Some(marked_for_deletion),
    };
    Ok(index_stats)
}

/// Adds a source.
pub(crate) fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> {
let index_uid = self.index_uid().clone();
Expand Down Expand Up @@ -764,16 +793,16 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};

use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::ListShardsSubrequest;
use quickwit_proto::metastore::{ListShardsSubrequest, SplitStats};
use quickwit_proto::types::{IndexUid, SourceId};

use super::FileBackedIndex;
use crate::file_backed::file_backed_index::split_query_predicate;
use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState};
use crate::{IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState};

impl FileBackedIndex {
pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec<Shard>) {
Expand Down Expand Up @@ -804,6 +833,7 @@ mod tests {
time_range: Some(32..=40),
tags: BTreeSet::from(["tag-1".to_string()]),
create_timestamp: 12,
footer_offsets: 0..2048,
..Default::default()
},
split_state: SplitState::Staged,
Expand All @@ -817,6 +847,7 @@ mod tests {
time_range: None,
tags: BTreeSet::from(["tag-2".to_string(), "tag-3".to_string()]),
create_timestamp: 5,
footer_offsets: 0..1024,
..Default::default()
},
split_state: SplitState::MarkedForDeletion,
Expand All @@ -830,6 +861,7 @@ mod tests {
time_range: Some(0..=90),
tags: BTreeSet::from(["tag-2".to_string(), "tag-4".to_string()]),
create_timestamp: 64,
footer_offsets: 0..512,
..Default::default()
},
split_state: SplitState::Published,
Expand Down Expand Up @@ -944,4 +976,30 @@ mod tests {
assert!(!split_query_predicate(&&split_2, &query));
assert!(!split_query_predicate(&&split_3, &query));
}

/// Checks that `get_stats` reports one split per state with the expected total size.
#[test]
fn test_get_stats() {
    let index_metadata =
        IndexMetadata::for_test("test-index", "file:///qwdata/indexes/test-index");
    let index =
        FileBackedIndex::new(index_metadata, make_splits().into(), HashMap::new(), vec![]);

    let stats = index.get_stats().unwrap();

    // Expected stats for a state holding exactly one split of the given size.
    // NOTE(review): assumes `make_splits()` yields one split per state — confirm against fixture.
    let single_split = |total_size_bytes| {
        Some(SplitStats {
            num_splits: 1,
            total_size_bytes,
        })
    };

    assert_eq!(stats.staged, single_split(2048));
    assert_eq!(stats.published, single_split(512));
    assert_eq!(stats.marked_for_deletion, single_split(1024));
}
}
61 changes: 53 additions & 8 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ use quickwit_proto::metastore::{
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
serde_utils,
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -809,6 +809,51 @@ impl MetastoreService for FileBackedMetastore {
Ok(ServiceStream::new(splits_responses_stream))
}

/// Lists the stats of every active index whose id matches `request.index_id_patterns`.
///
/// The matching index ids are first collected under the state read lock; each index is then
/// read through `read_any` to compute its stats.
///
/// # Errors
///
/// Returns an error if the index id patterns cannot be turned into a matcher, or if reading
/// an index fails with anything other than `MetastoreError::NotFound`.
async fn list_index_stats(
    &self,
    request: ListIndexStatsRequest,
) -> MetastoreResult<ListIndexStatsResponse> {
    let index_id_matcher =
        IndexIdMatcher::try_from_index_id_patterns(&request.index_id_patterns)?;

    // The read guard lives only inside this scope, so it is released before any index is read.
    let matching_index_ids: Vec<IndexId> = {
        let state_guard = self.state.read().await;
        state_guard
            .indexes
            .iter()
            .filter_map(|(index_id, index_status)| {
                let is_active = matches!(index_status, LazyIndexStatus::Active(_));
                (is_active && index_id_matcher.is_match(index_id)).then(|| index_id.clone())
            })
            .collect()
    };

    let mut pending_reads = FuturesUnordered::new();
    for index_id in matching_index_ids {
        pending_reads.push(async move {
            self.read_any(&index_id, None, |index| index.get_stats())
                .await
        });
    }

    let mut index_stats = Vec::new();
    while let Some(read_result) = pending_reads.next().await {
        match read_result {
            Ok(stats) => index_stats.push(stats),
            // An index that is not found is skipped rather than failing the whole request.
            Err(MetastoreError::NotFound(_)) => {}
            Err(other_error) => return Err(other_error),
        }
    }

    Ok(ListIndexStatsResponse { index_stats })
}

async fn list_stale_splits(
&self,
request: ListStaleSplitsRequest,
Expand Down
86 changes: 82 additions & 4 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::fmt::{self, Write};
use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
Expand All @@ -34,15 +35,16 @@ use quickwit_proto::metastore::{
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse,
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse, serde_utils,
};
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
Expand Down Expand Up @@ -904,6 +906,82 @@ impl MetastoreService for PostgresqlMetastore {
Ok(service_stream)
}

/// Lists per-index split stats for the indexes matching `request.index_id_patterns`.
///
/// A single SQL query left-joins the matching indexes with `splits` and aggregates the split
/// count and the summed `split_size_bytes` per `(index_uid, split_state)`. The rows are then
/// folded into one [`IndexStats`] per index. An index without any split yields a row with a
/// NULL state and keeps default (zeroed) stats for all three states.
///
/// # Errors
///
/// Returns `MetastoreError::Internal` if the index pattern SQL cannot be built, if an
/// `index_uid` cannot be parsed, or if a row carries an unknown split state. Errors from
/// running the query are propagated through `?`.
async fn list_index_stats(
    &self,
    request: ListIndexStatsRequest,
) -> MetastoreResult<ListIndexStatsResponse> {
    let index_pattern_sql = match build_index_id_patterns_sql_query(&request.index_id_patterns)
    {
        Ok(pattern_sql) => pattern_sql,
        Err(error) => {
            return Err(MetastoreError::Internal {
                message: "failed to build `list_index_stats` SQL query".to_string(),
                cause: error.to_string(),
            });
        }
    };
    // The literal's line breaks are part of the query text and are kept verbatim.
    let sql = format!(
"SELECT
i.index_uid,
s.split_state,
COUNT(s.split_state) AS num_splits,
COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes
FROM ({index_pattern_sql}) i
LEFT JOIN splits s ON s.index_uid = i.index_uid
GROUP BY i.index_uid, s.split_state"
    );

    let rows: Vec<(String, Option<String>, i64, i64)> = sqlx::query_as(&sql)
        .fetch_all(&self.connection_pool)
        .await?;

    // Keyed by the raw `index_uid` string; one entry per index, filled state by state.
    let mut stats_by_index_uid = HashMap::new();
    for (index_uid_str, split_state_opt, num_splits, total_size_bytes) in rows {
        let index_uid = match IndexUid::from_str(&index_uid_str) {
            Ok(index_uid) => index_uid,
            Err(_) => {
                return Err(MetastoreError::Internal {
                    message: "failed to parse index_uid".to_string(),
                    cause: index_uid_str.to_string(),
                });
            }
        };
        let index_stats = stats_by_index_uid
            .entry(index_uid_str)
            .or_insert_with(|| IndexStats {
                index_uid: Some(index_uid),
                staged: Some(SplitStats::default()),
                published: Some(SplitStats::default()),
                marked_for_deletion: Some(SplitStats::default()),
            });
        let split_stats = SplitStats {
            num_splits: num_splits as u64,
            total_size_bytes: total_size_bytes as u64,
        };
        match split_state_opt.as_deref() {
            Some("Staged") => index_stats.staged = Some(split_stats),
            Some("Published") => index_stats.published = Some(split_stats),
            Some("MarkedForDeletion") => index_stats.marked_for_deletion = Some(split_stats),
            Some(unknown_state) => {
                return Err(MetastoreError::Internal {
                    message: "invalid split state".to_string(),
                    cause: unknown_state.to_string(),
                });
            }
            // NULL state: the index has no splits, so the defaults are kept.
            None => {}
        }
    }

    let index_stats = stats_by_index_uid.into_values().collect();
    Ok(ListIndexStatsResponse { index_stats })
}

#[instrument(skip(self))]
async fn mark_splits_for_deletion(
&self,
Expand Down
Loading