Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 117 additions & 32 deletions crates/buzz-relay/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,67 @@ pub async fn fan_out_pubsub_event(state: &Arc<AppState>, channel_event: buzz_pub
}
}

/// Publish a stored event to subscribers and kick off async side effects.
/// Schedule post-commit delivery/side effects for a stored event.
///
/// This intentionally returns after only the bounded audit enqueue has completed:
/// NIP-01 `OK` means the event was durably accepted, not that Redis publish,
/// local fan-out, or workflow triggering have completed. Keeping audit enqueue on
/// the awaited path preserves the bounded-channel backpressure posture when the
/// audit DB is overloaded; the spawned task still runs the same guarded fan-out
/// path, Redis publish, `mark_local_event` echo dedupe, and delivery metrics as
/// the former inline path.
pub(crate) async fn dispatch_persistent_event(
    tenant: &TenantContext,
    state: &Arc<AppState>,
    stored_event: &StoredEvent,
    kind_u32: u32,
    actor_pubkey_hex: &str,
) -> usize {
    // The audit enqueue is the only step awaited on the caller's path, so a
    // full audit queue still slows the caller down.
    let event_id_hex = stored_event.event.id.to_hex();
    enqueue_event_created_audit(
        tenant,
        state,
        stored_event,
        kind_u32,
        actor_pubkey_hex,
        &event_id_hex,
    )
    .await;

    // Owned copies, so the background task does not borrow from the caller.
    let owned_tenant = tenant.clone();
    let shared_state = Arc::clone(state);
    let owned_event = stored_event.clone();
    let owned_actor = String::from(actor_pubkey_hex);

    let background = async move {
        let delivered = dispatch_persistent_event_inner(
            &owned_tenant,
            &shared_state,
            &owned_event,
            kind_u32,
            &owned_actor,
            // The audit entry was already enqueued above; do not enqueue twice.
            false,
        )
        .await;
        debug!(
            event_id = %event_id_hex,
            recipients = delivered,
            "post-commit dispatch complete"
        );
    };

    metrics::counter!("buzz_post_commit_dispatch_scheduled_total").increment(1);
    // The join handle is discarded: the task runs detached from this call.
    tokio::spawn(background);

    // Always 0 here: the recipient count is only known inside the spawned
    // task, which reports it through the debug log above.
    0
}

