diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 4b860e403..59093a060 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -636,6 +636,11 @@ pub async fn query_events( handled.insert(idx); } + // Phase 1 — pure construction + validation, in filter order. Access-scope + // skips and the `before_id` BAD_REQUEST are decided here, before any DB + // work is issued (validation errors are deterministic client mistakes, so + // surfacing them ahead of transient DB errors is strictly more predictable). + let mut catchall_queries: Vec<(usize, buzz_db::EventQuery)> = Vec::new(); for (idx, (raw, filter)) in raw_filters.iter().zip(filters.iter()).enumerate() { if handled.contains(&idx) { continue; @@ -677,7 +682,24 @@ pub async fn query_events( query.offset = Some(offset); } - match state.db.query_events(&query).await { + catchall_queries.push((idx, query)); + } + + // Phase 2 — DB reads, bounded-concurrent, order-preserving (`buffered`). + // Phase 3 consumes results in original filter order, so response ordering + // and DB-error semantics match the previous serial loop; validation errors now surface first. + use futures_util::stream::{self, StreamExt}; + let db = state.db.clone(); + let mut catchall_results = stream::iter(catchall_queries.into_iter().map(|(idx, query)| { + let db = db.clone(); + async move { (idx, db.query_events(&query).await) } + })) + .buffered(crate::handlers::req::FILTER_QUERY_CONCURRENCY); + + // Phase 3 — post-processing, strictly in filter order. 
+ while let Some((idx, filter_events)) = catchall_results.next().await { + let filter = &filters[idx]; + match filter_events { Ok(stored_events) => { for se in stored_events { if !event_in_accessible_channel(&se, &accessible_channels) { diff --git a/crates/buzz-relay/src/handlers/req.rs b/crates/buzz-relay/src/handlers/req.rs index 2e697eafa..795322245 100644 --- a/crates/buzz-relay/src/handlers/req.rs +++ b/crates/buzz-relay/src/handlers/req.rs @@ -22,6 +22,21 @@ use crate::state::AppState; const MAX_HISTORICAL_LIMIT: i64 = 2_000; const MAX_SUBSCRIPTIONS: usize = 1024; +/// Maximum `query_events` calls in flight per multi-filter REQ / bridge query. +/// +/// NIP-01 gives each filter its own DB query (OR semantics — see the comment at +/// the historical-delivery loop). Those queries are independent reads, so they +/// may overlap; this bound keeps one request from monopolising the Postgres +/// pool. Post-processing stays strictly in filter order (`buffered`, not +/// `buffer_unordered`), so dedupe/trace/error semantics are unchanged. +pub(crate) const FILTER_QUERY_CONCURRENCY: usize = 4; + +// Guard: keep the bound a small fraction of any sane Postgres pool size. +// Raising it past this range requires re-running the relay bench and +// reconsidering pool contention (see docs above). Compile-time — violating +// the range fails the build. +const _: () = assert!(FILTER_QUERY_CONCURRENCY >= 2 && FILTER_QUERY_CONCURRENCY <= 8); + /// Handle a REQ message: register the subscription, deliver historical events, then send EOSE. pub async fn handle_req( sub_id: String, @@ -242,29 +257,55 @@ pub async fn handle_req( let mut total_sent: usize = 0; let viewer_hex = hex::encode(&pubkey_bytes); - for filter in &filters { - // Use per-filter #h channel scope when available, falling back to the - // subscription-level channel_id. 
This prevents unrelated accessible-channel - // rows from consuming the LIMIT when filters target specific channels but - // the subscription is global (multiple distinct #h values across filters). - let per_filter_channel = { - let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H); - filter - .generic_tags - .get(&h) - .and_then(|vs| { - if vs.len() == 1 { - vs.iter().next()?.parse::<uuid::Uuid>().ok() - } else { - None - } - }) - .or(channel_id) - }; - let params = filter_to_query_params(filter, per_filter_channel, conn.tenant.community()); - - let filter_events = state.db.query_events(&params).await; + // Phase 1 — pure query construction, in filter order. + let filter_queries: Vec<(usize, Option<uuid::Uuid>, EventQuery)> = filters + .iter() + .enumerate() + .map(|(idx, filter)| { + // Use per-filter #h channel scope when available, falling back to the + // subscription-level channel_id. This prevents unrelated accessible-channel + // rows from consuming the LIMIT when filters target specific channels but + // the subscription is global (multiple distinct #h values across filters). + let per_filter_channel = { + let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H); + filter + .generic_tags + .get(&h) + .and_then(|vs| { + if vs.len() == 1 { + vs.iter().next()?.parse::<uuid::Uuid>().ok() + } else { + None + } + }) + .or(channel_id) + }; + let params = + filter_to_query_params(filter, per_filter_channel, conn.tenant.community()); + (idx, per_filter_channel, params) + }) + .collect(); + + // Phase 2 — DB reads, bounded-concurrent. `buffered` (not `buffer_unordered`) + // yields results in input order, so phase 3 observes filters in their + // original order and NIP-01 dedupe / conformance-trace / error semantics are + // byte-identical to the previous serial loop. 
+ use futures_util::stream::{self, StreamExt}; + let db = state.db.clone(); + let mut results = stream::iter(filter_queries.into_iter().map( + |(idx, per_filter_channel, params)| { + let db = db.clone(); + async move { + let filter_events = db.query_events(&params).await; + (idx, per_filter_channel, filter_events) + } + }, + )) + .buffered(FILTER_QUERY_CONCURRENCY); + // Phase 3 — post-processing, strictly in filter order. + while let Some((idx, per_filter_channel, filter_events)) = results.next().await { + let filter = &filters[idx]; let events = match filter_events { Ok(evs) => evs, Err(e) => { @@ -1110,6 +1151,40 @@ mod tests { use super::*; use nostr::{Alphabet, Filter, SingleLetterTag}; + /// S2 invariant: the bounded-concurrency pipeline (phase 2) must yield + /// per-filter results in original filter order even when an earlier + /// filter's DB query completes *after* a later one. `buffered` guarantees + /// this; `buffer_unordered` would not. If this test fails, NIP-01 dedupe + /// order (`seen_ids` insertion order = filter order), conformance-trace + /// row order, and first-error-wins semantics are all broken. + #[tokio::test] + async fn filter_query_pipeline_preserves_filter_order() { + use futures_util::stream::{self, StreamExt}; + + // Simulated per-filter DB latencies: the FIRST filter is the SLOWEST. 
+ let latencies_ms: Vec<u64> = vec![50, 5, 20, 1, 10, 2]; + let n = latencies_ms.len(); + + let mut results = stream::iter(latencies_ms.into_iter().enumerate().map( + |(idx, delay_ms)| async move { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + idx + }, + )) + .buffered(FILTER_QUERY_CONCURRENCY); + + let mut order = Vec::with_capacity(n); + while let Some(idx) = results.next().await { + order.push(idx); + } + + assert_eq!( + order, + (0..n).collect::<Vec<_>>(), + "buffered pipeline must preserve input (filter) order regardless of completion order" + ); + } + #[test] fn request_local_access_cache_positive_no_db_no_repair() { let ch = uuid::Uuid::new_v4();