Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion crates/buzz-relay/src/api/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,11 @@ pub async fn query_events(
handled.insert(idx);
}

// Phase 1 — pure construction + validation, in filter order. Access-scope
// skips and the `before_id` BAD_REQUEST are decided here, before any DB
// work is issued (validation errors are deterministic client mistakes, so
// surfacing them ahead of transient DB errors is strictly more predictable).
let mut catchall_queries: Vec<(usize, buzz_db::EventQuery)> = Vec::new();
for (idx, (raw, filter)) in raw_filters.iter().zip(filters.iter()).enumerate() {
if handled.contains(&idx) {
continue;
Expand Down Expand Up @@ -677,7 +682,24 @@ pub async fn query_events(
query.offset = Some(offset);
}

match state.db.query_events(&query).await {
catchall_queries.push((idx, query));
}

// Phase 2 — DB reads, bounded-concurrent, order-preserving (`buffered`).
// Phase 3 consumes results in original filter order, so response ordering
// and error semantics match the previous serial loop.
use futures_util::stream::{self, StreamExt};
let db = state.db.clone();
let mut catchall_results = stream::iter(catchall_queries.into_iter().map(|(idx, query)| {
let db = db.clone();
async move { (idx, db.query_events(&query).await) }
}))
.buffered(crate::handlers::req::FILTER_QUERY_CONCURRENCY);

// Phase 3 — post-processing, strictly in filter order.
while let Some((idx, filter_events)) = catchall_results.next().await {
let filter = &filters[idx];
match filter_events {
Ok(stored_events) => {
for se in stored_events {
if !event_in_accessible_channel(&se, &accessible_channels) {
Expand Down
119 changes: 97 additions & 22 deletions crates/buzz-relay/src/handlers/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ use crate::state::AppState;
const MAX_HISTORICAL_LIMIT: i64 = 2_000;
const MAX_SUBSCRIPTIONS: usize = 1024;

/// Maximum `query_events` calls in flight per multi-filter REQ / bridge query.
///
/// NIP-01 gives each filter its own DB query (OR semantics — see the comment at
/// the historical-delivery loop). Those queries are independent reads, so they
/// may overlap; this bound keeps one request from monopolising the Postgres
/// pool. Post-processing stays strictly in filter order (`buffered`, not
/// `buffer_unordered`), so dedupe/trace/error semantics are unchanged.
pub(crate) const FILTER_QUERY_CONCURRENCY: usize = 4;

// Guard: keep the bound a small fraction of any sane Postgres pool size.
// Raising it past this range requires re-running the relay bench and
// reconsidering pool contention (see docs above). Compile-time — violating
// the range fails the build.
const _: () = assert!(FILTER_QUERY_CONCURRENCY >= 2 && FILTER_QUERY_CONCURRENCY <= 8);

/// Handle a REQ message: register the subscription, deliver historical events, then send EOSE.
pub async fn handle_req(
sub_id: String,
Expand Down Expand Up @@ -242,29 +257,55 @@ pub async fn handle_req(
let mut total_sent: usize = 0;
let viewer_hex = hex::encode(&pubkey_bytes);

for filter in &filters {
// Use per-filter #h channel scope when available, falling back to the
// subscription-level channel_id. This prevents unrelated accessible-channel
// rows from consuming the LIMIT when filters target specific channels but
// the subscription is global (multiple distinct #h values across filters).
let per_filter_channel = {
let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H);
filter
.generic_tags
.get(&h)
.and_then(|vs| {
if vs.len() == 1 {
vs.iter().next()?.parse::<uuid::Uuid>().ok()
} else {
None
}
})
.or(channel_id)
};
let params = filter_to_query_params(filter, per_filter_channel, conn.tenant.community());

let filter_events = state.db.query_events(&params).await;
// Phase 1 — pure query construction, in filter order.
let filter_queries: Vec<(usize, Option<uuid::Uuid>, EventQuery)> = filters
.iter()
.enumerate()
.map(|(idx, filter)| {
// Use per-filter #h channel scope when available, falling back to the
// subscription-level channel_id. This prevents unrelated accessible-channel
// rows from consuming the LIMIT when filters target specific channels but
// the subscription is global (multiple distinct #h values across filters).
let per_filter_channel = {
let h = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H);
filter
.generic_tags
.get(&h)
.and_then(|vs| {
if vs.len() == 1 {
vs.iter().next()?.parse::<uuid::Uuid>().ok()
} else {
None
}
})
.or(channel_id)
};
let params =
filter_to_query_params(filter, per_filter_channel, conn.tenant.community());
(idx, per_filter_channel, params)
})
.collect();

// Phase 2 — DB reads, bounded-concurrent. `buffered` (not `buffer_unordered`)
// yields results in input order, so phase 3 observes filters in their
// original order and NIP-01 dedupe / conformance-trace / error semantics are
// byte-identical to the previous serial loop.
use futures_util::stream::{self, StreamExt};
let db = state.db.clone();
let mut results = stream::iter(filter_queries.into_iter().map(
|(idx, per_filter_channel, params)| {
let db = db.clone();
async move {
let filter_events = db.query_events(&params).await;
(idx, per_filter_channel, filter_events)
}
},
))
.buffered(FILTER_QUERY_CONCURRENCY);

// Phase 3 — post-processing, strictly in filter order.
while let Some((idx, per_filter_channel, filter_events)) = results.next().await {
let filter = &filters[idx];
let events = match filter_events {
Ok(evs) => evs,
Err(e) => {
Expand Down Expand Up @@ -1110,6 +1151,40 @@ mod tests {
use super::*;
use nostr::{Alphabet, Filter, SingleLetterTag};

/// S2 invariant: the bounded-concurrency pipeline (phase 2) must yield
/// per-filter results in original filter order even when an earlier
/// filter's DB query completes *after* a later one. `buffered` guarantees
/// this; `buffer_unordered` would not. If this test fails, NIP-01 dedupe
/// order (`seen_ids` insertion order = filter order), conformance-trace
/// row order, and first-error-wins semantics are all broken.
#[tokio::test]
async fn filter_query_pipeline_preserves_filter_order() {
use futures_util::stream::{self, StreamExt};

// Simulated per-filter DB latencies: the FIRST filter is the SLOWEST.
let latencies_ms: Vec<u64> = vec![50, 5, 20, 1, 10, 2];
let n = latencies_ms.len();

let mut results = stream::iter(latencies_ms.into_iter().enumerate().map(
|(idx, delay_ms)| async move {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
idx
},
))
.buffered(FILTER_QUERY_CONCURRENCY);

let mut order = Vec::with_capacity(n);
while let Some(idx) = results.next().await {
order.push(idx);
}

assert_eq!(
order,
(0..n).collect::<Vec<_>>(),
"buffered pipeline must preserve input (filter) order regardless of completion order"
);
}

#[test]
fn request_local_access_cache_positive_no_db_no_repair() {
let ch = uuid::Uuid::new_v4();
Expand Down
Loading