/// Run post-commit delivery/side effects for a stored event.
async fn dispatch_persistent_event_inner(
tenant: &TenantContext,
state: &Arc<AppState>,
stored_event: &StoredEvent,
kind_u32: u32,
actor_pubkey_hex: &str,
enqueue_audit: bool,
) -> usize {
// No `crate::conformance` emit here — the spec doesn't have a
// separate fan-out action. Acceptance was already recorded at the
Expand Down Expand Up @@ -285,8 +339,15 @@ pub(crate) async fn dispatch_persistent_event(
"Fan-out"
);

let event_json = serde_json::to_string(&stored_event.event)
.expect("nostr::Event serialization is infallible for well-formed events");
let event_json = match serde_json::to_string(&stored_event.event) {
Ok(json) => json,
Err(e) => {
error!(event_id = %event_id_hex, "Failed to serialize event for fan-out: {e}");
metrics::counter!("buzz_post_commit_dispatch_errors_total", "stage" => "serialize")
.increment(1);
return 0;
}
};
// For viewer-private snapshots (kind:30622), live fan-out must reach only the
// owner — a kindless `ids:[…]` subscription can otherwise match it. Pull paths
// (HTTP /query, WS historical) are gated separately by reader_authorized_for_event.
Expand Down Expand Up @@ -332,32 +393,16 @@ pub(crate) async fn dispatch_persistent_event(
// out-of-band index to feed. The old Typesense `index_event` worker and its
// `search_index_tx` mpsc are gone with the Typesense backend.

// Audit via bounded channel (capacity 1000). Uses .send().await so entries
// are never silently dropped — backpressure propagates to the event handler
// if the queue is full. This is intentional: the audit advisory lock already
// serializes writes (at most 1 in-flight), so a full queue means the audit
// DB is genuinely overloaded and the relay should slow down rather than
// accumulate unbounded in-memory state. DB write failures in the worker are
// logged but not retried (same as the previous per-event tokio::spawn).
let audit_entry = buzz_audit::NewAuditEntry {
community_id: tenant.community(),
action: buzz_audit::AuditAction::EventCreated,
// Record the *actor* the caller resolved (authenticated principal for
// ingest, triggering user for workflow posts), not `stored_event.event
// .pubkey`. For relay-signed events (workflow sink, side-effect emits)
// the claimed author is the relay key, so deriving from the event would
// erase the human behind the action from the audit trail. This mirrors
// the pre-rewrite semantics, ported to the raw-bytes column.
actor_pubkey: hex::decode(actor_pubkey_hex).ok(),
object_id: Some(event_id_hex.clone()),
detail: serde_json::json!({
"event_kind": kind_u32,
"channel_id": stored_event.channel_id,
}),
};
if let Err(e) = state.audit_tx.send(audit_entry).await {
error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}");
metrics::counter!("buzz_audit_send_errors_total").increment(1);
if enqueue_audit {
enqueue_event_created_audit(
tenant,
state,
stored_event,
kind_u32,
actor_pubkey_hex,
&event_id_hex,
)
.await;
}

// Skip workflow triggering for workflow-execution kinds and relay-signed workflow messages.
Expand Down Expand Up @@ -398,6 +443,43 @@ pub(crate) async fn dispatch_persistent_event(
matches.len()
}

/// Build an `EventCreated` audit entry for `stored_event` and push it onto
/// the audit channel, waiting for the send to complete.
///
/// A failed send is logged and counted in `buzz_audit_send_errors_total`;
/// nothing is returned to the caller.
async fn enqueue_event_created_audit(
    tenant: &TenantContext,
    state: &Arc<AppState>,
    stored_event: &StoredEvent,
    kind_u32: u32,
    actor_pubkey_hex: &str,
    event_id_hex: &str,
) {
    let community_id = tenant.community();

    // The audited actor is the one the caller resolved (authenticated
    // principal for ingest, triggering user for workflow posts), NOT
    // `stored_event.event.pubkey`. Relay-signed events (workflow sink,
    // side-effect emits) carry the relay key as their claimed author, so
    // deriving the actor from the event would erase the human behind the
    // action from the audit trail. This mirrors the pre-rewrite semantics,
    // ported to the raw-bytes column. A hex string that fails to decode is
    // recorded as `None`.
    let actor_pubkey = hex::decode(actor_pubkey_hex).ok();
    let object_id = Some(event_id_hex.to_owned());
    let detail = serde_json::json!({
        "event_kind": kind_u32,
        "channel_id": stored_event.channel_id,
    });

    let entry = buzz_audit::NewAuditEntry {
        community_id,
        action: buzz_audit::AuditAction::EventCreated,
        actor_pubkey,
        object_id,
        detail,
    };

    // The audit channel is bounded (capacity 1000) and we `.send().await`
    // rather than try-send, so entries are never silently dropped: when the
    // queue is full, backpressure propagates to the event handler. That is
    // intentional — the audit advisory lock already serializes writes (at
    // most 1 in-flight), so a full queue means the audit DB is genuinely
    // overloaded and the relay should slow down rather than accumulate
    // unbounded in-memory state. DB write failures in the worker are logged
    // but not retried (same as the previous per-event tokio::spawn).
    let Err(e) = state.audit_tx.send(entry).await else {
        return;
    };
    error!(event_id = %event_id_hex, "Audit channel closed — entry lost: {e}");
    metrics::counter!("buzz_audit_send_errors_total").increment(1);
}

/// Handle an EVENT message from a WebSocket connection.
///
/// Extracts auth from the WS connection, dispatches ephemeral events locally,
Expand Down Expand Up @@ -1459,12 +1541,13 @@ mod tests {
let event_id_hex = event.id.to_hex();
let stored = StoredEvent::new(event, None);

super::super::dispatch_persistent_event(
super::super::dispatch_persistent_event_inner(
&tenant,
&state,
&stored,
KIND_PRESENCE_UPDATE,
&actor_hex,
true,
)
.await;

Expand Down Expand Up @@ -1559,20 +1642,22 @@ mod tests {
"test precondition: the two events must have distinct ids"
);

super::super::dispatch_persistent_event(
super::super::dispatch_persistent_event_inner(
&ta,
&state,
&a_stored,
KIND_PRESENCE_UPDATE,
&actor_hex,
true,
)
.await;
super::super::dispatch_persistent_event(
super::super::dispatch_persistent_event_inner(
&tb,
&state,
&b_stored,
KIND_PRESENCE_UPDATE,
&actor_hex,
true,
)
.await;

Expand Down
10 changes: 8 additions & 2 deletions crates/buzz-relay/src/handlers/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,8 +1244,13 @@ async fn ingest_event_inner(
return Err(IngestError::Rejected("restricted: relay-only kind".into()));
}

let event_clone = event.clone();
let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_clone)).await;
// Share the event with the verify task via Arc instead of deep-cloning it
// (tags + up to 256 KB of content). spawn_blocking only needs 'static, not
// ownership; once it completes its Arc is dropped, so try_unwrap returns
// the original event without ever having copied it.
let event = std::sync::Arc::new(event);
let event_for_verify = std::sync::Arc::clone(&event);
let verify_result = tokio::task::spawn_blocking(move || verify_event(&event_for_verify)).await;
match verify_result {
Ok(Ok(())) => {}
Ok(Err(e)) => {
Expand All @@ -1258,6 +1263,7 @@ async fn ingest_event_inner(
));
}
}
let event = std::sync::Arc::try_unwrap(event).unwrap_or_else(|arc| (*arc).clone());

const MAX_TIMESTAMP_DRIFT_SECS: i64 = 900; // ±15 minutes
let now = chrono::Utc::now().timestamp();
Expand Down
Loading