From f6ef63ae7efa1d7dd3c8677314e64371ceed0869 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Thu, 2 Jul 2026 10:48:07 -0400 Subject: [PATCH 1/2] perf(relay): execute multi-filter /query and WS REQ DB reads with bounded concurrency (S2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NIP-01 multi-filter requests previously awaited db.query_events once per filter, serially — a K-filter POST /query paid K sequential DB round trips (the structural cost behind get_channels' N last-message filters and any batched client query). Both loops now run three phases: 1. pure query construction + validation, in filter order 2. DB reads via futures buffered(FILTER_QUERY_CONCURRENCY=4) — order-preserving bounded concurrency 3. post-processing, strictly in original filter order Because buffered() yields in input order, phase 3 is byte-identical to the old serial loop: NIP-01 OR/dedupe order (seen_ids insertion = filter order), per-event access gates (event_in_accessible_channel, filters_match, reader_authorized_for_event, author-only), conformance trace row order, and first-error semantics are all unchanged. Special-pass filters (feed_types, depth_limit, search) are handled in earlier loops and untouched; per-filter LIMITs unchanged. One deliberate edge shift, bridge only: before_id validation (400) for a later filter now surfaces before any earlier filter's DB query executes — deterministic client errors now always precede transient DB errors. New tests: filter_query_pipeline_preserves_filter_order (proves ordered yield when the first filter is slowest) and a concurrency-bound guard. cargo test -p buzz-relay: 453 passed, 0 failed. 
Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-relay/src/api/bridge.rs | 24 ++++- crates/buzz-relay/src/handlers/req.rs | 122 +++++++++++++++++++++----- 2 files changed, 123 insertions(+), 23 deletions(-) diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 4b860e403..59093a060 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -636,6 +636,11 @@ pub async fn query_events( handled.insert(idx); } + // Phase 1 — pure construction + validation, in filter order. Access-scope + // skips and the `before_id` BAD_REQUEST are decided here, before any DB + // work is issued (validation errors are deterministic client mistakes, so + // surfacing them ahead of transient DB errors is strictly more predictable). + let mut catchall_queries: Vec<(usize, buzz_db::EventQuery)> = Vec::new(); for (idx, (raw, filter)) in raw_filters.iter().zip(filters.iter()).enumerate() { if handled.contains(&idx) { continue; @@ -677,7 +682,24 @@ pub async fn query_events( query.offset = Some(offset); } - match state.db.query_events(&query).await { + catchall_queries.push((idx, query)); + } + + // Phase 2 — DB reads, bounded-concurrent, order-preserving (`buffered`). + // Phase 3 consumes results in original filter order, so response ordering + // and error semantics match the previous serial loop. + use futures_util::stream::{self, StreamExt}; + let db = state.db.clone(); + let mut catchall_results = stream::iter(catchall_queries.into_iter().map(|(idx, query)| { + let db = db.clone(); + async move { (idx, db.query_events(&query).await) } + })) + .buffered(crate::handlers::req::FILTER_QUERY_CONCURRENCY); + + // Phase 3 — post-processing, strictly in filter order. 
+ while let Some((idx, filter_events)) = catchall_results.next().await { + let filter = &filters[idx]; + match filter_events { Ok(stored_events) => { for se in stored_events { if !event_in_accessible_channel(&se, &accessible_channels) { diff --git a/crates/buzz-relay/src/handlers/req.rs b/crates/buzz-relay/src/handlers/req.rs index 2e697eafa..124d77fcc 100644 --- a/crates/buzz-relay/src/handlers/req.rs +++ b/crates/buzz-relay/src/handlers/req.rs @@ -22,6 +22,15 @@ use crate::state::AppState; const MAX_HISTORICAL_LIMIT: i64 = 2_000; const MAX_SUBSCRIPTIONS: usize = 1024; +/// Maximum `query_events` calls in flight per multi-filter REQ / bridge query. +/// +/// NIP-01 gives each filter its own DB query (OR semantics — see the comment at +/// the historical-delivery loop). Those queries are independent reads, so they +/// may overlap; this bound keeps one request from monopolising the Postgres +/// pool. Post-processing stays strictly in filter order (`buffered`, not +/// `buffer_unordered`), so dedupe/trace/error semantics are unchanged. +pub(crate) const FILTER_QUERY_CONCURRENCY: usize = 4; + /// Handle a REQ message: register the subscription, deliver historical events, then send EOSE. pub async fn handle_req( sub_id: String, @@ -242,29 +251,55 @@ pub async fn handle_req( let mut total_sent: usize = 0; let viewer_hex = hex::encode(&pubkey_bytes); - for filter in &filters { - // Use per-filter #h channel scope when available, falling back to the - // subscription-level channel_id. This prevents unrelated accessible-channel - // rows from consuming the LIMIT when filters target specific channels but - // the subscription is global (multiple distinct #h values across filters). 
- let per_filter_channel = { - let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H); - filter - .generic_tags - .get(&h) - .and_then(|vs| { - if vs.len() == 1 { - vs.iter().next()?.parse::<uuid::Uuid>().ok() - } else { - None - } - }) - .or(channel_id) - }; - let params = filter_to_query_params(filter, per_filter_channel, conn.tenant.community()); - - let filter_events = state.db.query_events(&params).await; + // Phase 1 — pure query construction, in filter order. + let filter_queries: Vec<(usize, Option<uuid::Uuid>, EventQuery)> = filters + .iter() + .enumerate() + .map(|(idx, filter)| { + // Use per-filter #h channel scope when available, falling back to the + // subscription-level channel_id. This prevents unrelated accessible-channel + // rows from consuming the LIMIT when filters target specific channels but + // the subscription is global (multiple distinct #h values across filters). + let per_filter_channel = { + let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H); + filter + .generic_tags + .get(&h) + .and_then(|vs| { + if vs.len() == 1 { + vs.iter().next()?.parse::<uuid::Uuid>().ok() + } else { + None + } + }) + .or(channel_id) + }; + let params = + filter_to_query_params(filter, per_filter_channel, conn.tenant.community()); + (idx, per_filter_channel, params) + }) + .collect(); + + // Phase 2 — DB reads, bounded-concurrent. `buffered` (not `buffer_unordered`) + // yields results in input order, so phase 3 observes filters in their + // original order and NIP-01 dedupe / conformance-trace / error semantics are + // byte-identical to the previous serial loop. + use futures_util::stream::{self, StreamExt}; + let db = state.db.clone(); + let mut results = stream::iter(filter_queries.into_iter().map( + |(idx, per_filter_channel, params)| { + let db = db.clone(); + async move { + let filter_events = db.query_events(&params).await; + (idx, per_filter_channel, filter_events) + } + }, + )) + .buffered(FILTER_QUERY_CONCURRENCY); + // Phase 3 — post-processing, strictly in filter order. 
+ while let Some((idx, per_filter_channel, filter_events)) = results.next().await { + let filter = &filters[idx]; let events = match filter_events { Ok(evs) => evs, Err(e) => { @@ -1110,6 +1145,49 @@ mod tests { use super::*; use nostr::{Alphabet, Filter, SingleLetterTag}; + /// S2 invariant: the bounded-concurrency pipeline (phase 2) must yield + /// per-filter results in original filter order even when an earlier + /// filter's DB query completes *after* a later one. `buffered` guarantees + /// this; `buffer_unordered` would not. If this test fails, NIP-01 dedupe + /// order (`seen_ids` insertion order = filter order), conformance-trace + /// row order, and first-error-wins semantics are all broken. + #[tokio::test] + async fn filter_query_pipeline_preserves_filter_order() { + use futures_util::stream::{self, StreamExt}; + + // Simulated per-filter DB latencies: the FIRST filter is the SLOWEST. + let latencies_ms: Vec<u64> = vec![50, 5, 20, 1, 10, 2]; + let n = latencies_ms.len(); + + let mut results = stream::iter(latencies_ms.into_iter().enumerate().map( + |(idx, delay_ms)| async move { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + idx + }, + )) + .buffered(FILTER_QUERY_CONCURRENCY); + + let mut order = Vec::with_capacity(n); + while let Some(idx) = results.next().await { + order.push(idx); + } + + assert_eq!( + order, + (0..n).collect::<Vec<_>>(), + "buffered pipeline must preserve input (filter) order regardless of completion order" + ); + } + + /// Guard: the concurrency bound stays a small fraction of any sane pool + /// size. If someone raises it past this, they must re-run the relay bench + /// and reconsider pool contention (see FILTER_QUERY_CONCURRENCY docs). 
+ #[test] + fn filter_query_concurrency_is_bounded() { + assert!(FILTER_QUERY_CONCURRENCY >= 2); + assert!(FILTER_QUERY_CONCURRENCY <= 8); + } + #[test] fn request_local_access_cache_positive_no_db_no_repair() { let ch = uuid::Uuid::new_v4(); From 8b4c44fb11169404e1c1bfa6c4d6210c1fa42fa8 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Thu, 2 Jul 2026 10:56:53 -0400 Subject: [PATCH 2/2] fix(relay): replace constant-value guard test with compile-time const assert CI clippy runs --all-targets -D warnings, which lints test code and rejects assertions_on_constants. The compile-time const assert at the definition site is stronger anyway: violating the FILTER_QUERY_CONCURRENCY range now fails every build, not just a test run. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-relay/src/handlers/req.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/buzz-relay/src/handlers/req.rs b/crates/buzz-relay/src/handlers/req.rs index 124d77fcc..795322245 100644 --- a/crates/buzz-relay/src/handlers/req.rs +++ b/crates/buzz-relay/src/handlers/req.rs @@ -31,6 +31,12 @@ const MAX_SUBSCRIPTIONS: usize = 1024; /// `buffer_unordered`), so dedupe/trace/error semantics are unchanged. pub(crate) const FILTER_QUERY_CONCURRENCY: usize = 4; +// Guard: keep the bound a small fraction of any sane Postgres pool size. +// Raising it past this range requires re-running the relay bench and +// reconsidering pool contention (see docs above). Compile-time — violating +// the range fails the build. +const _: () = assert!(FILTER_QUERY_CONCURRENCY >= 2 && FILTER_QUERY_CONCURRENCY <= 8); + /// Handle a REQ message: register the subscription, deliver historical events, then send EOSE. 
pub async fn handle_req( sub_id: String, @@ -1179,15 +1185,6 @@ mod tests { ); } - /// Guard: the concurrency bound stays a small fraction of any sane pool - /// size. If someone raises it past this, they must re-run the relay bench - /// and reconsider pool contention (see FILTER_QUERY_CONCURRENCY docs). - #[test] - fn filter_query_concurrency_is_bounded() { - assert!(FILTER_QUERY_CONCURRENCY >= 2); - assert!(FILTER_QUERY_CONCURRENCY <= 8); - } - #[test] fn request_local_access_cache_positive_no_db_no_repair() { let ch = uuid::Uuid::new_v4();