diff --git a/crates/buzz-relay/src/handlers/event.rs b/crates/buzz-relay/src/handlers/event.rs index f655ae057..750018a5c 100644 --- a/crates/buzz-relay/src/handlers/event.rs +++ b/crates/buzz-relay/src/handlers/event.rs @@ -241,13 +241,67 @@ pub async fn fan_out_pubsub_event(state: &Arc, channel_event: buzz_pub } } -/// Publish a stored event to subscribers and kick off async side effects. +/// Schedule post-commit delivery/side effects for a stored event. +/// +/// This intentionally returns after only the bounded audit enqueue has completed: +/// NIP-01 `OK` means the event was durably accepted, not that Redis publish, +/// local fan-out, or workflow triggering have completed. Keeping audit enqueue on +/// the awaited path preserves the bounded-channel backpressure posture when the +/// audit DB is overloaded; the spawned task still runs the same guarded fan-out +/// path, Redis publish, `mark_local_event` echo dedupe, and delivery metrics as +/// the former inline path. Always returns 0: the count produced by the spawned dispatch is only emitted in its debug log. pub(crate) async fn dispatch_persistent_event( tenant: &TenantContext, state: &Arc, stored_event: &StoredEvent, kind_u32: u32, actor_pubkey_hex: &str, +) -> usize { + let event_id_hex = stored_event.event.id.to_hex(); + enqueue_event_created_audit( + tenant, + state, + stored_event, + kind_u32, + actor_pubkey_hex, + &event_id_hex, + ) + .await; + + let tenant = tenant.clone(); + let state = Arc::clone(state); + let stored_event = stored_event.clone(); + let actor_pubkey_hex = actor_pubkey_hex.to_owned(); + + metrics::counter!("buzz_post_commit_dispatch_scheduled_total").increment(1); + tokio::spawn(async move { + let recipients = dispatch_persistent_event_inner( + &tenant, + &state, + &stored_event, + kind_u32, + &actor_pubkey_hex, + false, + ) + .await; + debug!( + event_id = %event_id_hex, + recipients, + "post-commit dispatch complete" + ); + }); + + 0 +} + +/// Run post-commit delivery/side effects for a stored event. 
+async fn dispatch_persistent_event_inner( + tenant: &TenantContext, + state: &Arc, + stored_event: &StoredEvent, + kind_u32: u32, + actor_pubkey_hex: &str, + enqueue_audit: bool, ) -> usize { // No `crate::conformance` emit here — the spec doesn't have a // separate fan-out action. Acceptance was already recorded at the @@ -285,8 +339,15 @@ pub(crate) async fn dispatch_persistent_event( "Fan-out" ); - let event_json = serde_json::to_string(&stored_event.event) - .expect("nostr::Event serialization is infallible for well-formed events"); + let event_json = match serde_json::to_string(&stored_event.event) { + Ok(json) => json, + Err(e) => { + error!(event_id = %event_id_hex, "Failed to serialize event for fan-out: {e}"); + metrics::counter!("buzz_post_commit_dispatch_errors_total", "stage" => "serialize") + .increment(1); + return 0; + } + }; // For viewer-private snapshots (kind:30622), live fan-out must reach only the // owner — a kindless `ids:[…]` subscription can otherwise match it. Pull paths // (HTTP /query, WS historical) are gated separately by reader_authorized_for_event. @@ -332,32 +393,16 @@ pub(crate) async fn dispatch_persistent_event( // out-of-band index to feed. The old Typesense `index_event` worker and its // `search_index_tx` mpsc are gone with the Typesense backend. - // Audit via bounded channel (capacity 1000). Uses .send().await so entries - // are never silently dropped — backpressure propagates to the event handler - // if the queue is full. This is intentional: the audit advisory lock already - // serializes writes (at most 1 in-flight), so a full queue means the audit - // DB is genuinely overloaded and the relay should slow down rather than - // accumulate unbounded in-memory state. DB write failures in the worker are - // logged but not retried (same as the previous per-event tokio::spawn). 
- let audit_entry = buzz_audit::NewAuditEntry { - community_id: tenant.community(), - action: buzz_audit::AuditAction::EventCreated, - // Record the *actor* the caller resolved (authenticated principal for - // ingest, triggering user for workflow posts), not `stored_event.event - // .pubkey`. For relay-signed events (workflow sink, side-effect emits) - // the claimed author is the relay key, so deriving from the event would - // erase the human behind the action from the audit trail. This mirrors - // the pre-rewrite semantics, ported to the raw-bytes column. - actor_pubkey: hex::decode(actor_pubkey_hex).ok(), - object_id: Some(event_id_hex.clone()), - detail: serde_json::json!({ - "event_kind": kind_u32, - "channel_id": stored_event.channel_id, - }), - }; - if let Err(e) = state.audit_tx.send(audit_entry).await { - error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}"); - metrics::counter!("buzz_audit_send_errors_total").increment(1); + if enqueue_audit { + enqueue_event_created_audit( + tenant, + state, + stored_event, + kind_u32, + actor_pubkey_hex, + &event_id_hex, + ) + .await; } // Skip workflow triggering for workflow-execution kinds and relay-signed workflow messages. @@ -398,6 +443,43 @@ pub(crate) async fn dispatch_persistent_event( matches.len() } +async fn enqueue_event_created_audit( + tenant: &TenantContext, + state: &Arc, + stored_event: &StoredEvent, + kind_u32: u32, + actor_pubkey_hex: &str, + event_id_hex: &str, +) { + // Audit via bounded channel (capacity 1000). Uses .send().await so entries + // are never silently dropped — backpressure propagates to the event handler + // if the queue is full. This is intentional: the audit advisory lock already + // serializes writes (at most 1 in-flight), so a full queue means the audit + // DB is genuinely overloaded and the relay should slow down rather than + // accumulate unbounded in-memory state. 
DB write failures in the worker are + // logged but not retried (same as the previous per-event tokio::spawn). + let audit_entry = buzz_audit::NewAuditEntry { + community_id: tenant.community(), + action: buzz_audit::AuditAction::EventCreated, + // Record the *actor* the caller resolved (authenticated principal for + // ingest, triggering user for workflow posts), not `stored_event.event + // .pubkey`. For relay-signed events (workflow sink, side-effect emits) + // the claimed author is the relay key, so deriving from the event would + // erase the human behind the action from the audit trail. This mirrors + // the pre-rewrite semantics, ported to the raw-bytes column. + actor_pubkey: hex::decode(actor_pubkey_hex).ok(), + object_id: Some(event_id_hex.to_owned()), + detail: serde_json::json!({ + "event_kind": kind_u32, + "channel_id": stored_event.channel_id, + }), + }; + if let Err(e) = state.audit_tx.send(audit_entry).await { + error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}"); + metrics::counter!("buzz_audit_send_errors_total").increment(1); + } +} + /// Handle an EVENT message from a WebSocket connection. 
/// /// Extracts auth from the WS connection, dispatches ephemeral events locally, @@ -1459,12 +1541,13 @@ mod tests { let event_id_hex = event.id.to_hex(); let stored = StoredEvent::new(event, None); - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &tenant, &state, &stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; @@ -1559,20 +1642,22 @@ mod tests { "test precondition: the two events must have distinct ids" ); - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &ta, &state, &a_stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; - super::super::dispatch_persistent_event( + super::super::dispatch_persistent_event_inner( &tb, &state, &b_stored, KIND_PRESENCE_UPDATE, &actor_hex, + true, ) .await; diff --git a/crates/buzz-relay/src/handlers/ingest.rs b/crates/buzz-relay/src/handlers/ingest.rs index 057ea972f..8412eec6f 100644 --- a/crates/buzz-relay/src/handlers/ingest.rs +++ b/crates/buzz-relay/src/handlers/ingest.rs @@ -1244,8 +1244,13 @@ async fn ingest_event_inner( return Err(IngestError::Rejected("restricted: relay-only kind".into())); } - let event_clone = event.clone(); - let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_clone)).await; + // Share the event with the verify task via Arc instead of deep-cloning it + // (tags + up to 256 KB of content). spawn_blocking only needs 'static, not + // ownership; once it completes its Arc is dropped, so try_unwrap normally + // returns the original event uncopied (it clones only if another Arc reference is still alive). 
+ let event = std::sync::Arc::new(event); + let event_for_verify = std::sync::Arc::clone(&event); + let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_for_verify)).await; match verify_result { Ok(Ok(())) => {} Ok(Err(e)) => { @@ -1258,6 +1263,7 @@ async fn ingest_event_inner( )); } } + let event = std::sync::Arc::try_unwrap(event).unwrap_or_else(|arc| (*arc).clone()); const MAX_TIMESTAMP_DRIFT_SECS: i64 = 900; // ±15 minutes let now = chrono::Utc::now().timestamp();