Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions crates/buzz-workflow/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,80 @@ fn resolve_send_message_channel(
Ok(trigger_channel.trim().to_string())
}

/// Re-check a channel workflow owner's membership before an outbound webhook.
///
/// Workflow definitions outlive the event that created them. Without this
/// check, removing a member from a private channel would not stop a workflow
/// they created from sending future message contents to an external endpoint.
/// The DB read is intentionally done at action time instead of relying on the
/// relay membership cache so a completed removal takes effect before the next
/// outbound request.
async fn revalidate_webhook_owner_membership(
engine: &WorkflowEngine,
community_id: CommunityId,
run_id: Uuid,
) -> Result<(), WorkflowError> {
let wf_run = engine
.db
.get_workflow_run(community_id, run_id)
.await
.map_err(|e| {
WorkflowError::WebhookError(format!(
"CallWebhook: failed to load workflow run {run_id}: {e}"
))
})?;
let workflow = engine
.db
.get_workflow(community_id, wf_run.workflow_id)
.await
.map_err(|e| {
WorkflowError::WebhookError(format!(
"CallWebhook: failed to load workflow {}: {e}",
wf_run.workflow_id
))
})?;

let Some(channel_id) = workflow.channel_id else {
return Ok(());
};

let owner_is_member = engine
.db
.is_member(community_id, channel_id, &workflow.owner_pubkey)
.await
.map_err(|e| {
WorkflowError::WebhookError(format!(
"CallWebhook: failed to re-check workflow owner membership: {e}"
))
})?;

if owner_is_member {
return Ok(());
}

if let Err(e) = engine
.db
.set_workflow_enabled(community_id, workflow.id, false)
.await
{
warn!(
workflow_id = %workflow.id,
channel_id = %channel_id,
"CallWebhook: failed to disable workflow after owner membership loss: {e}"
);
} else {
info!(
workflow_id = %workflow.id,
channel_id = %channel_id,
"CallWebhook: disabled workflow after owner membership loss"
);
}

Err(WorkflowError::WebhookError(format!(
"CallWebhook: workflow owner is no longer a member of channel {channel_id}"
)))
}

/// Dispatch a resolved action and return its output.
///
/// For MVP, most actions log their intent and return a success output.
Expand Down Expand Up @@ -636,6 +710,8 @@ pub async fn dispatch_action(
headers,
body,
} => {
revalidate_webhook_owner_membership(engine, community_id, run_id).await?;

let method_str = method.as_deref().unwrap_or("POST");
info!(run_id = %run_id, step = step_id, "CallWebhook → {method_str} {url}");

Expand Down Expand Up @@ -1230,6 +1306,9 @@ async fn execute_steps(
#[cfg(test)]
mod tests {
use super::*;
use buzz_db::channel::{ChannelType, ChannelVisibility, MemberRole};
use buzz_db::{Db, DbConfig};
use nostr::Keys;
use serde_json::json;

fn make_trigger() -> TriggerContext {
Expand Down Expand Up @@ -1835,4 +1914,130 @@ mod tests {
.expect("override should be accepted");
assert_eq!(resolved, override_channel_id.to_string());
}

async fn setup_webhook_membership_test(
) -> (WorkflowEngine, CommunityId, Uuid, Uuid, Vec<u8>, Vec<u8>) {
let database_url = std::env::var("BUZZ_TEST_DATABASE_URL")
.or_else(|_| std::env::var("DATABASE_URL"))
.unwrap_or_else(|_| "postgres://buzz:buzz_dev@localhost:5432/buzz".to_owned());
let db = Db::new(&DbConfig {
database_url,
min_connections: 0,
..DbConfig::default()
})
.await
.expect("connect to test DB");
db.migrate().await.expect("run migrations");

let community = db
.ensure_configured_community(&format!(
"workflow-webhook-membership-{}.example",
Uuid::new_v4().simple()
))
.await
.expect("create test community")
.id;
let channel_owner = Keys::generate().public_key().to_bytes().to_vec();
let workflow_owner = Keys::generate().public_key().to_bytes().to_vec();
db.ensure_user(community, &channel_owner)
.await
.expect("create channel owner");
db.ensure_user(community, &workflow_owner)
.await
.expect("create workflow owner");

let channel = db
.create_channel(
community,
"workflow-webhook-membership",
ChannelType::Stream,
ChannelVisibility::Private,
None,
&channel_owner,
None,
)
.await
.expect("create channel");
db.add_member(
community,
channel.id,
&workflow_owner,
MemberRole::Member,
Some(&channel_owner),
)
.await
.expect("add workflow owner");

let workflow_id = db
.create_workflow(
community,
Some(channel.id),
&workflow_owner,
"webhook-membership-test",
r#"{"trigger":{"on":"message_posted"},"steps":[]}"#,
&[0u8; 32],
)
.await
.expect("create workflow");
let run_id = db
.create_workflow_run(community, workflow_id, None, None)
.await
.expect("create workflow run");

(
WorkflowEngine::new(db, crate::WorkflowConfig::default()),
community,
channel.id,
run_id,
channel_owner,
workflow_owner,
)
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn webhook_owner_membership_revalidation_allows_current_member() {
let (engine, community, _channel_id, run_id, _channel_owner, _workflow_owner) =
setup_webhook_membership_test().await;

revalidate_webhook_owner_membership(&engine, community, run_id)
.await
.expect("current member should keep webhook access");
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn webhook_owner_membership_revalidation_disables_removed_owner() {
let (engine, community, channel_id, run_id, channel_owner, workflow_owner) =
setup_webhook_membership_test().await;
engine
.db
.remove_member(community, channel_id, &workflow_owner, &channel_owner)
.await
.expect("remove workflow owner");

let err = revalidate_webhook_owner_membership(&engine, community, run_id)
.await
.expect_err("removed owner must not keep webhook access");
assert!(
err.to_string().contains("no longer a member"),
"unexpected error: {err}"
);

let workflow_id = engine
.db
.get_workflow_run(community, run_id)
.await
.expect("load run")
.workflow_id;
let workflow = engine
.db
.get_workflow(community, workflow_id)
.await
.expect("load workflow");
assert!(
!workflow.enabled,
"workflow should be disabled after owner removal"
);
}
}
Loading