From b295eec380fc4c7cae4ff9bab03a8e023fd6b689 Mon Sep 17 00:00:00 2001 From: Jordan Mecom Date: Mon, 29 Jun 2026 16:41:01 -0700 Subject: [PATCH] Revalidate workflow owners before webhooks --- crates/buzz-workflow/src/executor.rs | 205 +++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) diff --git a/crates/buzz-workflow/src/executor.rs b/crates/buzz-workflow/src/executor.rs index feceec07b..12e5b1840 100644 --- a/crates/buzz-workflow/src/executor.rs +++ b/crates/buzz-workflow/src/executor.rs @@ -523,6 +523,80 @@ fn resolve_send_message_channel( Ok(trigger_channel.trim().to_string()) } +/// Re-check a channel workflow owner's membership before an outbound webhook. +/// +/// Workflow definitions outlive the event that created them. Without this +/// check, removing a member from a private channel would not stop a workflow +/// they created from sending future message contents to an external endpoint. +/// The DB read is intentionally done at action time instead of relying on the +/// relay membership cache so a completed removal takes effect before the next +/// outbound request. +async fn revalidate_webhook_owner_membership( + engine: &WorkflowEngine, + community_id: CommunityId, + run_id: Uuid, +) -> Result<(), WorkflowError> { + let wf_run = engine + .db + .get_workflow_run(community_id, run_id) + .await + .map_err(|e| { + WorkflowError::WebhookError(format!( + "CallWebhook: failed to load workflow run {run_id}: {e}" + )) + })?; + let workflow = engine + .db + .get_workflow(community_id, wf_run.workflow_id) + .await + .map_err(|e| { + WorkflowError::WebhookError(format!( + "CallWebhook: failed to load workflow {}: {e}", + wf_run.workflow_id + )) + })?; + + let Some(channel_id) = workflow.channel_id else { + return Ok(()); + }; + + let owner_is_member = engine + .db + .is_member(community_id, channel_id, &workflow.owner_pubkey) + .await + .map_err(|e| { + WorkflowError::WebhookError(format!( + "CallWebhook: failed to re-check workflow owner membership: {e}" + )) + })?; + + if owner_is_member { + return Ok(()); + } + + if let Err(e) = engine + .db + .set_workflow_enabled(community_id, workflow.id, false) + .await + { + warn!( + workflow_id = %workflow.id, + channel_id = %channel_id, + "CallWebhook: failed to disable workflow after owner membership loss: {e}" + ); + } else { + info!( + workflow_id = %workflow.id, + channel_id = %channel_id, + "CallWebhook: disabled workflow after owner membership loss" + ); + } + + Err(WorkflowError::WebhookError(format!( + "CallWebhook: workflow owner is no longer a member of channel {channel_id}" + ))) +} + /// Dispatch a resolved action and return its output. /// /// For MVP, most actions log their intent and return a success output. @@ -636,6 +710,8 @@ pub async fn dispatch_action( headers, body, } => { + revalidate_webhook_owner_membership(engine, community_id, run_id).await?; + let method_str = method.as_deref().unwrap_or("POST"); info!(run_id = %run_id, step = step_id, "CallWebhook → {method_str} {url}"); @@ -1230,6 +1306,9 @@ async fn execute_steps( #[cfg(test)] mod tests { use super::*; + use buzz_db::channel::{ChannelType, ChannelVisibility, MemberRole}; + use buzz_db::{Db, DbConfig}; + use nostr::Keys; use serde_json::json; fn make_trigger() -> TriggerContext { @@ -1835,4 +1914,130 @@ mod tests { .expect("override should be accepted"); assert_eq!(resolved, override_channel_id.to_string()); } + + async fn setup_webhook_membership_test( + ) -> (WorkflowEngine, CommunityId, Uuid, Uuid, Vec, Vec) { + let database_url = std::env::var("BUZZ_TEST_DATABASE_URL") + .or_else(|_| std::env::var("DATABASE_URL")) + .unwrap_or_else(|_| "postgres://buzz:buzz_dev@localhost:5432/buzz".to_owned()); + let db = Db::new(&DbConfig { + database_url, + min_connections: 0, + ..DbConfig::default() + }) + .await + .expect("connect to test DB"); + db.migrate().await.expect("run migrations"); + + let community = db + .ensure_configured_community(&format!( + "workflow-webhook-membership-{}.example", + Uuid::new_v4().simple() + )) + .await + .expect("create test community") + .id; + let channel_owner = Keys::generate().public_key().to_bytes().to_vec(); + let workflow_owner = Keys::generate().public_key().to_bytes().to_vec(); + db.ensure_user(community, &channel_owner) + .await + .expect("create channel owner"); + db.ensure_user(community, &workflow_owner) + .await + .expect("create workflow owner"); + + let channel = db + .create_channel( + community, + "workflow-webhook-membership", + ChannelType::Stream, + ChannelVisibility::Private, + None, + &channel_owner, + None, + ) + .await + .expect("create channel"); + db.add_member( + community, + channel.id, + &workflow_owner, + MemberRole::Member, + Some(&channel_owner), + ) + .await + .expect("add workflow owner"); + + let workflow_id = db + .create_workflow( + community, + Some(channel.id), + &workflow_owner, + "webhook-membership-test", + r#"{"trigger":{"on":"message_posted"},"steps":[]}"#, + &[0u8; 32], + ) + .await + .expect("create workflow"); + let run_id = db + .create_workflow_run(community, workflow_id, None, None) + .await + .expect("create workflow run"); + + ( + WorkflowEngine::new(db, crate::WorkflowConfig::default()), + community, + channel.id, + run_id, + channel_owner, + workflow_owner, + ) + } + + #[tokio::test] + #[ignore = "requires Postgres"] + async fn webhook_owner_membership_revalidation_allows_current_member() { + let (engine, community, _channel_id, run_id, _channel_owner, _workflow_owner) = + setup_webhook_membership_test().await; + + revalidate_webhook_owner_membership(&engine, community, run_id) + .await + .expect("current member should keep webhook access"); + } + + #[tokio::test] + #[ignore = "requires Postgres"] + async fn webhook_owner_membership_revalidation_disables_removed_owner() { + let (engine, community, channel_id, run_id, channel_owner, workflow_owner) = + setup_webhook_membership_test().await; + engine + .db + .remove_member(community, channel_id, &workflow_owner, &channel_owner) + .await + .expect("remove workflow owner"); + + let err = revalidate_webhook_owner_membership(&engine, community, run_id) + .await + .expect_err("removed owner must not keep webhook access"); + assert!( + err.to_string().contains("no longer a member"), + "unexpected error: {err}" + ); + + let workflow_id = engine + .db + .get_workflow_run(community, run_id) + .await + .expect("load run") + .workflow_id; + let workflow = engine + .db + .get_workflow(community, workflow_id) + .await + .expect("load workflow"); + assert!( + !workflow.enabled, + "workflow should be disabled after owner removal" + ); + } }