Skip to content
12 changes: 12 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ service SearchService {

// Describe how a search would be processed.
rpc SearchPlan(SearchRequest) returns (SearchPlanResponse);

// Returns the current load of this searcher node.
// The request message is empty; the response carries the load as a single
// job-cost sum (`GetLoadResponse.load_job_cost`).
rpc GetLoad(GetLoadRequest) returns (GetLoadResponse);
}

/// Scroll Request
Expand Down Expand Up @@ -110,6 +113,15 @@ message ReportSplitsRequest {

message ReportSplitsResponse {}

// Request message of the `GetLoad` RPC. It declares no fields.
message GetLoadRequest {}

// Response message of the `GetLoad` RPC.
message GetLoadResponse {
// Current load expressed as the sum of job costs (same arbitrary unit as
// Job::cost() in the search job placer) across all queued and active tasks
// in the SearchPermitProvider.
uint64 load_job_cost = 1;
}

// -- ListFields -------------------

message ListFieldsRequest {
Expand Down
87 changes: 87 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions quickwit/quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ enum SearchServiceClientImpl {
pub struct SearchServiceClient {
/// Underlying implementation: either a local (in-process) service or a gRPC client.
client_impl: SearchServiceClientImpl,
/// Address exposed through `grpc_addr()`, i.e. the address the underlying client connects to.
grpc_addr: SocketAddr,
/// In test/testsuite builds, overrides the load returned by `get_load()` for local clients,
/// so that tests using mock services don't need to set up `get_load` expectations.
/// `None` means no override was set; `get_load()` then reports `0` for local clients.
#[cfg(any(test, feature = "testsuite"))]
test_load: Option<usize>,
}

impl fmt::Debug for SearchServiceClient {
Expand All @@ -75,6 +79,8 @@ impl SearchServiceClient {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Grpc(client),
grpc_addr,
#[cfg(any(test, feature = "testsuite"))]
test_load: None,
}
}

Expand All @@ -83,9 +89,21 @@ impl SearchServiceClient {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Local(service),
grpc_addr,
#[cfg(any(test, feature = "testsuite"))]
test_load: None,
}
}

/// Test/testsuite-only builder that pins the value reported by `get_load()` for this client.
///
/// For local clients, `get_load()` then answers with `load` without calling the underlying
/// service, so mock services used in tests unrelated to load-aware placement do not need a
/// `get_load` expectation.
#[cfg(any(test, feature = "testsuite"))]
pub fn with_test_load(self, load: usize) -> Self {
    let mut client = self;
    client.test_load = Some(load);
    client
}

/// Return the grpc_addr the underlying client connects to.
pub fn grpc_addr(&self) -> SocketAddr {
self.grpc_addr
Expand Down Expand Up @@ -219,6 +237,32 @@ impl SearchServiceClient {
Ok(())
}

/// Returns the current load of the targeted node, expressed as the sum of job costs
/// across all queued and active tasks in its SearchPermitProvider.
///
/// # Errors
///
/// For a gRPC client, any gRPC failure other than `tonic::Code::Unimplemented` is converted
/// with `parse_grpc_error` and returned. An `Unimplemented` reply is not an error: it is
/// reported as a load of `0`.
///
/// # Test builds
///
/// In test/testsuite builds, local clients never reach the underlying service: they return
/// the value set through `with_test_load`, or `0` when none was set.
pub async fn get_load(&mut self) -> crate::Result<usize> {
    // In test/testsuite builds, short-circuit for local clients so that mock services
    // do not need a `get_load` expectation in tests unrelated to load-aware placement.
    #[cfg(any(test, feature = "testsuite"))]
    if let SearchServiceClientImpl::Local(_) = &self.client_impl {
        return Ok(self.test_load.unwrap_or(0));
    }
    match &mut self.client_impl {
        SearchServiceClientImpl::Local(service) => Ok(service.get_load().await),
        SearchServiceClientImpl::Grpc(grpc_client) => {
            match grpc_client
                .get_load(quickwit_proto::search::GetLoadRequest {})
                .await
            {
                Ok(response) => {
                    let load_job_cost: u64 = response.into_inner().load_job_cost;
                    // The wire value is a `uint64`. Saturate instead of truncating with
                    // `as` on targets where `usize` is narrower than 64 bits, so that a
                    // heavily loaded node can never be mistaken for a lightly loaded one.
                    Ok(usize::try_from(load_job_cost).unwrap_or(usize::MAX))
                }
                // Older searcher nodes do not implement `get_load`. To preserve a smooth
                // upgrade path, treat them as unloaded.
                Err(tonic_error) if tonic_error.code() == tonic::Code::Unimplemented => Ok(0),
                Err(tonic_error) => Err(parse_grpc_error(&tonic_error)),
            }
        }
    }
}

/// Indexers call report_splits to inform searchers node about the presence of a split, which
/// would then be considered as a candidate for the searcher split cache.
pub async fn report_splits(&mut self, report_splits_request: ReportSplitsRequest) {
Expand Down
13 changes: 9 additions & 4 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1622,15 +1622,20 @@ async fn schedule_search_tasks(
mut splits: Vec<(SplitIdAndFooterOffsets, SearchRequest)>,
searcher_context: &SearcherContext,
) -> ScheduleSearchTaskResult {
let permit_sizes: Vec<ByteSize> = splits
let task_metadata: Vec<crate::search_permit_provider::SplitSearchTaskMetadata> = splits
.iter()
.map(|(split, _)| {
compute_initial_memory_allocation(
let memory_allocation = compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
);
let job_cost = crate::root::compute_split_cost(split.num_docs);
crate::search_permit_provider::SplitSearchTaskMetadata {
memory_allocation,
job_cost,
}
})
.collect();

Expand All @@ -1644,7 +1649,7 @@ async fn schedule_search_tasks(

let search_permit_futures = searcher_context
.search_permit_provider
.get_permits_with_offload(permit_sizes, offload_threshold)
.get_permits_with_offload(task_metadata, offload_threshold)
.await;

let splits_to_run_on_lambda: Vec<(SplitIdAndFooterOffsets, SearchRequest)> =
Expand Down
14 changes: 9 additions & 5 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytesize::ByteSize;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::pretty::PrettySample;
Expand Down Expand Up @@ -329,23 +328,28 @@ pub async fn leaf_list_terms(
splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
info!(split_offsets = ?PrettySample::new(splits, 5));
let permit_sizes: Vec<ByteSize> = splits
let task_metadata: Vec<crate::search_permit_provider::SplitSearchTaskMetadata> = splits
.iter()
.map(|split| {
compute_initial_memory_allocation(
let memory_allocation = compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
);
let job_cost = crate::root::compute_split_cost(split.num_docs);
crate::search_permit_provider::SplitSearchTaskMetadata {
memory_allocation,
job_cost,
}
})
.collect();
// We have added offloading leaf search to lambdas, but not for list_terms yet.
// TODO (Add it)
// https://github.com/quickwit-oss/quickwit/issues/6150
let permits = searcher_context
.search_permit_provider
.get_permits(permit_sizes)
.get_permits(task_metadata)
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
Expand Down
11 changes: 7 additions & 4 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<'a> From<&'a SplitMetadata> for SearchJob {
fn from(split_metadata: &'a SplitMetadata) -> Self {
SearchJob {
index_uid: split_metadata.index_uid.clone(),
cost: compute_split_cost(split_metadata),
cost: compute_split_cost(split_metadata.num_docs as u64),
offsets: extract_split_and_footer_offsets(split_metadata),
}
}
Expand Down Expand Up @@ -1752,18 +1752,21 @@ async fn assign_client_fetch_docs_jobs(
fetch_docs_req_jobs.push(fetch_docs_job);
}

// don't do a second call to GetLoad to place fetch_docs jobs
let assigned_jobs = client_pool
.assign_jobs(fetch_docs_req_jobs, &HashSet::new())
.assign_jobs_ignoring_load(fetch_docs_req_jobs, &HashSet::new())
.await?;

Ok(assigned_jobs)
}

// Measure the cost associated to searching in a given split metadata.
fn compute_split_cost(split_metadata: &SplitMetadata) -> usize {
pub(crate) fn compute_split_cost(num_docs: u64) -> usize {
// TODO this formula could be tuned a lot more. The general idea is that there is a fixed
// cost to searching a split, plus a somewhat-linear cost depending on the size of the split
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should take into account whether we have aggregations or not, or ideally a cost derived from the query.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to factor that in exactly; we don't have a good cost model for aggregations.

A simple count() group by X on a low-cardinality field isn't usually very expensive, but a cardinality(Y) group by Z, with both being high-cardinality, is much more expensive, though I have no idea by how much. This also depends on the selectivity of the query.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would probably have a very rough estimate based on the query AST. Can you add a TODO to eventually track aggregation cost and estimate query selectivity?

5 + split_metadata.num_docs / 100_000
// This should also factor the query shape (is it an expensive filter, is it an expensive
// aggregation...)
5 + (num_docs / 100_000) as usize
}

/// Builds a LeafSearchRequest to one node, from a list of [`SearchJob`].
Expand Down
Loading
Loading