From 144f00bcdf99e995a88465bff936742d78e270b9 Mon Sep 17 00:00:00 2001 From: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Date: Wed, 1 Jul 2026 22:11:01 -0400 Subject: [PATCH 1/3] perf(relay): defer post-commit event dispatch Co-authored-by: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Signed-off-by: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> --- crates/buzz-relay/src/handlers/event.rs | 59 ++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/crates/buzz-relay/src/handlers/event.rs b/crates/buzz-relay/src/handlers/event.rs index f655ae057..a9e784413 100644 --- a/crates/buzz-relay/src/handlers/event.rs +++ b/crates/buzz-relay/src/handlers/event.rs @@ -241,13 +241,53 @@ pub async fn fan_out_pubsub_event(state: &Arc, channel_event: buzz_pub } } -/// Publish a stored event to subscribers and kick off async side effects. +/// Schedule post-commit delivery/side effects for a stored event. +/// +/// This intentionally returns after spawning the post-commit task: NIP-01 `OK` +/// means the event was durably accepted, not that Redis publish, local fan-out, +/// audit enqueue, or workflow triggering have completed. The spawned task still +/// runs the same guarded fan-out path, Redis publish, `mark_local_event` echo +/// dedupe, and delivery metrics as the former inline path. pub(crate) async fn dispatch_persistent_event( tenant: &TenantContext, state: &Arc, stored_event: &StoredEvent, kind_u32: u32, actor_pubkey_hex: &str, +) -> usize { + let tenant = tenant.clone(); + let state = Arc::clone(state); + let stored_event = stored_event.clone(); + let actor_pubkey_hex = actor_pubkey_hex.to_owned(); + let event_id_hex = stored_event.event.id.to_hex(); + + metrics::counter!("buzz_post_commit_dispatch_scheduled_total").increment(1); + tokio::spawn(async move { + let recipients = dispatch_persistent_event_inner( + &tenant, + &state, + &stored_event, + kind_u32, + &actor_pubkey_hex, + ) + .await; + debug!( + event_id = %event_id_hex, + recipients, + "post-commit dispatch complete" + ); + }); + + 0 +} + +/// Run post-commit delivery/side effects for a stored event. +async fn dispatch_persistent_event_inner( + tenant: &TenantContext, + state: &Arc, + stored_event: &StoredEvent, + kind_u32: u32, + actor_pubkey_hex: &str, ) -> usize { // No `crate::conformance` emit here — the spec doesn't have a // separate fan-out action. Acceptance was already recorded at the @@ -285,8 +325,15 @@ pub(crate) async fn dispatch_persistent_event( "Fan-out" ); - let event_json = serde_json::to_string(&stored_event.event) - .expect("nostr::Event serialization is infallible for well-formed events"); + let event_json = match serde_json::to_string(&stored_event.event) { + Ok(json) => json, + Err(e) => { + error!(event_id = %event_id_hex, "Failed to serialize event for fan-out: {e}"); + metrics::counter!("buzz_post_commit_dispatch_errors_total", "stage" => "serialize") + .increment(1); + return 0; + } + }; // For viewer-private snapshots (kind:30622), live fan-out must reach only the // owner — a kindless `ids:[…]` subscription can otherwise match it. Pull paths // (HTTP /query, WS historical) are gated separately by reader_authorized_for_event. @@ -1459,7 +1506,7 @@ mod tests { let event_id_hex = event.id.to_hex(); let stored = StoredEvent::new(event, None); - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &tenant, &state, &stored, @@ -1559,7 +1606,7 @@ mod tests { "test precondition: the two events must have distinct ids" ); - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &ta, &state, &a_stored, @@ -1567,7 +1614,7 @@ mod tests { &actor_hex, ) .await; - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &tb, &state, &b_stored, From 605f7127f7f684b1bf9b6b93f1ff1b52f50d335c Mon Sep 17 00:00:00 2001 From: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Date: Wed, 1 Jul 2026 22:26:23 -0400 Subject: [PATCH 2/3] fix(relay): preserve audit backpressure after W1 Co-authored-by: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Signed-off-by: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> --- crates/buzz-relay/src/handlers/event.rs | 102 ++++++++++++++++-------- 1 file changed, 70 insertions(+), 32 deletions(-) diff --git a/crates/buzz-relay/src/handlers/event.rs b/crates/buzz-relay/src/handlers/event.rs index a9e784413..750018a5c 100644 --- a/crates/buzz-relay/src/handlers/event.rs +++ b/crates/buzz-relay/src/handlers/event.rs @@ -243,11 +243,13 @@ pub async fn fan_out_pubsub_event(state: &Arc, channel_event: buzz_pub /// Schedule post-commit delivery/side effects for a stored event. /// -/// This intentionally returns after spawning the post-commit task: NIP-01 `OK` -/// means the event was durably accepted, not that Redis publish, local fan-out, -/// audit enqueue, or workflow triggering have completed. The spawned task still -/// runs the same guarded fan-out path, Redis publish, `mark_local_event` echo -/// dedupe, and delivery metrics as the former inline path. +/// This intentionally returns after only the bounded audit enqueue has completed: +/// NIP-01 `OK` means the event was durably accepted, not that Redis publish, +/// local fan-out, or workflow triggering have completed. Keeping audit enqueue on +/// the awaited path preserves the bounded-channel backpressure posture when the +/// audit DB is overloaded; the spawned task still runs the same guarded fan-out +/// path, Redis publish, `mark_local_event` echo dedupe, and delivery metrics as +/// the former inline path. pub(crate) async fn dispatch_persistent_event( tenant: &TenantContext, state: &Arc, @@ -255,11 +257,21 @@ pub(crate) async fn dispatch_persistent_event( kind_u32: u32, actor_pubkey_hex: &str, ) -> usize { + let event_id_hex = stored_event.event.id.to_hex(); + enqueue_event_created_audit( + tenant, + state, + stored_event, + kind_u32, + actor_pubkey_hex, + &event_id_hex, + ) + .await; + let tenant = tenant.clone(); let state = Arc::clone(state); let stored_event = stored_event.clone(); let actor_pubkey_hex = actor_pubkey_hex.to_owned(); - let event_id_hex = stored_event.event.id.to_hex(); metrics::counter!("buzz_post_commit_dispatch_scheduled_total").increment(1); tokio::spawn(async move { @@ -269,6 +281,7 @@ pub(crate) async fn dispatch_persistent_event( &stored_event, kind_u32, &actor_pubkey_hex, + false, ) .await; debug!( @@ -288,6 +301,7 @@ async fn dispatch_persistent_event_inner( stored_event: &StoredEvent, kind_u32: u32, actor_pubkey_hex: &str, + enqueue_audit: bool, ) -> usize { // No `crate::conformance` emit here — the spec doesn't have a // separate fan-out action. Acceptance was already recorded at the @@ -379,32 +393,16 @@ async fn dispatch_persistent_event_inner( // out-of-band index to feed. The old Typesense `index_event` worker and its // `search_index_tx` mpsc are gone with the Typesense backend. - // Audit via bounded channel (capacity 1000). Uses .send().await so entries - // are never silently dropped — backpressure propagates to the event handler - // if the queue is full. This is intentional: the audit advisory lock already - // serializes writes (at most 1 in-flight), so a full queue means the audit - // DB is genuinely overloaded and the relay should slow down rather than - // accumulate unbounded in-memory state. DB write failures in the worker are - // logged but not retried (same as the previous per-event tokio::spawn). - let audit_entry = buzz_audit::NewAuditEntry { - community_id: tenant.community(), - action: buzz_audit::AuditAction::EventCreated, - // Record the *actor* the caller resolved (authenticated principal for - // ingest, triggering user for workflow posts), not `stored_event.event - // .pubkey`. For relay-signed events (workflow sink, side-effect emits) - // the claimed author is the relay key, so deriving from the event would - // erase the human behind the action from the audit trail. This mirrors - // the pre-rewrite semantics, ported to the raw-bytes column. - actor_pubkey: hex::decode(actor_pubkey_hex).ok(), - object_id: Some(event_id_hex.clone()), - detail: serde_json::json!({ - "event_kind": kind_u32, - "channel_id": stored_event.channel_id, - }), - }; - if let Err(e) = state.audit_tx.send(audit_entry).await { - error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}"); - metrics::counter!("buzz_audit_send_errors_total").increment(1); + if enqueue_audit { + enqueue_event_created_audit( + tenant, + state, + stored_event, + kind_u32, + actor_pubkey_hex, + &event_id_hex, + ) + .await; } // Skip workflow triggering for workflow-execution kinds and relay-signed workflow messages. @@ -445,6 +443,43 @@ async fn dispatch_persistent_event_inner( matches.len() } +async fn enqueue_event_created_audit( + tenant: &TenantContext, + state: &Arc, + stored_event: &StoredEvent, + kind_u32: u32, + actor_pubkey_hex: &str, + event_id_hex: &str, +) { + // Audit via bounded channel (capacity 1000). Uses .send().await so entries + // are never silently dropped — backpressure propagates to the event handler + // if the queue is full. This is intentional: the audit advisory lock already + // serializes writes (at most 1 in-flight), so a full queue means the audit + // DB is genuinely overloaded and the relay should slow down rather than + // accumulate unbounded in-memory state. DB write failures in the worker are + // logged but not retried (same as the previous per-event tokio::spawn). + let audit_entry = buzz_audit::NewAuditEntry { + community_id: tenant.community(), + action: buzz_audit::AuditAction::EventCreated, + // Record the *actor* the caller resolved (authenticated principal for + // ingest, triggering user for workflow posts), not `stored_event.event + // .pubkey`. For relay-signed events (workflow sink, side-effect emits) + // the claimed author is the relay key, so deriving from the event would + // erase the human behind the action from the audit trail. This mirrors + // the pre-rewrite semantics, ported to the raw-bytes column. + actor_pubkey: hex::decode(actor_pubkey_hex).ok(), + object_id: Some(event_id_hex.to_owned()), + detail: serde_json::json!({ + "event_kind": kind_u32, + "channel_id": stored_event.channel_id, + }), + }; + if let Err(e) = state.audit_tx.send(audit_entry).await { + error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}"); + metrics::counter!("buzz_audit_send_errors_total").increment(1); + } +} + /// Handle an EVENT message from a WebSocket connection. /// /// Extracts auth from the WS connection, dispatches ephemeral events locally, @@ -1512,6 +1547,7 @@ mod tests { &stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; @@ -1612,6 +1648,7 @@ mod tests { &a_stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; super::super::dispatch_persistent_event_inner( @@ -1620,6 +1657,7 @@ mod tests { &b_stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; From c61b4c149fe30f26e060370fea645412986adf93 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Wed, 1 Jul 2026 22:11:35 -0400 Subject: [PATCH 3/3] perf(relay): share event with sig-verify task via Arc instead of deep clone verify_event only needs a borrow; spawn_blocking only needs 'static. Wrapping the event in an Arc avoids deep-cloning tags + up to 256 KB of content on every ingest. After verification the Arc is uniquely held (the verify task's clone is dropped on completion), so try_unwrap returns the original event; the clone fallback is unreachable in practice but keeps the code total. W8 in PLANS/RELAY_PERF_OPTIMIZATION_PLAN.md; ruled GREEN (no invariant touched) in RESEARCH/RELAY_PERF_CORRECTNESS.md. cargo test -p buzz-relay: 436 passed, 0 failed. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-relay/src/handlers/ingest.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/buzz-relay/src/handlers/ingest.rs b/crates/buzz-relay/src/handlers/ingest.rs index 057ea972f..8412eec6f 100644 --- a/crates/buzz-relay/src/handlers/ingest.rs +++ b/crates/buzz-relay/src/handlers/ingest.rs @@ -1244,8 +1244,13 @@ async fn ingest_event_inner( return Err(IngestError::Rejected("restricted: relay-only kind".into())); } - let event_clone = event.clone(); - let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_clone)).await; + // Share the event with the verify task via Arc instead of deep-cloning it + // (tags + up to 256 KB of content). spawn_blocking only needs 'static, not + // ownership; once it completes its Arc is dropped, so try_unwrap returns + // the original event without ever having copied it. + let event = std::sync::Arc::new(event); + let event_for_verify = std::sync::Arc::clone(&event); + let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_for_verify)).await; match verify_result { Ok(Ok(())) => {} Ok(Err(e)) => { @@ -1258,6 +1263,7 @@ async fn ingest_event_inner( )); } } + let event = std::sync::Arc::try_unwrap(event).unwrap_or_else(|arc| (*arc).clone()); const MAX_TIMESTAMP_DRIFT_SECS: i64 = 900; // ±15 minutes let now = chrono::Utc::now().timestamp();