-
Notifications
You must be signed in to change notification settings - Fork 544
feat(_mapping): timestamp pushdown + column-hints fast path (on top of #6439) #6443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2aebb39
2f719f6
2c2568e
7aaa932
b208d21
010d7f2
4d9a120
46e8ce9
d21005b
31023d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| // Copyright 2021-Present Datadog, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| /// Query-string parameters accepted by the `_mapping(s)` endpoint. | ||
| /// | ||
| /// Parameters that are not declared here are dropped during deserialization | ||
| /// instead of being rejected. | ||
| /// | ||
| /// * `start_timestamp` / `end_timestamp` are epoch seconds describing the | ||
| ///   half-open range `[start, end)`; they are forwarded to `ListFieldsRequest` | ||
| ///   so that splits can be pruned. | ||
| /// * `field_patterns` is a comma-separated list mirroring | ||
| ///   `ListFieldsRequest.field_patterns`; it is pushed down to the leaves for | ||
| ///   dynamic-field filtering. | ||
| #[serde_with::skip_serializing_none] | ||
| #[derive(Debug, Clone, Default, Serialize, Deserialize)] | ||
| // Every field is optional: a parameter missing from the query string falls | ||
| // back to the value from `Default`, i.e. `None`. | ||
| #[serde(default)] | ||
| pub struct IndexMappingQueryParams { | ||
|     /// Inclusive lower bound of the time range, in epoch seconds. | ||
|     pub start_timestamp: Option<i64>, | ||
|     /// Exclusive upper bound of the time range, in epoch seconds. | ||
|     pub end_timestamp: Option<i64>, | ||
|     /// Raw comma-separated pattern list. Accepted under both the Quickwit name | ||
|     /// (`field_patterns`) and the ES-compatible name (`fields`). | ||
|     #[serde(alias = "fields")] | ||
|     pub field_patterns: Option<String>, | ||
| } | ||
|
|
||
| impl IndexMappingQueryParams { | ||
|     /// Returns the individual patterns contained in the raw `field_patterns` | ||
|     /// parameter. | ||
|     /// | ||
|     /// The raw value is split on `,`; each piece is trimmed and pieces that end | ||
|     /// up empty are discarded. When the parameter is missing, or holds nothing | ||
|     /// but separators and whitespace, the result is an empty `Vec`. | ||
|     pub fn field_patterns(&self) -> Vec<String> { | ||
|         self.field_patterns | ||
|             // Zero or one raw value: an absent parameter yields no pieces at all. | ||
|             .iter() | ||
|             .flat_map(|raw| raw.split(',')) | ||
|             .map(str::trim) | ||
|             .filter(|piece| !piece.is_empty()) | ||
|             .map(str::to_owned) | ||
|             .collect() | ||
|     } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
|     use super::IndexMappingQueryParams; | ||
|
|
||
|     /// Parses a raw query string with `serde_qs`, panicking if deserialization | ||
|     /// fails so the calling test fails immediately. | ||
|     fn parse(qs: &str) -> IndexMappingQueryParams { | ||
|         serde_qs::from_str(qs).unwrap() | ||
|     } | ||
|
|
||
|     /// Builds params carrying only a raw `field_patterns` value, bypassing the | ||
|     /// query-string parser so the accessor can be tested on its own. | ||
|     fn with_raw_patterns(raw: Option<&str>) -> IndexMappingQueryParams { | ||
|         IndexMappingQueryParams { | ||
|             field_patterns: raw.map(str::to_string), | ||
|             ..Default::default() | ||
|         } | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn empty_query_string_yields_none() { | ||
|         let params = parse(""); | ||
|         assert!(params.start_timestamp.is_none()); | ||
|         assert!(params.end_timestamp.is_none()); | ||
|         assert!(params.field_patterns.is_none()); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn both_params_present_yield_some() { | ||
|         let params = parse("start_timestamp=1712160204&end_timestamp=1712764984"); | ||
|         assert_eq!(params.start_timestamp, Some(1712160204)); | ||
|         assert_eq!(params.end_timestamp, Some(1712764984)); | ||
|         assert!(params.field_patterns.is_none()); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn only_start_timestamp_present() { | ||
|         let params = parse("start_timestamp=1712160204"); | ||
|         assert_eq!(params.start_timestamp, Some(1712160204)); | ||
|         assert!(params.end_timestamp.is_none()); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn only_end_timestamp_present() { | ||
|         let params = parse("end_timestamp=1712764984"); | ||
|         assert!(params.start_timestamp.is_none()); | ||
|         assert_eq!(params.end_timestamp, Some(1712764984)); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn unknown_field_is_ignored() { | ||
|         let params = parse("start_timestamp=1&pretty=true&ignore_unavailable=true"); | ||
|         assert_eq!(params.start_timestamp, Some(1)); | ||
|         assert!(params.end_timestamp.is_none()); | ||
|         assert!(params.field_patterns.is_none()); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn field_patterns_param_present() { | ||
|         let params = parse("field_patterns=host,message,status"); | ||
|         assert_eq!( | ||
|             params.field_patterns.as_deref(), | ||
|             Some("host,message,status") | ||
|         ); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn field_patterns_combined_with_timestamps() { | ||
|         let params = parse("start_timestamp=1&end_timestamp=2&field_patterns=host"); | ||
|         assert_eq!(params.start_timestamp, Some(1)); | ||
|         assert_eq!(params.end_timestamp, Some(2)); | ||
|         assert_eq!(params.field_patterns.as_deref(), Some("host")); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn empty_field_patterns_value() { | ||
|         // `serde_qs` collapses an empty value to `None`. | ||
|         let params = parse("field_patterns="); | ||
|         assert!(params.field_patterns.is_none()); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn es_compatible_fields_alias_accepted() { | ||
|         // ES clients send `?fields=` — must map to the same field as `field_patterns`. | ||
|         let params = parse("fields=host,message"); | ||
|         assert_eq!(params.field_patterns.as_deref(), Some("host,message")); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn fields_alias_combined_with_timestamps() { | ||
|         let params = parse("start_timestamp=1&end_timestamp=2&fields=host"); | ||
|         assert_eq!(params.start_timestamp, Some(1)); | ||
|         assert_eq!(params.end_timestamp, Some(2)); | ||
|         assert_eq!(params.field_patterns.as_deref(), Some("host")); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn field_patterns_accessor_splits_trims_and_drops_empties() { | ||
|         // Surrounding whitespace is trimmed; empty pieces produced by doubled or | ||
|         // trailing commas are discarded. | ||
|         let params = with_raw_patterns(Some(" host , message ,,status, ")); | ||
|         assert_eq!(params.field_patterns(), vec!["host", "message", "status"]); | ||
|     } | ||
|
|
||
|     #[test] | ||
|     fn field_patterns_accessor_empty_when_absent_or_blank() { | ||
|         assert!(with_raw_patterns(None).field_patterns().is_empty()); | ||
|         assert!(with_raw_patterns(Some("")).field_patterns().is_empty()); | ||
|         assert!(with_raw_patterns(Some(" , ,")).field_patterns().is_empty()); | ||
|     } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,9 +58,9 @@ use super::model::{ | |
| CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError, | ||
| ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse, | ||
| ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams, | ||
| FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, | ||
| MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, | ||
| SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, | ||
| FieldCapabilityRequestBody, FieldCapabilityResponse, IndexMappingQueryParams, | ||
| MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, | ||
| ScrollQueryParams, SearchBody, SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, | ||
| build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, | ||
| }; | ||
| use super::{TrackTotalHits, make_elastic_api_response}; | ||
|
|
@@ -199,8 +199,12 @@ async fn get_index_metadata( | |
| Ok(index_metadata) | ||
| } | ||
|
|
||
| /// `_mapping(s)` handler. Pushes `field_patterns`, `start_timestamp`, and | ||
| /// `end_timestamp` down to `root_list_fields` so splits can be pruned and | ||
| /// dynamic fields filtered at the leaves. | ||
| pub(crate) async fn es_compat_index_mapping( | ||
| index_id: String, | ||
| params: IndexMappingQueryParams, | ||
| mut metastore: MetastoreServiceClient, | ||
| search_service: Arc<dyn SearchService>, | ||
| ) -> Result<ElasticsearchMappingsResponse, ElasticsearchError> { | ||
|
|
@@ -214,17 +218,23 @@ pub(crate) async fn es_compat_index_mapping( | |
| .iter() | ||
| .map(|m| m.index_id().to_string()) | ||
| .collect(); | ||
|
|
||
| let list_fields_request = quickwit_proto::search::ListFieldsRequest { | ||
| index_id_patterns, | ||
| field_patterns: Vec::new(), | ||
| start_timestamp: None, | ||
| end_timestamp: None, | ||
| field_patterns: params.field_patterns(), | ||
| start_timestamp: params.start_timestamp, | ||
| end_timestamp: params.end_timestamp, | ||
| query_ast: None, | ||
| }; | ||
| let list_fields_response = search_service | ||
| .root_list_fields(list_fields_request) | ||
| .await | ||
| .ok(); | ||
| let list_fields_response = match search_service.root_list_fields(list_fields_request).await { | ||
| Ok(response) => Some(response), | ||
| // Bad field pattern supplied by the caller — surface as 400. | ||
| Err(err @ SearchError::InvalidArgument(_)) => { | ||
| return Err(ElasticsearchError::from(err)); | ||
| } | ||
| // Infrastructure / timeout failures degrade gracefully. | ||
| Err(_) => None, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
In a multi-node deployment where Useful? React with 👍 / 👎. |
||
| }; | ||
| let response = ElasticsearchMappingsResponse::from_doc_mapping( | ||
| indexes_metadata, | ||
| list_fields_response.as_ref(), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.