Skip to content
Merged
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use warp::{Filter, Rejection};

use super::model::{
CatIndexQueryParams, DeleteQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
IndexMappingQueryParams, MultiSearchQueryParams, SearchQueryParamsCount,
};
use crate::Body;
use crate::decompression::get_body_bytes;
Expand Down Expand Up @@ -285,9 +285,10 @@ pub(crate) fn elastic_aliases_filter() -> impl Filter<Extract = (), Error = Reje
}

/// Warp filter for the Elasticsearch-compatible index mapping endpoint.
///
/// Matches `GET` requests on `_elastic/{segment}/_mapping` or
/// `_elastic/{segment}/_mappings`; the two path alternatives are unified so
/// both extract the same single `String` path segment. `warp::query()` then
/// deserializes the request's query string, which is extracted alongside the
/// path segment.
pub(crate) fn elastic_index_mapping_filter()
-> impl Filter<Extract = (String,), Error = Rejection> + Clone {
-> impl Filter<Extract = (String, IndexMappingQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_mapping")
.or(warp::path!("_elastic" / String / "_mappings"))
.unify()
.and(warp::get())
.and(warp::query())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// Query parameters for `_mapping(s)`. Unknown params are silently ignored.
///
/// Every field is optional: an absent parameter deserializes to `None`.
///
/// Timestamps (`start_timestamp`, `end_timestamp`) are epoch seconds,
/// half-open `[start, end)`, forwarded to `ListFieldsRequest` to prune splits.
/// `field_patterns` is a comma-separated list mirroring
/// `ListFieldsRequest.field_patterns`, pushed down to the leaves for
/// dynamic-field filtering.
// `skip_serializing_none` omits `None` fields when this struct is serialized;
// it has to stay above `#[derive]` to take effect.
#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct IndexMappingQueryParams {
/// Start of the time range, in epoch seconds (inclusive bound of the
/// half-open range). `None` when the parameter is not supplied.
#[serde(default)]
pub start_timestamp: Option<i64>,
/// End of the time range, in epoch seconds (exclusive bound of the
/// half-open range). `None` when the parameter is not supplied.
#[serde(default)]
pub end_timestamp: Option<i64>,
/// Raw, unsplit comma-separated list of field patterns.
///
/// Accepts both `field_patterns` (Quickwit) and `fields` (ES-compatible).
// NOTE(review): supplying both spellings in one request presumably fails
// deserialization as a duplicate field — confirm.
#[serde(default, alias = "fields")]
pub field_patterns: Option<String>,
}

impl IndexMappingQueryParams {
    /// Returns the individual patterns contained in the raw `field_patterns`
    /// parameter.
    ///
    /// The raw value is split on `,`; each piece is stripped of surrounding
    /// whitespace and pieces that end up empty are discarded. When the
    /// parameter is missing, or contains nothing but separators and
    /// whitespace, the result is an empty `Vec`.
    pub fn field_patterns(&self) -> Vec<String> {
        let raw_patterns = match self.field_patterns.as_deref() {
            Some(raw_patterns) => raw_patterns,
            // Parameter not supplied: nothing to split.
            None => return Vec::new(),
        };
        raw_patterns
            .split(',')
            .map(str::trim)
            .filter(|candidate| !candidate.is_empty())
            .map(String::from)
            .collect()
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for [`IndexMappingQueryParams`]: query-string
    //! deserialization and the `field_patterns()` splitting helper.
    //!
    // NOTE(review): deserialization is exercised through `serde_qs`, while the
    // route filter extracts the params with `warp::query()` — confirm both
    // treat these inputs identically (notably the empty-value case). The
    // accessor tests below pin the blank-value behaviour independently of the
    // deserializer.

    use super::IndexMappingQueryParams;

    /// Builds params carrying only a raw `field_patterns` value.
    fn with_raw_patterns(raw: &str) -> IndexMappingQueryParams {
        IndexMappingQueryParams {
            field_patterns: Some(raw.to_string()),
            ..Default::default()
        }
    }

    #[test]
    fn empty_query_string_yields_none() {
        let params: IndexMappingQueryParams = serde_qs::from_str("").unwrap();
        assert!(params.start_timestamp.is_none());
        assert!(params.end_timestamp.is_none());
        assert!(params.field_patterns.is_none());
    }

    #[test]
    fn both_params_present_yield_some() {
        let qs = "start_timestamp=1712160204&end_timestamp=1712764984";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.start_timestamp, Some(1712160204));
        assert_eq!(params.end_timestamp, Some(1712764984));
        assert!(params.field_patterns.is_none());
    }

    #[test]
    fn only_start_timestamp_present() {
        let qs = "start_timestamp=1712160204";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.start_timestamp, Some(1712160204));
        assert!(params.end_timestamp.is_none());
    }

    #[test]
    fn only_end_timestamp_present() {
        let qs = "end_timestamp=1712764984";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert!(params.start_timestamp.is_none());
        assert_eq!(params.end_timestamp, Some(1712764984));
    }

    #[test]
    fn unknown_field_is_ignored() {
        let qs = "start_timestamp=1&pretty=true&ignore_unavailable=true";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.start_timestamp, Some(1));
        assert!(params.end_timestamp.is_none());
        assert!(params.field_patterns.is_none());
    }

    #[test]
    fn field_patterns_param_present() {
        let qs = "field_patterns=host,message,status";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(
            params.field_patterns.as_deref(),
            Some("host,message,status")
        );
    }

    #[test]
    fn field_patterns_combined_with_timestamps() {
        let qs = "start_timestamp=1&end_timestamp=2&field_patterns=host";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.start_timestamp, Some(1));
        assert_eq!(params.end_timestamp, Some(2));
        assert_eq!(params.field_patterns.as_deref(), Some("host"));
    }

    #[test]
    fn empty_field_patterns_value() {
        // `serde_qs` collapses an empty value to `None`.
        let qs = "field_patterns=";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert!(params.field_patterns.is_none());
    }

    #[test]
    fn es_compatible_fields_alias_accepted() {
        // ES clients send `?fields=` — must map to the same field as `field_patterns`.
        let qs = "fields=host,message";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.field_patterns.as_deref(), Some("host,message"));
    }

    #[test]
    fn fields_alias_combined_with_timestamps() {
        let qs = "start_timestamp=1&end_timestamp=2&fields=host";
        let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
        assert_eq!(params.start_timestamp, Some(1));
        assert_eq!(params.end_timestamp, Some(2));
        assert_eq!(params.field_patterns.as_deref(), Some("host"));
    }

    #[test]
    fn field_patterns_accessor_absent_is_empty() {
        let params = IndexMappingQueryParams::default();
        assert!(params.field_patterns().is_empty());
    }

    #[test]
    fn field_patterns_accessor_splits_on_commas() {
        let params = with_raw_patterns("host,message,status");
        assert_eq!(params.field_patterns(), vec!["host", "message", "status"]);
    }

    #[test]
    fn field_patterns_accessor_trims_whitespace() {
        let params = with_raw_patterns(" host , message*\t,status ");
        assert_eq!(params.field_patterns(), vec!["host", "message*", "status"]);
    }

    #[test]
    fn field_patterns_accessor_drops_empty_segments() {
        let params = with_raw_patterns(",host,, ,message,");
        assert_eq!(params.field_patterns(), vec!["host", "message"]);
    }

    #[test]
    fn field_patterns_accessor_blank_is_empty() {
        // A present-but-blank value must behave like an absent parameter.
        assert!(with_raw_patterns("").field_patterns().is_empty());
        assert!(with_raw_patterns("  , ,").field_patterns().is_empty());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ enum FieldMapping {
}

impl ElasticsearchMappingsResponse {
/// Builds a response from declared doc-mapping field mappings, optionally
/// merged with dynamic fields from a `ListFields` response.
pub fn from_doc_mapping(
indexes_metadata: Vec<IndexMetadata>,
list_fields_response: Option<&ListFieldsResponse>,
Expand Down Expand Up @@ -272,10 +274,10 @@ mod tests {
assert_eq!(meta["properties"]["source"]["type"], "keyword");
}

use quickwit_proto::search::ListFieldsEntry;

#[test]
fn test_merge_dynamic_fields_skips_existing_and_internal() {
use quickwit_proto::search::ListFieldsEntry;

let mut properties = HashMap::new();
properties.insert("title".to_string(), FieldMapping::Leaf { typ: "text" });

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod bulk_query_params;
mod cat_indices;
mod error;
mod field_capability;
mod index_mapping_query_params;
mod mappings;
mod multi_search;
mod scroll;
Expand All @@ -36,6 +37,7 @@ pub use field_capability::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse,
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
};
pub use index_mapping_query_params::IndexMappingQueryParams;
pub(crate) use mappings::ElasticsearchMappingsResponse;
pub use multi_search::{
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
Expand Down
30 changes: 20 additions & 10 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ use super::model::{
CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError,
ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse,
ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams,
FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams,
MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody,
SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
FieldCapabilityRequestBody, FieldCapabilityResponse, IndexMappingQueryParams,
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
ScrollQueryParams, SearchBody, SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
};
use super::{TrackTotalHits, make_elastic_api_response};
Expand Down Expand Up @@ -199,8 +199,12 @@ async fn get_index_metadata(
Ok(index_metadata)
}

/// `_mapping(s)` handler. Pushes `field_patterns`, `start_timestamp`, and
/// `end_timestamp` down to `root_list_fields` so splits can be pruned and
/// dynamic fields filtered at the leaves.
pub(crate) async fn es_compat_index_mapping(
index_id: String,
params: IndexMappingQueryParams,
mut metastore: MetastoreServiceClient,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticsearchMappingsResponse, ElasticsearchError> {
Expand All @@ -214,17 +218,23 @@ pub(crate) async fn es_compat_index_mapping(
.iter()
.map(|m| m.index_id().to_string())
.collect();

let list_fields_request = quickwit_proto::search::ListFieldsRequest {
index_id_patterns,
field_patterns: Vec::new(),
start_timestamp: None,
end_timestamp: None,
field_patterns: params.field_patterns(),
Comment thread
congx4 marked this conversation as resolved.
start_timestamp: params.start_timestamp,
end_timestamp: params.end_timestamp,
query_ast: None,
};
let list_fields_response = search_service
.root_list_fields(list_fields_request)
.await
.ok();
let list_fields_response = match search_service.root_list_fields(list_fields_request).await {
Ok(response) => Some(response),
// Bad field pattern supplied by the caller — surface as 400.
Err(err @ SearchError::InvalidArgument(_)) => {
return Err(ElasticsearchError::from(err));
}
// Infrastructure / timeout failures degrade gracefully.
Err(_) => None,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve remote bad field-pattern errors

In a multi-node deployment, root_list_fields sends field_patterns to remote leaves. An invalid pattern such as a*b* is rejected by the leaf as SearchError::InvalidArgument, but the gRPC search client currently falls back to SearchError::Internal when parsing the tonic status message (quickwit-search/src/error.rs::parse_grpc_error). As a result, even after the .ok() fix, the catch-all arm here still swallows that remote bad-pattern error, so a _mapping?fields=a*b* request can succeed with only the declared mappings instead of returning the intended 400.

Useful? React with 👍 / 👎.

};
let response = ElasticsearchMappingsResponse::from_doc_mapping(
indexes_metadata,
list_fields_response.as_ref(),
Expand Down
Loading