diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 231d9e5c542..3861a0e5900 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -159,6 +159,7 @@ clap_builder,https://github.com/clap-rs/clap,MIT OR Apache-2.0,The clap_builder clap_lex,https://github.com/clap-rs/clap,MIT OR Apache-2.0,The clap_lex Authors cmac,https://github.com/RustCrypto/MACs,MIT OR Apache-2.0,RustCrypto Developers cmov,https://github.com/RustCrypto/utils,Apache-2.0 OR MIT,RustCrypto Developers +cmsketch,https://github.com/mrcroxx/cmsketch-rs,Apache-2.0,MrCroxx coarsetime,https://github.com/jedisct1/rust-coarsetime,BSD-2-Clause,Frank Denis cobs,https://github.com/jamesmunns/cobs.rs,MIT OR Apache-2.0,"Allen Welkie <>, James Munns " codespan-reporting,https://github.com/brendanzab/codespan,Apache-2.0,Brendan Zabarauskas @@ -182,6 +183,7 @@ constant_time_eq,https://github.com/cesarb/constant_time_eq,CC0-1.0 OR MIT-0 OR convert_case,https://github.com/rutrum/convert-case,MIT,rutrum core-foundation,https://github.com/servo/core-foundation-rs,MIT OR Apache-2.0,The Servo Project Developers core-foundation-sys,https://github.com/servo/core-foundation-rs,MIT OR Apache-2.0,The Servo Project Developers +core_affinity,https://github.com/Elzair/core_affinity_rs,MIT OR Apache-2.0,Philip Woods cpp_demangle,https://github.com/gimli-rs/cpp_demangle,MIT OR Apache-2.0,"Nick Fitzgerald , Jim Blandy , Kyle Huey " cpufeatures,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers crc,https://github.com/mrhooray/crc-rs,MIT OR Apache-2.0,"Rui Hu , Akhil Velagapudi <4@4khil.com>" @@ -297,6 +299,7 @@ event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,"Stje event-listener-strategy,https://github.com/smol-rs/event-listener-strategy,Apache-2.0 OR MIT,John Nunley fail,https://github.com/tikv/fail-rs,Apache-2.0,The TiKV Project Developers fancy-regex,https://github.com/fancy-regex/fancy-regex,MIT,"Raph Levien , Robin Stocker , Keith Hall " +fastant,https://github.com/fast/fastant,MIT,The fastant Authors fastdivide,https://github.com/fulmicoton/fastdivide,zlib-acknowledgement OR MIT,Paul Masurel fastrand,https://github.com/smol-rs/fastrand,Apache-2.0 OR MIT,Stjepan Glavina ff,https://github.com/zkcrypto/ff,MIT OR Apache-2.0,"Sean Bowe , Jack Grigg " @@ -314,6 +317,12 @@ foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters foreign-types-shared,https://github.com/sfackler/foreign-types,MIT OR Apache-2.0,Steven Fackler form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers +foyer,https://github.com/foyer-rs/foyer,Apache-2.0,MrCroxx +foyer-common,https://github.com/foyer-rs/foyer,Apache-2.0,MrCroxx +foyer-intrusive-collections,https://github.com/foyer-rs/intrusive-rs,Apache-2.0 OR MIT,Amanieu d'Antras +foyer-memory,https://github.com/foyer-rs/foyer,Apache-2.0,MrCroxx +foyer-storage,https://github.com/foyer-rs/foyer,Apache-2.0,MrCroxx +foyer-tokio,https://github.com/foyer-rs/foyer,Apache-2.0,MrCroxx fraction,https://github.com/dnsl48/fraction,MIT OR Apache-2.0,dnsl48 fragile,https://github.com/mitsuhiko/fragile,Apache-2.0,Armin Ronacher fs4,https://github.com/al8n/fs4-rs,MIT OR Apache-2.0,"Dan Burkert , Al Liu " @@ -400,6 +409,7 @@ inout,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developer instant,https://github.com/sebcrozet/instant,BSD-3-Clause,sebcrozet integer-encoding,https://github.com/dermesser/integer-encoding-rs,MIT,Lewin Bormann inventory,https://github.com/dtolnay/inventory,MIT OR Apache-2.0,David Tolnay +io-uring,https://github.com/tokio-rs/io-uring,MIT OR Apache-2.0,quininer ipcrypt-rs,https://github.com/jedisct1/rust-ipcrypt2,ISC,Frank Denis ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price ipnetwork,https://github.com/achanda/ipnetwork,MIT OR Apache-2.0,"Abhishek Chanda , Linus Färnstrand " @@ -457,6 +467,7 @@ mea,https://github.com/fast/mea,Apache-2.0,The mea Authors measure_time,https://github.com/PSeitz/rust_measure_time,MIT,Pascal Seitz memchr,https://github.com/BurntSushi/memchr,Unlicense OR MIT,"Andrew Gallant , bluss" memmap2,https://github.com/RazrFalcon/memmap2-rs,MIT OR Apache-2.0,"Dan Burkert , Yevhenii Reizner , The Contributors" +memoffset,https://github.com/Gilnaa/memoffset,MIT,Gilad Naaman mime,https://github.com/hyperium/mime,MIT OR Apache-2.0,Sean McArthur mime_guess,https://github.com/abonander/mime_guess,MIT,Austin Bonander mini-internal,https://github.com/dtolnay/miniserde,MIT OR Apache-2.0,David Tolnay @@ -465,6 +476,7 @@ minimal-lexical,https://github.com/Alexhuszagh/minimal-lexical,MIT OR Apache-2.0 miniserde,https://github.com/dtolnay/miniserde,MIT OR Apache-2.0,David Tolnay miniz_oxide,https://github.com/Frommi/miniz_oxide/tree/master/miniz_oxide,MIT OR Zlib OR Apache-2.0,"Frommi , oyvindln , Rich Geldreich richgel99@gmail.com" mio,https://github.com/tokio-rs/mio,MIT,"Carl Lerche , Thomas de Zeeuw , Tokio Contributors " +mixtrics,https://github.com/foyer-rs/mixtrics,Apache-2.0,MrCroxx mockall,https://github.com/asomers/mockall,MIT OR Apache-2.0,Alan Somers mockall_derive,https://github.com/asomers/mockall,MIT OR Apache-2.0,Alan Somers moka,https://github.com/moka-rs/moka,(MIT OR Apache-2.0) AND Apache-2.0,The moka Authors @@ -749,6 +761,7 @@ simple_asn1,https://github.com/acw/simple_asn1,ISC,Adam Wick siphasher,https://github.com/jedisct1/rust-siphash,MIT OR Apache-2.0,Frank Denis sketches-ddsketch,https://github.com/mheffner/rust-sketches-ddsketch,Apache-2.0,Mike Heffner slab,https://github.com/tokio-rs/slab,MIT,Carl Lerche +small_ctor,https://github.com/mitsuhiko/small-ctor,Apache-2.0,Armin Ronacher smallvec,https://github.com/servo/rust-smallvec,MIT OR Apache-2.0,The Servo Project Developers snafu,https://github.com/shepmaster/snafu,MIT OR Apache-2.0,Jake Goulding snafu-derive,https://github.com/shepmaster/snafu,MIT OR Apache-2.0,Jake Goulding diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index fd7ff2ad9e8..ea09021549c 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2114,6 +2114,12 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f88a43d011fc4a6876cb7344703e297c71dda42494fee094d5f7c76bf13f746" +[[package]] +name = "cmsketch" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ee2cfacbd29706479902b06d75ad8f1362900836aa32799eabc7e004bfd854" + [[package]] name = "coarsetime" version = "0.1.37" @@ -2344,6 +2350,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpp_demangle" version = "0.4.5" @@ -4058,6 +4075,16 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "fastant" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e825441bfb2d831c47c97d05821552db8832479f44c571b97fededbf0099c07" +dependencies = [ + "small_ctor", + "web-time", +] + [[package]] name = "fastdivide" version = "0.4.2" @@ -4238,6 +4265,120 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "foyer" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0abc0b87814989efa711f9becd9f26969820e2d3905db27d10969c4bd45890" +dependencies = [ + "anyhow", + "equivalent", + "foyer-common", + "foyer-memory", + "foyer-storage", + "foyer-tokio", + "futures-util", + "mea", + "mixtrics", + "pin-project", + "serde", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3db80d5dece93adb7ad709c84578794724a9cba342a7e566c3551c7ec626789" +dependencies = [ + "anyhow", + "bincode", + "bytes", + "cfg-if", + "foyer-tokio", + "mixtrics", + "parking_lot", + "pin-project", + "serde", + "twox-hash", +] + +[[package]] +name = "foyer-intrusive-collections" +version = "0.10.0-dev" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b" +dependencies = [ + "memoffset", +] + +[[package]] +name = "foyer-memory" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db907f40a527ca2aa2f40a5f68b32ea58aa70f050cd233518e9ffd402cfba6ce" +dependencies = [ + "anyhow", + "bitflags 2.11.1", + "cmsketch", + "equivalent", + "foyer-common", + "foyer-intrusive-collections", + "foyer-tokio", + "futures-util", + "hashbrown 0.16.1", + "itertools 0.14.0", + "mea", + "mixtrics", + "parking_lot", + "paste", + "pin-project", + "serde", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1983f1db3d0710e9c9d5fc116d9202dccd41a2d1e032572224f1aff5520aa958" +dependencies = [ + "allocator-api2", + "anyhow", + "bytes", + "core_affinity", + "equivalent", + "fastant", + "foyer-common", + "foyer-memory", + "foyer-tokio", + "fs4", + "futures-core", + "futures-util", + "hashbrown 0.16.1", + "io-uring", + "itertools 0.14.0", + "libc", + "lz4", + "mea", + "parking_lot", + "pin-project", + "rand 0.9.4", + "serde", + "tracing", + "twox-hash", + "zstd", +] + +[[package]] +name = "foyer-tokio" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6577b05a7ffad0db555aedf00bfe52af818220fc4c1c3a7a12520896fc38627" +dependencies = [ + "tokio", +] + [[package]] name = "fraction" version = "0.15.4" @@ -5369,6 +5510,17 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-uring" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d09b98f7eace8982db770e4408e7470b028ce513ac28fecdc6bf4c30fe92b62" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "libc", +] + [[package]] name = "ipcrypt-rs" version = "0.9.4" @@ -6007,6 +6159,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -6087,6 +6248,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mixtrics" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb252c728b9d77c6ef9103f0c81524fa0a3d3b161d0a936295d7fbeff6e04c11" +dependencies = [ + "itertools 0.14.0", + "parking_lot", +] + [[package]] name = "mockall" version = "0.14.0" @@ -9030,6 +9201,7 @@ dependencies = [ "bytes", "bytesize", "fnv", + "foyer", "futures", "http 1.4.0", "itertools 0.14.0", @@ -9157,6 +9329,7 @@ dependencies = [ "bytes", "bytesize", "fnv", + "foyer", "futures", "http 1.4.0", "http-body-util", @@ -10804,6 +10977,12 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" +[[package]] +name = "small_ctor" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88414a5ca1f85d82cc34471e975f0f74f6aa54c40f062efa42c0080e7f763f81" + [[package]] name = "smallvec" version = "1.15.1" @@ -12288,6 +12467,9 @@ name = "twox-hash" version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" +dependencies = [ + "rand 0.9.4", +] [[package]] name = "typeid" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index d0999958a42..2854611a50e 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -119,6 +119,7 @@ dyn-clone = "1.0" enum-iterator = "2.3" env_logger = { version = "0.11", default-features = false, features = ["auto-color"] } fail = "0.5" +foyer = { version = "0.22.3", features = ["serde"] } flate2 = "1.1" flume = "0.12" fnv = "1" diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 03e76fbb331..5325ffd283c 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -209,13 +209,12 @@ fn _check_macro_works() { #[doc(hidden)] pub use coarsetime::Instant as CoarsetimeInstant; -pub use rate_limited_debug; -pub use rate_limited_error; -pub use rate_limited_info; -pub use rate_limited_trace; #[doc(hidden)] pub use rate_limited_tracing; -pub use rate_limited_warn; +pub use { + rate_limited_debug, rate_limited_error, rate_limited_info, rate_limited_trace, + rate_limited_warn, +}; #[cfg(test)] mod tests { diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index cf67768966d..ab45411e19e 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -302,6 +302,20 @@ pub struct SearcherConfig { pub storage_timeout_policy: Option, pub warmup_memory_budget: ByteSize, pub warmup_single_split_initial_allocation: ByteSize, + + /// Per-cache foyer disk tier capacities. Disabled (0) by default. + /// Set any of these to a non-zero value to enable an NVMe-backed disk tier + /// for that cache; evicted in-memory entries spill to local disk under + /// `{data_dir}/search-cache/` instead of being lost. Use a local NVMe mount + /// (EBS contention will mask the benefit). The `predicate_cache` has no + /// disk tier because its access trait is synchronous. + #[serde(default)] + pub fast_field_disk_cache_capacity: ByteSize, + #[serde(default)] + pub split_footer_disk_cache_capacity: ByteSize, + #[serde(default)] + pub partial_request_disk_cache_capacity: ByteSize, + /// Lambda configuration for serverless leaf search execution. /// If set, enables Lambda execution for leaf search. /// @@ -525,6 +539,9 @@ impl Default for SearcherConfig { storage_timeout_policy: None, warmup_memory_budget: ByteSize::gb(100), warmup_single_split_initial_allocation: ByteSize::mb(300), + fast_field_disk_cache_capacity: ByteSize(0), + split_footer_disk_cache_capacity: ByteSize(0), + partial_request_disk_cache_capacity: ByteSize(0), lambda: None, } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index ae19a92c45f..2a456758c77 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -688,6 +688,9 @@ mod tests { }), warmup_memory_budget: ByteSize::gb(100), warmup_single_split_initial_allocation: ByteSize::mb(300), + fast_field_disk_cache_capacity: ByteSize(0), + split_footer_disk_cache_capacity: ByteSize(0), + partial_request_disk_cache_capacity: ByteSize(0), lambda: Some(LambdaConfig { function_name: "quickwit-lambda-leaf-search".to_string(), max_splits_per_invocation: NonZeroUsize::new(10).unwrap(), diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 428387c6353..a8c0bb0e935 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -22,8 +22,7 @@ use std::cmp::Ordering; pub mod cluster; pub mod control_plane; -pub use bytes; -pub use tonic; +pub use {bytes, tonic}; pub mod developer; pub mod error; mod getters; diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index dcf2bd5a774..2bf5c2c4436 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -17,6 +17,7 @@ base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } fnv = { workspace = true } +foyer = { workspace = true } futures = { workspace = true } http = { workspace = true } itertools = { workspace = true } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index bab9e3c7405..759525e2958 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -40,8 +40,8 @@ use quickwit_query::query_ast::{ }; use quickwit_query::tokenizers::TokenizerManager; use quickwit_storage::{ - BundleStorage, ByteRangeCache, CountingStorage, MemorySizedCache, OwnedBytes, SplitCache, - Storage, StorageResolver, TimeoutAndRetryStorage, wrap_storage_with_cache, + BundleStorage, ByteRangeCache, CountingStorage, OwnedBytes, SplitCache, Storage, + StorageResolver, TimeoutAndRetryStorage, wrap_storage_with_cache, }; use tantivy::aggregation::AggContextParams; use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations}; @@ -54,7 +54,7 @@ use tokio::task::{JoinError, JoinSet}; use tracing::*; use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector}; -use crate::leaf_cache::LeafSearchCache; +use crate::leaf_cache::{LeafSearchCache, SplitFooterCache}; use crate::metrics::SplitSearchOutcomeCounters; use crate::root::is_metadata_count_request_with_ast; use crate::search_permit_provider::{ @@ -119,13 +119,10 @@ fn greedy_batch_split( async fn get_split_footer_from_cache_or_fetch( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, - footer_cache: &MemorySizedCache, + footer_cache: &SplitFooterCache, ) -> anyhow::Result { - { - let possible_val = footer_cache.get(&split_and_footer_offsets.split_id); - if let Some(footer_data) = possible_val { - return Ok(footer_data); - } + if let Some(footer_data) = footer_cache.get(&split_and_footer_offsets.split_id).await { + return Ok(footer_data); } let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); let footer_data_opt = index_storage @@ -511,6 +508,7 @@ async fn leaf_search_single_split( .searcher_context .leaf_search_cache .get(split.clone(), search_request.clone()) + .await { leaf_search_state_guard.set_state(SplitSearchState::CacheHit); return Ok(Some(cached_answer)); @@ -1708,7 +1706,8 @@ pub async fn single_doc_mapping_leaf_search( split_with_req, split_outcome_counters.clone(), &mut incremental_merge_collector, - )?; + ) + .await?; let incremental_merge_collector_arc: Arc> = Arc::new(Mutex::new(incremental_merge_collector)); @@ -1869,7 +1868,7 @@ async fn run_local_search_tasks( /// We identify the splits that are in the cache and append them to the incremental merge collector. /// The (split, request) that are yet to be processed are returned. -fn process_partial_result_cache( +async fn process_partial_result_cache( leaf_search_cache: &LeafSearchCache, split_with_req: Vec<(SplitIdAndFooterOffsets, SearchRequest)>, split_outcome_counters: Arc, @@ -1881,6 +1880,7 @@ fn process_partial_result_cache( if let Some(cached_response) = leaf_search_cache // TODO remove the clone here. .get(split.clone(), search_request.clone()) + .await { // The cached response already carries cache-hit `resource_stats` // (set at write time by `LeafSearchCache::put`), so no per-read diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 72ea0f8e70c..36dc051c773 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -13,7 +13,10 @@ // limitations under the License. use std::ops::{Bound, RangeBounds}; +use std::path::Path; +use bytesize::ByteSize; +use foyer::DeviceBuilder as _; use prost::Message; use quickwit_config::CacheConfig; use quickwit_proto::search::{ @@ -21,12 +24,9 @@ use quickwit_proto::search::{ }; use quickwit_proto::types::SplitId; use quickwit_storage::{MemorySizedCache, OwnedBytes}; +use serde::{Deserialize, Serialize}; use tantivy::index::SegmentId; - -/// A cache to memoize `leaf_search_single_split` results. -pub struct LeafSearchCache { - content: MemorySizedCache, -} +use tracing::{info, warn}; // TODO we could be smarter about search_after. If we have a cached request with a search_after // (possibly equal to None) A, and a corresponding response with the 1st element having the value @@ -43,26 +43,135 @@ pub struct LeafSearchCache { // truncate to X while merging, and get free results from cache for at least the next k subsequent // queries which vary only by search_after. +/// A cache to memoize `leaf_search_single_split` results. +/// +/// Supports two modes: +/// - **MemoryOnly**: In-memory LRU cache (existing behavior). +/// - **Hybrid**: foyer-based memory + disk cache. Evicted entries spill to local NVMe instead of +/// being lost, so the next access reads from disk (~0.1ms) instead of S3 (~50-200ms). +// The two variants differ significantly in size (`foyer::HybridCache` is large), but boxing the +// `Hybrid` variant would add an indirection on every cache hit, which we want to avoid on the +// search hot path. +#[allow(clippy::large_enum_variant)] +pub enum LeafSearchCache { + MemoryOnly { + content: MemorySizedCache, + }, + Hybrid { + cache: foyer::HybridCache, + metrics: quickwit_storage::metrics::SingleCacheMetrics, + }, +} + +/// Serializable wrapper around encoded protobuf bytes for the foyer disk cache. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct CachedValue(Vec); + impl LeafSearchCache { + /// Create a memory-only cache (backward-compatible, no disk tier). pub fn new(config: &CacheConfig) -> LeafSearchCache { - LeafSearchCache { + LeafSearchCache::MemoryOnly { content: MemorySizedCache::from_config( config, &quickwit_storage::STORAGE_METRICS.partial_request_cache, ), } } - pub fn get( + + /// Create a hybrid cache with memory + disk tiers. + /// + /// The memory tier uses the capacity from `config`. The disk tier uses the + /// provided path and capacity. When entries are evicted from memory, they + /// spill to the disk tier. + pub async fn new_hybrid( + config: &CacheConfig, + disk_cache_path: &Path, + disk_cache_capacity: ByteSize, + ) -> anyhow::Result { + // ensure the directory exists + tokio::fs::create_dir_all(disk_cache_path).await?; + + let memory_capacity = config.capacity().as_u64() as usize; + let disk_capacity = disk_cache_capacity.as_u64() as usize; + + info!( + memory_capacity_mb = memory_capacity / 1024 / 1024, + disk_capacity_mb = disk_capacity / 1024 / 1024, + disk_path = %disk_cache_path.display(), + "building hybrid leaf search cache" + ); + + let device = foyer::FsDeviceBuilder::new(disk_cache_path) + .with_capacity(disk_capacity) + .build()?; + let engine_config = foyer::BlockEngineConfig::new(device); + + let hybrid_cache: foyer::HybridCache = + foyer::HybridCacheBuilder::new() + .memory(memory_capacity) + .storage() + .with_engine_config(engine_config) + .build() + .await?; + + let metrics = quickwit_storage::STORAGE_METRICS + .partial_request_cache + .cache_metrics + .clone(); + + Ok(LeafSearchCache::Hybrid { + cache: hybrid_cache, + metrics, + }) + } + + pub async fn get( &self, split_info: SplitIdAndFooterOffsets, search_request: SearchRequest, ) -> Option { let key = CacheKey::from_split_meta_and_request(split_info, search_request); - let encoded_result = self.content.get(&key)?; - // this should never fail - LeafSearchResponse::decode(&*encoded_result).ok() + match self { + LeafSearchCache::MemoryOnly { content } => { + let encoded_result = content.get(&key)?; + LeafSearchResponse::decode(&*encoded_result).ok() + } + LeafSearchCache::Hybrid { cache, metrics } => { + let memory_size = cache.memory().usage(); + let entry = cache.get(&key).await; + match entry { + Ok(Some(entry)) => { + metrics.hits_num_items.inc(); + let bytes = &entry.value().0; + metrics.hits_num_bytes.inc_by(bytes.len() as u64); + tracing::debug!( + split_id = %key.split_id, + memory_usage = memory_size, + "hybrid cache HIT" + ); + LeafSearchResponse::decode(bytes.as_slice()).ok() + } + Ok(None) => { + metrics.misses_num_items.inc(); + tracing::debug!( + split_id = %key.split_id, + memory_usage = memory_size, + "hybrid cache MISS" + ); + None + } + Err(err) => { + metrics.misses_num_items.inc(); + warn!(error = %err, "hybrid cache get failed, treating as miss"); + None + } + } + } + } } + /// Insert a result into the cache. This is synchronous — foyer's `insert()` writes + /// to the memory tier immediately and spills to disk asynchronously in the background. pub fn put( &self, split_info: SplitIdAndFooterOffsets, @@ -78,12 +187,23 @@ impl LeafSearchCache { }); let key = CacheKey::from_split_meta_and_request(split_info, search_request); let encoded_result = result.encode_to_vec(); - self.content.put(key, OwnedBytes::new(encoded_result)); + match self { + LeafSearchCache::MemoryOnly { content } => { + content.put(key, OwnedBytes::new(encoded_result)); + } + LeafSearchCache::Hybrid { cache, metrics } => { + metrics.in_cache_num_bytes.add(encoded_result.len() as i64); + metrics.in_cache_count.inc(); + cache.insert(key, CachedValue(encoded_result)); + } + } } } /// A key inside a [`LeafSearchCache`]. -#[derive(Debug, Hash, Clone, PartialEq, Eq)] +/// +/// `Serialize`/`Deserialize` are needed for the foyer disk cache (via the `Code` trait). +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] struct CacheKey { /// The split this entry refers to split_id: SplitId, @@ -118,7 +238,7 @@ impl CacheKey { } /// A (half-open) range bounded inclusively below and exclusively above [start..end). -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] struct HalfOpenRange { start: i64, end: Option, @@ -195,6 +315,114 @@ impl RangeBounds for HalfOpenRange { } } +/// A cache for split footers, optionally backed by a foyer disk tier. +/// +/// `get` is async because the disk tier requires it; the memory-only +/// variant returns immediately. +// See `LeafSearchCache` comment: boxing the `Hybrid` variant would add a per-hit indirection. +#[allow(clippy::large_enum_variant)] +pub enum SplitFooterCache { + Memory { + content: MemorySizedCache, + }, + Hybrid { + cache: foyer::HybridCache, + metrics: quickwit_storage::metrics::SingleCacheMetrics, + }, +} + +/// Serializable wrapper around footer bytes for the foyer disk cache. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct FooterValue(Vec); + +impl SplitFooterCache { + pub fn new(config: &CacheConfig) -> Self { + SplitFooterCache::Memory { + content: MemorySizedCache::from_config( + config, + &quickwit_storage::STORAGE_METRICS.split_footer_cache, + ), + } + } + + pub async fn new_hybrid( + config: &CacheConfig, + disk_cache_path: &Path, + disk_cache_capacity: ByteSize, + ) -> anyhow::Result { + tokio::fs::create_dir_all(disk_cache_path).await?; + + let memory_capacity = config.capacity().as_u64() as usize; + let disk_capacity = disk_cache_capacity.as_u64() as usize; + + info!( + memory_capacity_mb = memory_capacity / 1024 / 1024, + disk_capacity_mb = disk_capacity / 1024 / 1024, + disk_path = %disk_cache_path.display(), + "building hybrid split footer cache" + ); + + let device = foyer::FsDeviceBuilder::new(disk_cache_path) + .with_capacity(disk_capacity) + .build()?; + let engine_config = foyer::BlockEngineConfig::new(device); + + let hybrid_cache: foyer::HybridCache = + foyer::HybridCacheBuilder::new() + .memory(memory_capacity) + .storage() + .with_engine_config(engine_config) + .build() + .await?; + + let metrics = quickwit_storage::STORAGE_METRICS + .split_footer_cache + .cache_metrics + .clone(); + + Ok(SplitFooterCache::Hybrid { + cache: hybrid_cache, + metrics, + }) + } + + pub async fn get(&self, split_id: &str) -> Option { + match self { + // `MemorySizedCache::get` requires `&K` (not `&str`) because + // `Arc: Borrow` is not implemented. One small alloc per hit. + SplitFooterCache::Memory { content } => content.get(&split_id.to_owned()), + SplitFooterCache::Hybrid { cache, metrics } => match cache.get(split_id).await { + Ok(Some(entry)) => { + metrics.hits_num_items.inc(); + let bytes = &entry.value().0; + metrics.hits_num_bytes.inc_by(bytes.len() as u64); + Some(OwnedBytes::new(bytes.clone())) + } + Ok(None) => { + metrics.misses_num_items.inc(); + None + } + Err(err) => { + metrics.misses_num_items.inc(); + warn!(error = %err, "hybrid split footer cache get failed, treating as miss"); + None + } + }, + } + } + + pub fn put(&self, split_id: String, bytes: OwnedBytes) { + match self { + SplitFooterCache::Memory { content } => content.put(split_id, bytes), + SplitFooterCache::Hybrid { cache, metrics } => { + metrics.in_cache_num_bytes.add(bytes.len() as i64); + metrics.in_cache_count.inc(); + cache.insert(split_id, FooterValue(bytes.to_vec())); + } + } + } +} + pub struct PredicateCacheImpl { content: MemorySizedCache<(SplitId, String)>, } @@ -250,8 +478,8 @@ mod tests { use super::LeafSearchCache; - #[test] - fn test_leaf_search_cache_no_timestamp() { + #[tokio::test] + async fn test_leaf_search_cache_no_timestamp() { let cache = LeafSearchCache::new(&ByteSize::mb(64).into()); let split_1 = SplitIdAndFooterOffsets { @@ -308,7 +536,7 @@ mod tests { resource_stats: None, }; - assert!(cache.get(split_1.clone(), query_1.clone()).is_none()); + assert!(cache.get(split_1.clone(), query_1.clone()).await.is_none()); // `LeafSearchCache::put` overwrites `resource_stats` with the // cache-hit counters; reads always see those, not the original stats. @@ -322,15 +550,15 @@ mod tests { }; cache.put(split_1.clone(), query_1.clone(), result); assert_eq!( - cache.get(split_1.clone(), query_1.clone()).unwrap(), + cache.get(split_1.clone(), query_1.clone()).await.unwrap(), expected ); - assert!(cache.get(split_2, query_1).is_none()); - assert!(cache.get(split_1, query_2).is_none()); + assert!(cache.get(split_2, query_1).await.is_none()); + assert!(cache.get(split_1, query_2).await.is_none()); } - #[test] - fn test_leaf_search_cache_timestamp() { + #[tokio::test] + async fn test_leaf_search_cache_timestamp() { let cache = LeafSearchCache::new(&ByteSize::mb(64).into()); let split_1 = SplitIdAndFooterOffsets { @@ -414,27 +642,92 @@ mod tests { // for split_1, 1 and 1bis cover different timestamp ranges cache.put(split_1.clone(), query_1.clone(), result.clone()); - assert!(cache.get(split_1.clone(), query_1.clone()).is_some()); - assert!(cache.get(split_1.clone(), query_1bis.clone()).is_none()); + assert!(cache.get(split_1.clone(), query_1.clone()).await.is_some()); + assert!( + cache + .get(split_1.clone(), query_1bis.clone()) + .await + .is_none() + ); // for split_2, both 1 and 1bis cover everything, so it should cache-hit cache.put(split_2.clone(), query_1.clone(), result.clone()); - assert!(cache.get(split_2.clone(), query_1).is_some()); - assert!(cache.get(split_2.clone(), query_1bis).is_some()); + assert!(cache.get(split_2.clone(), query_1).await.is_some()); + assert!(cache.get(split_2.clone(), query_1bis).await.is_some()); // for split_1, both 1 and 1bis cover everything, so it should cache-hit cache.put(split_1.clone(), query_2.clone(), result.clone()); - assert!(cache.get(split_1.clone(), query_2.clone()).is_some()); - assert!(cache.get(split_1, query_2bis.clone()).is_some()); + assert!(cache.get(split_1.clone(), query_2.clone()).await.is_some()); + assert!(cache.get(split_1, query_2bis.clone()).await.is_some()); // for split_2, 2 covers everything, but 2bis cover only a subrange cache.put(split_2.clone(), query_2.clone(), result.clone()); - assert!(cache.get(split_2.clone(), query_2.clone()).is_some()); - assert!(cache.get(split_2, query_2bis.clone()).is_none()); + assert!(cache.get(split_2.clone(), query_2.clone()).await.is_some()); + assert!(cache.get(split_2, query_2bis.clone()).await.is_none()); // same for split_3, but we try caching the bounded request and query for the unbounded one cache.put(split_3.clone(), query_2bis.clone(), result); - assert!(cache.get(split_3.clone(), query_2).is_none()); - assert!(cache.get(split_3, query_2bis).is_some()); + assert!(cache.get(split_3.clone(), query_2).await.is_none()); + assert!(cache.get(split_3, query_2bis).await.is_some()); + } + + #[tokio::test] + async fn test_hybrid_cache_basic() { + let tmp_dir = + std::env::temp_dir().join(format!("quickwit-test-hybrid-cache-{}", std::process::id())); + let _ = std::fs::remove_dir_all(&tmp_dir); + let cache = + LeafSearchCache::new_hybrid(&ByteSize::mb(64).into(), &tmp_dir, ByteSize::mb(100)) + .await + .expect("failed to create hybrid cache"); + + let split = SplitIdAndFooterOffsets { + split_id: "split_hybrid".to_string(), + split_footer_start: 0, + split_footer_end: 100, + timestamp_start: None, + timestamp_end: None, + num_docs: 0, + }; + + let query = SearchRequest { + index_id_patterns: vec!["test-idx".to_string()], + query_ast: "hybrid_test".to_string(), + start_timestamp: None, + end_timestamp: None, + max_hits: 10, + start_offset: 0, + ..Default::default() + }; + + let result = LeafSearchResponse { + failed_splits: Vec::new(), + intermediate_aggregation_result: None, + num_attempted_splits: 1, + num_successful_splits: 1, + num_hits: 42, + partial_hits: vec![PartialHit { + doc_id: 1, + segment_ord: 0, + sort_value: Some(SortValue::U64(0u64).into()), + sort_value2: None, + split_id: "split_hybrid".to_string(), + }], + resource_stats: None, + }; + + // miss before put + assert!( + cache.get(split.clone(), query.clone()).await.is_none(), + "expected miss before put" + ); + + // put + cache.put(split.clone(), query.clone(), result.clone()); + + // hit after put + let cached = cache.get(split.clone(), query.clone()).await; + assert!(cached.is_some(), "expected hit after put, got None"); + assert_eq!(cached.unwrap().num_hits, 42); } } diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 3b08e50e480..f55958933a4 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -28,14 +28,12 @@ use quickwit_proto::search::{ ReportSplitsRequest, ReportSplitsResponse, RootResourceStats, ScrollRequest, SearchPlanResponse, SearchRequest, SearchResponse, SnippetRequest, }; -use quickwit_storage::{ - MemorySizedCache, QuickwitCache, SplitCache, StorageCache, StorageResolver, -}; +use quickwit_storage::{QuickwitCache, SplitCache, StorageCache, StorageResolver}; use tantivy::aggregation::AggregationLimitsGuard; use crate::invoker::LambdaLeafSearchInvoker; use crate::leaf::multi_index_leaf_search; -use crate::leaf_cache::{LeafSearchCache, PredicateCacheImpl}; +use crate::leaf_cache::{LeafSearchCache, PredicateCacheImpl, SplitFooterCache}; use crate::list_fields::{leaf_list_fields, root_list_fields}; use crate::list_fields_cache::ListFieldsCache; use crate::list_terms::{leaf_list_terms, root_list_terms}; @@ -412,7 +410,7 @@ pub struct SearcherContext { /// Counting semaphore to limit concurrent leaf search split requests. pub search_permit_provider: SearchPermitProvider, /// Split footer cache. - pub split_footer_cache: MemorySizedCache, + pub split_footer_cache: SplitFooterCache, /// Per-split and per-query cache. pub leaf_search_cache: LeafSearchCache, /// Per-split and per-predicate cache. @@ -462,10 +460,7 @@ impl SearcherContext { split_cache_opt: Option>, lambda_invoker: Option, ) -> Self { - let global_split_footer_cache = MemorySizedCache::from_config( - &searcher_config.split_footer_cache, - &quickwit_storage::STORAGE_METRICS.split_footer_cache, - ); + let global_split_footer_cache = SplitFooterCache::new(&searcher_config.split_footer_cache); let leaf_search_split_semaphore = SearchPermitProvider::new( searcher_config.max_num_concurrent_split_searches, searcher_config.warmup_memory_budget, @@ -497,6 +492,54 @@ impl SearcherContext { } } + /// Upgrades selected caches to hybrid memory + disk using foyer. + /// + /// Each cache is upgraded only if its corresponding `*_disk_cache_capacity` + /// in `SearcherConfig` is non-zero. This is separate from `new()` to keep + /// construction synchronous (many call sites, including tests, create + /// `SearcherContext` synchronously). + /// + /// The `predicate_cache` is intentionally not upgraded — its access trait + /// `quickwit_query::query_ast::PredicateCache` is synchronous and is called + /// inside scoring hot paths, so a disk tier would regress those queries. + pub async fn enable_hybrid_caches( + &mut self, + disk_cache_path: &std::path::Path, + ) -> anyhow::Result<()> { + let partial_request_disk = self.searcher_config.partial_request_disk_cache_capacity; + if partial_request_disk.as_u64() > 0 { + self.leaf_search_cache = LeafSearchCache::new_hybrid( + &self.searcher_config.partial_request_cache, + &disk_cache_path.join("partial-request"), + partial_request_disk, + ) + .await?; + } + + let fast_field_disk = self.searcher_config.fast_field_disk_cache_capacity; + if fast_field_disk.as_u64() > 0 { + let storage_cache = QuickwitCache::with_foyer( + &self.searcher_config.fast_field_cache, + &disk_cache_path.join("fast-field"), + fast_field_disk, + ) + .await?; + self.fast_fields_cache = Arc::new(storage_cache); + } + + let split_footer_disk = self.searcher_config.split_footer_disk_cache_capacity; + if split_footer_disk.as_u64() > 0 { + self.split_footer_cache = SplitFooterCache::new_hybrid( + &self.searcher_config.split_footer_cache, + &disk_cache_path.join("split-footer"), + split_footer_disk, + ) + .await?; + } + + Ok(()) + } + /// Returns the shared instance to track the aggregation memory usage. pub fn get_aggregation_limits(&self) -> AggregationLimitsGuard { self.aggregation_limit.clone() diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 17cb4501a19..de58adbeefc 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -642,7 +642,7 @@ pub async fn serve_quickwit( }; // Initialize Lambda invoker if enabled and searcher service is running - let searcher_context = if node_config.is_service_enabled(QuickwitService::Searcher) { + let mut searcher_context = if node_config.is_service_enabled(QuickwitService::Searcher) { if let Some(lambda_config) = &node_config.searcher_config.lambda { #[cfg(feature = "lambda")] { @@ -650,11 +650,11 @@ pub async fn serve_quickwit( warn!("offloading to lambda is EXPERIMENTAL. Use at your own risk"); let invoker = quickwit_lambda_client::try_get_or_deploy_invoker(lambda_config).await?; - Arc::new(SearcherContext::new( + SearcherContext::new( node_config.searcher_config.clone(), split_cache_opt, Some(invoker), - )) + ) } #[cfg(not(feature = "lambda"))] { @@ -662,18 +662,31 @@ pub async fn serve_quickwit( bail!("lambda support is statically disabled, but enabled in configuration"); } } else { - Arc::new(SearcherContext::new_without_invoker( + SearcherContext::new_without_invoker( node_config.searcher_config.clone(), split_cache_opt, - )) + ) } } else { - Arc::new(SearcherContext::new_without_invoker( - node_config.searcher_config.clone(), - split_cache_opt, - )) + SearcherContext::new_without_invoker(node_config.searcher_config.clone(), split_cache_opt) }; + // Upgrade individual caches to hybrid (memory + foyer NVMe disk) when their + // corresponding `*_disk_cache_capacity` is set. No-op when all three knobs + // are zero. + let searcher_cfg = &node_config.searcher_config; + let any_disk_cache_enabled = searcher_cfg.fast_field_disk_cache_capacity.as_u64() > 0 + || searcher_cfg.split_footer_disk_cache_capacity.as_u64() > 0 + || searcher_cfg.partial_request_disk_cache_capacity.as_u64() > 0; + if any_disk_cache_enabled { + let disk_cache_path = node_config.data_dir_path.join("search-cache"); + searcher_context + .enable_hybrid_caches(&disk_cache_path) + .await?; + } + + let searcher_context = Arc::new(searcher_context); + let (search_job_placer, search_service, searcher_pool) = setup_searcher( &node_config, cluster.change_stream(), diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index 8f735e44db4..54cbc18fa27 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -17,6 +17,7 @@ base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } fnv = { workspace = true } +foyer = { workspace = true } futures = { workspace = true } http-body-util = { workspace = true} hyper = { workspace = true } diff --git a/quickwit/quickwit-storage/src/cache/foyer_cache.rs b/quickwit/quickwit-storage/src/cache/foyer_cache.rs new file mode 100644 index 00000000000..0c1a53b6d22 --- /dev/null +++ b/quickwit/quickwit-storage/src/cache/foyer_cache.rs @@ -0,0 +1,141 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Foyer-backed hybrid cache (memory + disk) implementing [`StorageCache`]. +//! +//! When entries are evicted from the memory tier, they spill to local disk +//! instead of being lost. The next access reads from disk (~0.1ms) instead +//! of fetching from S3 (~50-200ms). + +use std::ops::Range; +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use foyer::DeviceBuilder as _; +use serde::{Deserialize, Serialize}; +use tracing::{info, warn}; + +use crate::OwnedBytes; +use crate::cache::StorageCache; +use crate::metrics::SingleCacheMetrics; + +const FULL_SLICE: Range = 0..usize::MAX; + +/// Cache key for the foyer hybrid cache: (file path, byte range). +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +struct FoyerSliceKey { + path: String, + range_start: usize, + range_end: usize, +} + +impl FoyerSliceKey { + fn new(path: &Path, byte_range: Range) -> Self { + FoyerSliceKey { + path: path.to_string_lossy().into_owned(), + range_start: byte_range.start, + range_end: byte_range.end, + } + } +} + +/// Cache value wrapper for foyer — just bytes. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +struct FoyerSliceValue(Vec); + +/// A hybrid (memory + disk) cache implementing `StorageCache`. +/// +/// Backed by foyer's `HybridCache`. Memory tier holds hot entries, +/// disk tier catches evicted entries. +pub struct FoyerStorageCache { + cache: foyer::HybridCache, + metrics: SingleCacheMetrics, +} + +impl FoyerStorageCache { + /// Build a new hybrid storage cache. + /// + /// - `memory_capacity`: bytes for the in-memory tier + /// - `disk_path`: directory for the disk tier + /// - `disk_capacity`: bytes for the disk tier + /// - `metrics`: Prometheus counters for this cache component + pub async fn build( + memory_capacity: usize, + disk_path: &Path, + disk_capacity: usize, + metrics: SingleCacheMetrics, + ) -> anyhow::Result { + tokio::fs::create_dir_all(disk_path).await?; + + info!( + memory_capacity_mb = memory_capacity / 1024 / 1024, + disk_capacity_mb = disk_capacity / 1024 / 1024, + disk_path = %disk_path.display(), + "building foyer hybrid storage cache" + ); + + let device = foyer::FsDeviceBuilder::new(disk_path) + .with_capacity(disk_capacity) + .build()?; + let engine_config = foyer::BlockEngineConfig::new(device); + + let cache = foyer::HybridCacheBuilder::new() + .memory(memory_capacity) + .storage() + .with_engine_config(engine_config) + .build() + .await?; + + Ok(FoyerStorageCache { cache, metrics }) + } +} + +#[async_trait] +impl StorageCache for FoyerStorageCache { + async fn get(&self, path: &Path, byte_range: Range) -> Option { + let key = FoyerSliceKey::new(path, byte_range); + match self.cache.get(&key).await { + Ok(Some(entry)) => { + self.metrics.hits_num_items.inc(); + let bytes = &entry.value().0; + self.metrics.hits_num_bytes.inc_by(bytes.len() as u64); + Some(OwnedBytes::new(bytes.clone())) + } + Ok(None) => { + self.metrics.misses_num_items.inc(); + None + } + Err(err) => { + self.metrics.misses_num_items.inc(); + warn!(error = %err, "foyer storage cache get failed"); + None + } + } + } + + async fn get_all(&self, path: &Path) -> Option { + self.get(path, FULL_SLICE).await + } + + async fn put(&self, path: PathBuf, byte_range: Range, bytes: OwnedBytes) { + let key = FoyerSliceKey::new(&path, byte_range); + self.metrics.in_cache_count.inc(); + self.metrics.in_cache_num_bytes.add(bytes.len() as i64); + self.cache.insert(key, FoyerSliceValue(bytes.to_vec())); + } + + async fn put_all(&self, path: PathBuf, bytes: OwnedBytes) { + self.put(path, FULL_SLICE, bytes).await; + } +} diff --git a/quickwit/quickwit-storage/src/cache/mod.rs b/quickwit/quickwit-storage/src/cache/mod.rs index f73a96b90f4..15aac533d1b 100644 --- a/quickwit/quickwit-storage/src/cache/mod.rs +++ b/quickwit/quickwit-storage/src/cache/mod.rs @@ -14,6 +14,7 @@ mod base_cache; mod byte_range_cache; +pub mod foyer_cache; mod memory_sized_cache; mod quickwit_cache; mod slice_address; diff --git a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs index 20441bd5fd1..94d83a193f8 100644 --- a/quickwit/quickwit-storage/src/cache/quickwit_cache.rs +++ b/quickwit/quickwit-storage/src/cache/quickwit_cache.rs @@ -17,16 +17,19 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; +use bytesize::ByteSize; use quickwit_config::CacheConfig; use crate::OwnedBytes; +use crate::cache::foyer_cache::FoyerStorageCache; use crate::cache::{MemorySizedCache, StorageCache}; use crate::metrics::CacheMetrics; const FULL_SLICE: Range = 0..usize::MAX; /// Quickwit storage cache with a size limit. -/// It is used currently by to cache only fast fields data. +/// Routes cached data by file extension to separate caches. +/// Currently caches `.fast` files (fast fields). pub struct QuickwitCache { router: Vec<(&'static str, Arc)>, } @@ -38,7 +41,8 @@ impl From)>> for QuickwitCache { } impl QuickwitCache { - /// Creates a [`QuickwitCache`] with a cache on fast fields. + /// Creates a [`QuickwitCache`] with a cache on fast fields only + /// (backward-compatible). pub fn new(cache_config: &CacheConfig) -> Self { let mut quickwit_cache = QuickwitCache::empty(); let fast_field_cache_counters: &'static CacheMetrics = @@ -53,6 +57,29 @@ impl QuickwitCache { quickwit_cache } + /// Creates a [`QuickwitCache`] with a foyer hybrid (memory + disk) cache + /// on the fast field route. + pub async fn with_foyer( + fast_field_config: &CacheConfig, + disk_base_path: &std::path::Path, + disk_capacity: ByteSize, + ) -> anyhow::Result { + let mut quickwit_cache = QuickwitCache::empty(); + let fast_dir = disk_base_path.join("fast"); + let fast_cache = FoyerStorageCache::build( + fast_field_config.capacity().as_u64() as usize, + &fast_dir, + disk_capacity.as_u64() as usize, + crate::STORAGE_METRICS + .fast_field_cache + .cache_metrics + .clone(), + ) + .await?; + quickwit_cache.add_route(".fast", Arc::new(fast_cache)); + Ok(quickwit_cache) + } + /// Empties cache. pub fn empty() -> QuickwitCache { QuickwitCache::from(Vec::new()) diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 859264cb5b2..f7cfb8c20d2 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -29,7 +29,7 @@ mod cache; mod counting_storage; mod debouncer; mod file_descriptor_cache; -mod metrics; +pub mod metrics; mod storage; mod timeout_and_retry_storage; pub use debouncer::AsyncDebouncer;