diff --git a/AGENTS.md b/AGENTS.md index 3eadfd1..8ca3f7c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,7 +19,7 @@ If code and docs disagree, stop and reconcile them instead of guessing. - WorkGraph is not a generic agent runtime, generic workflow builder, generic task tracker, or generic memory layer. - The context graph is first-class and typed. Wiki-links are one edge source, not the graph definition. - The ledger is both audit trail and durable event stream. -- Triggers are core infrastructure, even when the current foundation pass only yields durable planned follow-up actions. +- Triggers are core infrastructure, even when the current phase only yields durable planned follow-up actions rather than live execution. - Threads are evidence-bearing coordination units, not chat logs or loose tasks. - Missions coordinate related work. Runs capture one execution instance. Triggers yield planned follow-up actions. - The actor model must scale to hundreds or thousands of actors while allowing opaque subactor lineages. @@ -82,12 +82,12 @@ The CLI (`wg-cli`) is the **default interface** for all agents with shell access - `workgraph status` should expose graph hygiene and evidence gaps, not only counts. - `workgraph show` should render coordination primitives in a way that makes their contracts obvious to humans and agents. -## Out Of Scope For This Foundation Pass +## Out Of Scope For This Phase - live trigger execution loops -- webhook ingress runtime +- webhook ingress HTTP runtime - remote MCP/API server implementation - approval workflow execution - ergonomic nested authoring flows beyond direct markdown editing -Those are later layers. The foundation pass exists to make those future layers disciplined rather than improvised. +Those are later layers. The current trigger-plane expansion exists to make those future layers disciplined rather than improvised. 
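The "planned follow-up actions, not live execution" boundary stated above can be sketched with toy types. This is an illustrative sketch only — `EventEnvelope`, `Trigger`, and `PlannedAction` here are simplified stand-ins, not the real `wg-types` contracts:

```rust
// Illustrative sketch only: toy types standing in for the real `wg-types`
// definitions. The point is the boundary: trigger evaluation consumes a
// normalized event and yields a *durable plan*, never an executed effect.

#[derive(Debug, PartialEq)]
struct EventEnvelope {
    id: String,
    event_name: String,
    subject_reference: Option<String>, // "<type>/<id>" form
}

#[derive(Debug, PartialEq)]
struct PlannedAction {
    trigger_id: String,
    event_id: String,
    subject: String,
}

struct Trigger {
    id: String,
    matches_event_name: String,
}

impl Trigger {
    /// Returns a durable plan when the event matches; callers persist the
    /// plan (e.g. as a receipt) instead of running it.
    fn evaluate(&self, event: &EventEnvelope) -> Option<PlannedAction> {
        (event.event_name == self.matches_event_name).then(|| PlannedAction {
            trigger_id: self.id.clone(),
            event_id: event.id.clone(),
            subject: event
                .subject_reference
                .clone()
                .unwrap_or_else(|| "unknown".to_owned()),
        })
    }
}
```

A later phase can execute persisted plans; evaluation itself stays a pure, replayable function of the event, which is what makes replay safe.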
diff --git a/Cargo.lock b/Cargo.lock index fcbb85b..5c2556b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ name = "wg-adapter-webhook" version = "0.1.0" dependencies = [ "wg-adapter-api", + "wg-types", ] [[package]] @@ -993,6 +994,7 @@ name = "wg-cli" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap", "serde", "serde_json", @@ -1011,6 +1013,7 @@ dependencies = [ "wg-registry", "wg-store", "wg-thread", + "wg-trigger", "wg-types", ] @@ -1024,6 +1027,10 @@ dependencies = [ [[package]] name = "wg-connector-api" version = "0.1.0" +dependencies = [ + "chrono", + "wg-types", +] [[package]] name = "wg-connector-github" @@ -1046,6 +1053,7 @@ dependencies = [ "wg-paths", "wg-policy", "wg-store", + "wg-trigger", "wg-types", ] @@ -1129,6 +1137,7 @@ dependencies = [ "wg-policy", "wg-store", "wg-thread", + "wg-trigger", "wg-types", ] @@ -1160,6 +1169,7 @@ dependencies = [ "wg-policy", "wg-store", "wg-thread", + "wg-trigger", "wg-types", ] @@ -1232,6 +1242,7 @@ dependencies = [ "wg-paths", "wg-policy", "wg-store", + "wg-trigger", "wg-types", ] @@ -1243,8 +1254,10 @@ version = "0.1.0" name = "wg-trigger" version = "0.1.0" dependencies = [ + "chrono", "serde", "serde_yaml", + "sha2", "tempfile", "tokio", "wg-clock", diff --git a/README.md b/README.md index 46592f0..77ba118 100644 --- a/README.md +++ b/README.md @@ -40,12 +40,13 @@ The current workspace encodes that durable foundation rather than only describin - first-class thread, mission, run, trigger, checkpoint, and actor-lineage contracts in `wg-types` - evidence-bearing thread persistence in `wg-thread` - mission and run persistence in `wg-mission` and `wg-dispatch`, including mission planning/approval/validation states, milestone thread auto-creation, and run start/end timestamps -- typed graph edges in `wg-graph`, including assignment, containment, evidence, trigger, reference, and actor-lineage edges derived from agent metadata +- typed graph edges in `wg-graph`, including assignment, 
containment, evidence, trigger, reference, actor-lineage, and trigger-receipt edges derived from durable coordination state - orientation and CLI surfaces that expose evidence gaps, graph issues, coordination contracts, and full primitive discovery metadata +- trigger evaluation over normalized ledger, internal, and webhook event envelopes with durable `trigger_receipt` primitives for replay-safe planned follow-up actions -This turn does not implement live trigger execution loops, webhook ingress, remote MCP, or API runtime surfaces yet. It establishes the durable contracts those surfaces must honor. +This turn does not implement live trigger execution loops, webhook HTTP runtime, remote MCP, or API runtime surfaces yet. It establishes the durable contracts those surfaces must honor. -CLI creation paths now evaluate persisted policy primitives before writing. Trigger action plans remain durable planned follow-up actions rather than auto-executed effects in this foundation pass. +CLI creation paths evaluate persisted policy primitives before writing. Trigger evaluation now records replay-safe `trigger_receipt` primitives and policy-aware planned action outcomes rather than auto-executed effects. ## Product Boundary diff --git a/crates/wg-adapter-webhook/Cargo.toml b/crates/wg-adapter-webhook/Cargo.toml index 19dfa88..cce9add 100644 --- a/crates/wg-adapter-webhook/Cargo.toml +++ b/crates/wg-adapter-webhook/Cargo.toml @@ -8,3 +8,4 @@ authors.workspace = true [dependencies] wg-adapter-api = { path = "../wg-adapter-api" } +wg-types = { path = "../wg-types" } diff --git a/crates/wg-adapter-webhook/src/lib.rs b/crates/wg-adapter-webhook/src/lib.rs index 57ca92d..40707d5 100644 --- a/crates/wg-adapter-webhook/src/lib.rs +++ b/crates/wg-adapter-webhook/src/lib.rs @@ -1,8 +1,9 @@ #![forbid(unsafe_code)] -//! HTTP webhook adapter placeholder. +//! Normalized webhook event ingress helpers for the trigger plane. 
use wg_adapter_api::{AdapterRequest, AdapterStatus, RuntimeAdapter}; +use wg_types::{EventEnvelope, EventSourceKind}; /// Placeholder adapter for webhook-triggered runs. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -17,6 +18,13 @@ impl WebhookAdapter { pub const fn new() -> Self { Self } + + /// Normalizes a provider webhook payload into a trigger-plane event envelope. + #[must_use] + pub fn normalize_event(self, event: EventEnvelope) -> EventEnvelope { + debug_assert_eq!(event.source, EventSourceKind::Webhook); + event + } } impl RuntimeAdapter for WebhookAdapter { diff --git a/crates/wg-api/src/lib.rs b/crates/wg-api/src/lib.rs index 1dad008..939ca74 100644 --- a/crates/wg-api/src/lib.rs +++ b/crates/wg-api/src/lib.rs @@ -1,6 +1,10 @@ #![forbid(unsafe_code)] //! API surface placeholders for HTTP, gRPC, SSE, and webhook endpoints. +//! +//! Phase 3 keeps event-plane semantics in the kernel and CLI. This crate remains +//! a transport-thin placeholder until remote runtime surfaces are intentionally +//! implemented in a later phase. /// Transport exposed by the placeholder API server. 
#[derive(Clone, Copy, Debug, Eq, PartialEq)] diff --git a/crates/wg-cli/Cargo.toml index a24f350..9734531 100644 --- a/crates/wg-cli/Cargo.toml +++ b/crates/wg-cli/Cargo.toml @@ -8,6 +8,7 @@ authors.workspace = true [dependencies] anyhow.workspace = true +chrono.workspace = true clap.workspace = true serde.workspace = true serde_json.workspace = true @@ -24,6 +25,7 @@ wg-paths = { path = "../wg-paths" } wg-policy = { path = "../wg-policy" } wg-registry = { path = "../wg-registry" } wg-store = { path = "../wg-store" } +wg-trigger = { path = "../wg-trigger" } wg-error = { path = "../wg-error" } wg-types = { path = "../wg-types" } diff --git a/crates/wg-cli/src/args.rs index 3924e66..343a272 100644 --- a/crates/wg-cli/src/args.rs +++ b/crates/wg-cli/src/args.rs @@ -173,6 +173,15 @@ pub enum Command { #[command(subcommand)] command: RunCommand, }, + /// Manages trigger validation, replay, and event ingestion workflows. + #[command( + after_help = "Examples:\n workgraph trigger validate trigger/react-to-thread-complete\n workgraph trigger replay --last 20\n workgraph trigger ingest --source internal --event-name signal.sent --field subject_reference=thread/thread-1 --field actor_id=agent:cursor" + )] + Trigger { + /// Trigger-specific subcommand to execute. + #[command(subcommand)] + command: TriggerCommand, + }, } impl Command { @@ -193,6 +202,7 @@ impl Command { Self::Query { .. } => "query", Self::Show { .. } => "show", Self::Run { command } => command.name(), + Self::Trigger { command } => command.name(), } } } @@ -293,6 +303,61 @@ impl RunCommand { } } +/// Supported `workgraph trigger` subcommands. +#[derive(Debug, Subcommand)] +pub enum TriggerCommand { + /// Validates a stored trigger definition by `trigger/<id>` reference. 
+ #[command( + after_help = "Examples:\n workgraph trigger validate trigger/react-to-thread-complete\n workgraph --json trigger validate trigger/react-to-thread-complete" + )] + Validate { + /// Trigger reference in `trigger/<id>` form. + reference: String, + }, + /// Replays recent ledger entries through the trigger plane. + #[command( + after_help = "Examples:\n workgraph trigger replay\n workgraph trigger replay --last 20\n workgraph --json trigger replay --last 5" + )] + Replay { + /// Number of most recent ledger entries to replay. + #[arg(long)] + last: Option<usize>, + }, + /// Ingests one normalized event into the trigger plane without a live runtime. + #[command( + after_help = "Examples:\n workgraph trigger ingest --source internal --event-name signal.sent --field subject_reference=thread/thread-1\n workgraph --json trigger ingest --source webhook --provider github --event-name pull_request.merged --field subject_reference=project/dealer-portal" + )] + Ingest { + /// Event source kind. + #[arg(long)] + source: String, + /// Stable event name. + #[arg(long = "event-name")] + event_name: Option<String>, + /// Provider or emitter for webhook/internal events. + #[arg(long)] + provider: Option<String>, + /// Explicit event id. When omitted, a deterministic id is derived from fields. + #[arg(long = "event-id")] + event_id: Option<String>, + /// Event payload fields expressed as `key=value`. + #[arg(long = "field", value_parser = parse_key_value_input)] + fields: Vec<KeyValueInput>, + }, +} + +impl TriggerCommand { + /// Returns the stable command name associated with this parsed trigger subcommand. + #[must_use] + pub const fn name(&self) -> &'static str { + match self { + Self::Validate { .. } => "trigger_validate", + Self::Replay { .. } => "trigger_replay", + Self::Ingest { .. } => "trigger_ingest", + } + } +} + +/// A parsed `key=value` argument pair used by create and query commands. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct KeyValueInput { diff --git a/crates/wg-cli/src/commands/brief.rs index 9c4bb57..6c71113 100644 --- a/crates/wg-cli/src/commands/brief.rs +++ b/crates/wg-cli/src/commands/brief.rs @@ -66,6 +66,10 @@ fn build_dynamic_suggestions(type_counts: &BTreeMap<String, usize>) -> Vec<String> { + if type_counts.get("trigger").copied().unwrap_or(0) > 0 { + suggestions.push("workgraph trigger replay".to_owned()); + } + suggestions.push("workgraph status".to_owned()); suggestions } diff --git a/crates/wg-cli/src/commands/create.rs index d771e81..29027c0 100644 --- a/crates/wg-cli/src/commands/create.rs +++ b/crates/wg-cli/src/commands/create.rs @@ -3,6 +3,7 @@ use anyhow::{Context, anyhow, bail}; use tokio::fs; use wg_store::{PrimitiveFrontmatter, StoredPrimitive, read_primitive}; +use wg_trigger::{TriggerMutationService, load_trigger}; use wg_types::ActorId; use crate::app::AppContext; @@ -99,9 +100,41 @@ pub async fn handle( .default_actor_id .unwrap_or_else(|| ActorId::new("cli")); - let (path, ledger_entry) = PrimitiveMutationService::new(app, &registry) - .create(actor, &primitive) - .await?; + let (path, ledger_entry) = if primitive_type == "trigger" { + let trigger = load_trigger_payload(&primitive)?; + TriggerMutationService::new(app.workspace()) + .save_trigger_as(&trigger, actor.clone()) + .await?; + let stored_trigger = load_trigger(app.workspace(), &trigger.id).await?; + let stored_primitive = read_primitive(app.workspace(), "trigger", &stored_trigger.id) + .await + .with_context(|| { + format!( + "failed to read stored trigger 'trigger/{}'", + stored_trigger.id + ) + })?; + primitive = stored_primitive; + let ledger_entry = app + .read_ledger_entries() + .await? 
+ .into_iter() + .rev() + .find(|entry| entry.primitive_type == "trigger" && entry.primitive_id == trigger.id) + .ok_or_else(|| anyhow!("failed to locate ledger entry for trigger/{}", trigger.id))?; + ( + app.workspace() + .primitive_path("trigger", &trigger.id) + .as_path() + .display() + .to_string(), + ledger_entry, + ) + } else { + PrimitiveMutationService::new(app, &registry) + .create(actor, &primitive) + .await? + }; Ok(CreateOutput { outcome: CreateOutcome::Created, @@ -141,3 +174,9 @@ fn resolve_create_inputs( }; Ok((title, fields)) } + +fn load_trigger_payload(primitive: &StoredPrimitive) -> anyhow::Result<wg_types::TriggerPrimitive> { + let trigger = wg_trigger::trigger_from_primitive(primitive) + .context("failed to decode trigger payload from create request")?; + Ok(trigger) +} diff --git a/crates/wg-cli/src/commands/mod.rs index 9749aae..3d336a5 100644 --- a/crates/wg-cli/src/commands/mod.rs +++ b/crates/wg-cli/src/commands/mod.rs @@ -13,9 +13,10 @@ mod schema; mod show; mod status; mod thread_complete; +mod trigger; use crate::app::AppContext; -use crate::args::{Command, RunCommand}; +use crate::args::{Command, RunCommand, TriggerCommand}; use crate::output::CommandOutput; /// Executes the selected CLI command using the shared application context. 
@@ -111,5 +112,37 @@ pub async fn execute(app: &AppContext, command: Command) -> anyhow::Result<CommandOutput> { + Command::Trigger { command } => match command { + TriggerCommand::Validate { reference } => Ok(CommandOutput::TriggerValidate( + trigger::validate(app, &reference).await?, + )), + TriggerCommand::Replay { last } => Ok(CommandOutput::TriggerReplay( + trigger::replay(app, last).await?, + )), + TriggerCommand::Ingest { + source, + event_id, + event_name, + provider, + fields, + } => Ok(CommandOutput::TriggerIngest( + trigger::ingest( + app, + trigger::TriggerIngestArgs { + source, + event_id: event_id.unwrap_or_else(|| "manual-ingest".to_owned()), + event_name, + provider, + actor_id: trigger::field_value(&fields, "actor_id"), + subject_reference: trigger::field_value(&fields, "subject_reference"), + primitive_type: trigger::field_value(&fields, "primitive_type"), + primitive_id: trigger::field_value(&fields, "primitive_id"), + op: trigger::field_value(&fields, "op"), + fields, + }, + ) + .await?, + )), + }, } } diff --git a/crates/wg-cli/src/commands/status.rs index d66fbf5..40fbdec 100644 --- a/crates/wg-cli/src/commands/status.rs +++ b/crates/wg-cli/src/commands/status.rs @@ -23,5 +23,8 @@ pub async fn handle(app: &AppContext) -> anyhow::Result<StatusOutput> { graph_issues: workspace_status.graph_issues, orphan_nodes: workspace_status.orphan_nodes, thread_evidence_gaps: workspace_status.thread_evidence_gaps, + trigger_health: workspace_status.trigger_health, + recent_trigger_receipts: workspace_status.recent_trigger_receipts, + pending_trigger_actions: workspace_status.pending_trigger_actions, }) } diff --git a/crates/wg-cli/src/commands/trigger.rs new file mode 100644 index 0000000..f1cdc67 --- /dev/null +++ b/crates/wg-cli/src/commands/trigger.rs @@ -0,0 +1,218 @@ +//! Implementation of the `workgraph trigger` command family. 
+ +use std::collections::BTreeMap; + +use anyhow::{Context, bail}; +use chrono::Utc; +use wg_trigger::{ + event_from_ledger_entry, ingest_event, load_trigger, validate_trigger_definition, +}; +use wg_types::{ActorId, EventEnvelope, EventSourceKind}; + +use crate::app::AppContext; +use crate::args::KeyValueInput; +use crate::output::{ + TriggerIngestOutput, TriggerReplayOutput, TriggerReplayResult, TriggerValidateOutput, +}; +use crate::util::workspace::parse_reference; + +/// Parsed arguments for `workgraph trigger ingest`. +#[derive(Debug, Clone)] +pub struct TriggerIngestArgs { + /// Stable source kind used for evaluation. + pub source: String, + /// Stable event identifier. + pub event_id: String, + /// Optional stable event name. + pub event_name: Option<String>, + /// Optional provider or emitter name. + pub provider: Option<String>, + /// Optional actor identifier associated with the event. + pub actor_id: Option<String>, + /// Optional subject reference in `<type>/<id>` form. + pub subject_reference: Option<String>, + /// Optional primitive type for the event subject. + pub primitive_type: Option<String>, + /// Optional primitive id for the event subject. + pub primitive_id: Option<String>, + /// Optional ledger operation for ledger-style events. + pub op: Option<String>, + /// Payload fields observed on the event. + pub fields: Vec<KeyValueInput>, +} + +/// Validates a trigger definition already stored in the workspace. +/// +/// # Errors +/// +/// Returns an error when the trigger cannot be loaded or fails validation. 
+pub async fn validate(app: &AppContext, trigger_id: &str) -> anyhow::Result<TriggerValidateOutput> { + let (primitive_type, trigger_id) = parse_reference(trigger_id)?; + if primitive_type != "trigger" { + bail!("trigger validation expects a trigger/<id> reference"); + } + let trigger = load_trigger(app.workspace(), trigger_id) + .await + .with_context(|| format!("failed to load trigger '{trigger_id}'"))?; + validate_trigger_definition(&trigger) + .with_context(|| format!("failed to validate trigger '{trigger_id}'"))?; + Ok(TriggerValidateOutput { + reference: format!("trigger/{trigger_id}"), + trigger, + valid: true, + }) +} + +/// Replays recent ledger entries through the trigger plane. +/// +/// # Errors +/// +/// Returns an error when the ledger cannot be read or receipts cannot be persisted. +pub async fn replay(app: &AppContext, last: Option<usize>) -> anyhow::Result<TriggerReplayOutput> { + let mut entries = app.read_ledger_entries().await?; + if let Some(limit) = last { + if entries.len() > limit { + let start = entries.len().saturating_sub(limit); + entries = entries.split_off(start); + } + } + + let mut results = Vec::with_capacity(entries.len()); + for entry in entries { + let event = event_from_ledger_entry(&entry); + let receipts = ingest_event(app.workspace(), &event) + .await + .with_context(|| format!("failed to replay ledger event '{}'", event.id))?; + results.push(TriggerReplayResult { event, receipts }); + } + + Ok(TriggerReplayOutput { + events_replayed: results.len(), + results, + }) +} + +/// Ingests one normalized event through the trigger plane. +/// +/// # Errors +/// +/// Returns an error when the event payload is invalid or receipts cannot be persisted. 
+pub async fn ingest( + app: &AppContext, + args: TriggerIngestArgs, +) -> anyhow::Result<TriggerIngestOutput> { + let event = build_event(args)?; + let receipts = ingest_event(app.workspace(), &event) + .await + .with_context(|| format!("failed to ingest event '{}'", event.id))?; + Ok(TriggerIngestOutput { event, receipts }) +} + +fn build_event(args: TriggerIngestArgs) -> anyhow::Result<EventEnvelope> { + let source = parse_source(&args.source)?; + let op = args.op.as_deref().map(parse_op).transpose()?; + if matches!(source, EventSourceKind::Ledger) && op.is_none() { + bail!("ledger trigger ingest requires --op <op>"); + } + + let payload_fields = args + .fields + .iter() + .map(|field| (field.key.clone(), field.value.clone())) + .collect::<BTreeMap<_, _>>(); + let primitive_type = args.primitive_type.clone().or_else(|| { + args.subject_reference.as_deref().and_then(|reference| { + reference + .split_once('/') + .map(|(primitive_type, _)| primitive_type.to_owned()) + }) + }); + let primitive_id = args.primitive_id.clone().or_else(|| { + args.subject_reference.as_deref().and_then(|reference| { + reference + .split_once('/') + .map(|(_, primitive_id)| primitive_id.to_owned()) + }) + }); + + Ok(EventEnvelope { + id: args.event_id, + source, + event_name: args.event_name, + provider: args.provider, + actor_id: args.actor_id.map(ActorId::new), + occurred_at: Utc::now(), + op, + primitive_type, + primitive_id, + subject_reference: args.subject_reference, + field_names: payload_fields.keys().cloned().collect(), + payload_fields, + }) +} + +pub(crate) fn field_value(fields: &[KeyValueInput], key: &str) -> Option<String> { + fields + .iter() + .find(|field| field.key == key) + .map(|field| field.value.clone()) +} + +fn parse_source(input: &str) -> anyhow::Result<EventSourceKind> { + match input { + "ledger" => Ok(EventSourceKind::Ledger), + "webhook" => Ok(EventSourceKind::Webhook), + "internal" => Ok(EventSourceKind::Internal), + other => bail!("unsupported event source '{other}'"), + } +} + +fn parse_op(input: &str) -> anyhow::Result<wg_types::LedgerOp> { + match 
input { + "create" => Ok(wg_types::LedgerOp::Create), + "update" => Ok(wg_types::LedgerOp::Update), + "delete" => Ok(wg_types::LedgerOp::Delete), + "claim" => Ok(wg_types::LedgerOp::Claim), + "release" => Ok(wg_types::LedgerOp::Release), + "start" => Ok(wg_types::LedgerOp::Start), + "done" => Ok(wg_types::LedgerOp::Done), + "cancel" => Ok(wg_types::LedgerOp::Cancel), + "reopen" => Ok(wg_types::LedgerOp::Reopen), + "assign" => Ok(wg_types::LedgerOp::Assign), + "unassign" => Ok(wg_types::LedgerOp::Unassign), + other => bail!("unsupported ledger op '{other}'"), + } +} + +#[cfg(test)] +mod tests { + use super::{TriggerIngestArgs, build_event}; + use crate::args::KeyValueInput; + + #[test] + fn build_event_infers_subject_parts() { + let event = build_event(TriggerIngestArgs { + source: "internal".to_owned(), + event_id: "event-1".to_owned(), + event_name: Some("signal.sent".to_owned()), + provider: Some("signal-bus".to_owned()), + actor_id: Some("agent:cursor".to_owned()), + subject_reference: Some("thread/thread-1".to_owned()), + primitive_type: None, + primitive_id: None, + op: None, + fields: vec![KeyValueInput { + key: "status".to_owned(), + value: "active".to_owned(), + }], + }) + .expect("event should build"); + + assert_eq!(event.primitive_type.as_deref(), Some("thread")); + assert_eq!(event.primitive_id.as_deref(), Some("thread-1")); + assert_eq!( + event.payload_fields.get("status").map(String::as_str), + Some("active") + ); + } +} diff --git a/crates/wg-cli/src/output/envelope.rs index 9759fd5..a54201c 100644 --- a/crates/wg-cli/src/output/envelope.rs +++ b/crates/wg-cli/src/output/envelope.rs @@ -118,6 +118,11 @@ fn fix_for_clap_error(command: Option<&str>, _error: &clap::Error) -> String { Some("create") => "workgraph create <type> --title \"<title>\" --field key=value", Some("query") => "workgraph query <type> --filter key=value", Some("show") => "workgraph show <type>/<id>", + Some("trigger_validate") => "workgraph trigger validate 
<trigger-id>", + Some("trigger_replay") => "workgraph trigger replay --last <n>", + Some("trigger_ingest") => { + "workgraph trigger ingest --source <internal|webhook> --event-id <id> --event-name <name>" + } Some("brief") => "workgraph brief --lens workspace", Some("schema") => "workgraph schema <type>", Some("capabilities") => "workgraph capabilities", @@ -165,6 +170,11 @@ fn default_fix(command: Option<&str>) -> &'static str { Some("create") => "workgraph create <type> --title \"<title>\"", Some("query") => "workgraph query <type>", Some("show") => "workgraph show <type>/<id>", + Some("trigger_validate") => "workgraph trigger validate <trigger-id>", + Some("trigger_replay") => "workgraph trigger replay --last <n>", + Some("trigger_ingest") => { + "workgraph trigger ingest --source <internal|webhook> --event-id <id> --event-name <name>" + } _ => "workgraph --help", } } diff --git a/crates/wg-cli/src/output/human.rs b/crates/wg-cli/src/output/human.rs index 54503db..035e943 100644 --- a/crates/wg-cli/src/output/human.rs +++ b/crates/wg-cli/src/output/human.rs @@ -7,7 +7,8 @@ use serde_yaml::Value; use super::{ CapabilitiesOutput, CheckpointOutput, CommandOutput, CreateOutcome, CreateOutput, InitOutput, LedgerOutput, QueryOutput, RunCreateOutcome, RunCreateOutput, RunLifecycleOutput, SchemaOutput, - ShowOutput, StatusOutput, ThreadClaimOutput, ThreadCompleteOutput, + ShowOutput, StatusOutput, ThreadClaimOutput, ThreadCompleteOutput, TriggerIngestOutput, + TriggerReplayOutput, TriggerValidateOutput, }; /// Renders a structured command output to human-readable text. 
@@ -26,6 +27,9 @@ pub fn render(output: &CommandOutput, next_actions: &[String]) -> String { CommandOutput::Create(output) => render_create(output), CommandOutput::RunCreate(output) => render_run_create(output), CommandOutput::RunLifecycle(output) => render_run_lifecycle(output), + CommandOutput::TriggerValidate(output) => render_trigger_validate(output), + CommandOutput::TriggerReplay(output) => render_trigger_replay(output), + CommandOutput::TriggerIngest(output) => render_trigger_ingest(output), CommandOutput::Query(output) => render_query(output), CommandOutput::Show(output) => render_show(output), }; @@ -193,6 +197,44 @@ fn render_status(output: &StatusOutput) -> String { } } + let _ = writeln!(rendered, "Trigger health:"); + if output.trigger_health.is_empty() { + let _ = writeln!(rendered, "- none"); + } else { + for trigger in &output.trigger_health { + let _ = writeln!( + rendered, + "- {} [{}] last_event={} last_receipt={}", + trigger.trigger_reference, + trigger.status, + trigger.last_event_id.as_deref().unwrap_or("none"), + trigger.last_receipt_id.as_deref().unwrap_or("none") + ); + } + } + + let _ = writeln!(rendered, "Recent trigger receipts:"); + if output.recent_trigger_receipts.is_empty() { + let _ = writeln!(rendered, "- none"); + } else { + for receipt in &output.recent_trigger_receipts { + let _ = writeln!( + rendered, + "- {} trigger={} source={} pending={}", + receipt.receipt_reference, + receipt.trigger_reference, + receipt.event_source, + receipt.pending_plans + ); + } + } + + let _ = writeln!( + rendered, + "Pending trigger actions: {}", + output.pending_trigger_actions + ); + let _ = writeln!(rendered, "Graph issues:"); if output.graph_issues.is_empty() { let _ = writeln!(rendered, "- none"); @@ -329,6 +371,99 @@ fn render_run_lifecycle(output: &RunLifecycleOutput) -> String { rendered.trim_end().to_owned() } +fn render_trigger_validate(output: &TriggerValidateOutput) -> String { + let mut rendered = String::new(); + let _ = 
writeln!(rendered, "Validated trigger: {}", output.reference); + let _ = writeln!( + rendered, + "matches source: {}", + output.trigger.event_pattern.source.as_str() + ); + let _ = writeln!( + rendered, + "action plans: {}", + output.trigger.action_plans.len() + ); + rendered.trim_end().to_owned() +} + +fn render_trigger_replay(output: &TriggerReplayOutput) -> String { + let mut rendered = String::new(); + let emitted_receipts = output + .results + .iter() + .map(|result| result.receipts.len()) + .sum::<usize>(); + let _ = writeln!( + rendered, + "Replayed ledger events: {}", + output.events_replayed + ); + let _ = writeln!(rendered, "Receipts emitted: {}", emitted_receipts); + if output.results.is_empty() { + let _ = writeln!(rendered, "- none"); + } else { + for result in &output.results { + let _ = writeln!( + rendered, + "- event {} [{}] receipts={}", + result.event.id, + result.event.source.as_str(), + result.receipts.len() + ); + for receipt in &result.receipts { + let _ = writeln!( + rendered, + " • trigger_receipt/{} — trigger={} pending={}", + receipt.id, + receipt.trigger_id, + receipt + .action_outcomes + .iter() + .filter(|outcome| matches!( + outcome.decision, + wg_types::TriggerPlanDecision::Allow + )) + .count() + ); + } + } + } + rendered.trim_end().to_owned() +} + +fn render_trigger_ingest(output: &TriggerIngestOutput) -> String { + let mut rendered = String::new(); + let _ = writeln!( + rendered, + "Ingested trigger event: {} [{}]", + output.event.id, + output.event.source.as_str() + ); + let _ = writeln!(rendered, "Receipts emitted: {}", output.receipts.len()); + if output.receipts.is_empty() { + let _ = writeln!(rendered, "- none"); + } else { + for receipt in &output.receipts { + let _ = writeln!( + rendered, + "- trigger_receipt/{} — trigger={} pending={}", + receipt.id, + receipt.trigger_id, + receipt + .action_outcomes + .iter() + .filter(|outcome| matches!( + outcome.decision, + wg_types::TriggerPlanDecision::Allow + )) + .count() + ); + } 
+    }
+    rendered.trim_end().to_owned()
+}
+
 fn render_capabilities(output: &CapabilitiesOutput) -> String {
     let mut rendered = String::new();
     let _ = writeln!(rendered, "WorkGraph CLI capabilities");
@@ -422,6 +557,7 @@ fn render_show(output: &ShowOutput) -> String {
         "mission" => render_mission_sections(&mut rendered, primitive),
         "run" => render_run_sections(&mut rendered, primitive),
         "trigger" => render_trigger_sections(&mut rendered, primitive),
+        "trigger_receipt" => render_trigger_receipt_sections(&mut rendered, primitive),
         _ => render_generic_fields(&mut rendered, primitive),
     }
@@ -537,6 +673,43 @@ fn render_trigger_sections(rendered: &mut String, primitive: &wg_store::StoredPr
         "action_plans",
         primitive.frontmatter.extra_fields.get("action_plans"),
     );
+    render_section_list(
+        rendered,
+        "subscription_state",
+        primitive.frontmatter.extra_fields.get("subscription_state"),
+    );
+}
+
+fn render_trigger_receipt_sections(rendered: &mut String, primitive: &wg_store::StoredPrimitive) {
+    for key in [
+        "trigger_id",
+        "trigger_title",
+        "event_id",
+        "event_source",
+        "event_name",
+        "provider",
+        "actor_id",
+        "subject_reference",
+        "occurred_at",
+        "dedup_key",
+    ] {
+        render_field(rendered, key, primitive.frontmatter.extra_fields.get(key));
+    }
+    render_section_list(
+        rendered,
+        "field_names",
+        primitive.frontmatter.extra_fields.get("field_names"),
+    );
+    render_section_list(
+        rendered,
+        "payload_fields",
+        primitive.frontmatter.extra_fields.get("payload_fields"),
+    );
+    render_section_list(
+        rendered,
+        "action_outcomes",
+        primitive.frontmatter.extra_fields.get("action_outcomes"),
+    );
 }
 
 fn render_generic_fields(rendered: &mut String, primitive: &wg_store::StoredPrimitive) {
diff --git a/crates/wg-cli/src/output/mod.rs b/crates/wg-cli/src/output/mod.rs
index def7c2f..7412a23 100644
--- a/crates/wg-cli/src/output/mod.rs
+++ b/crates/wg-cli/src/output/mod.rs
@@ -9,9 +9,15 @@ use std::collections::BTreeMap;
 
 use serde::Serialize;
 use serde_json::Value as JsonValue;
 use wg_dispatch::Run;
-use wg_orientation::{GraphIssue, GraphOrphan, RecentActivity, ThreadEvidenceGap, WorkspaceBrief};
+use wg_orientation::{
+    GraphIssue, GraphOrphan, RecentActivity, ThreadEvidenceGap, TriggerHealth,
+    TriggerReceiptSummary, WorkspaceBrief,
+};
 use wg_store::StoredPrimitive;
-use wg_types::{LedgerEntry, ThreadPrimitive, WorkgraphConfig};
+use wg_types::{
+    EventEnvelope, LedgerEntry, ThreadPrimitive, TriggerPrimitive, TriggerReceiptPrimitive,
+    WorkgraphConfig,
+};
 
 /// Stable schema version for the JSON agent contract emitted by the CLI.
 pub const AGENT_SCHEMA_VERSION: &str = "v1";
@@ -48,6 +54,12 @@ pub enum CommandOutput {
     RunCreate(RunCreateOutput),
     /// Result of run lifecycle transitions.
     RunLifecycle(RunLifecycleOutput),
+    /// Result of `workgraph trigger validate`.
+    TriggerValidate(TriggerValidateOutput),
+    /// Result of `workgraph trigger replay`.
+    TriggerReplay(TriggerReplayOutput),
+    /// Result of `workgraph trigger ingest`.
+    TriggerIngest(TriggerIngestOutput),
 }
 
 /// Output model produced by the `init` command.
@@ -84,6 +96,12 @@ pub struct StatusOutput {
     pub orphan_nodes: Vec<GraphOrphan>,
     /// Threads that cannot yet complete because required evidence is missing.
     pub thread_evidence_gaps: Vec<ThreadEvidenceGap>,
+    /// Health and replay metadata for active triggers.
+    pub trigger_health: Vec<TriggerHealth>,
+    /// Recent durable trigger receipts.
+    pub recent_trigger_receipts: Vec<TriggerReceiptSummary>,
+    /// Count of planned trigger actions still pending execution.
+    pub pending_trigger_actions: usize,
 }
 
 /// Output model produced by the `claim` command.
@@ -227,6 +245,44 @@ pub struct RunLifecycleOutput {
     pub run: Run,
 }
 
+/// Output model produced by `workgraph trigger validate`.
+#[derive(Debug, Serialize)]
+pub struct TriggerValidateOutput {
+    /// Validated trigger reference.
+    pub reference: String,
+    /// Loaded trigger after validation.
+    pub trigger: TriggerPrimitive,
+    /// Whether validation succeeded.
+    pub valid: bool,
+}
+
+/// One replay result emitted while replaying ledger events.
+#[derive(Debug, Serialize)]
+pub struct TriggerReplayResult {
+    /// The replayed event envelope.
+    pub event: EventEnvelope,
+    /// Durable receipts emitted for the replayed event.
+    pub receipts: Vec<TriggerReceiptPrimitive>,
+}
+
+/// Output model produced by `workgraph trigger replay`.
+#[derive(Debug, Serialize)]
+pub struct TriggerReplayOutput {
+    /// Number of events replayed from the ledger.
+    pub events_replayed: usize,
+    /// Replay results in chronological order.
+    pub results: Vec<TriggerReplayResult>,
+}
+
+/// Output model produced by `workgraph trigger ingest`.
+#[derive(Debug, Serialize)]
+pub struct TriggerIngestOutput {
+    /// The normalized ingested event.
+    pub event: EventEnvelope,
+    /// Durable receipts emitted for the ingested event.
+    pub receipts: Vec<TriggerReceiptPrimitive>,
+}
+
 /// Output model produced by the `query` command.
 #[derive(Debug, Serialize)]
 pub struct QueryOutput {
@@ -304,6 +360,9 @@ impl CommandOutput {
                 "Cancelled" => "run_cancel",
                 _ => "run_lifecycle",
             },
+            Self::TriggerValidate(_) => "trigger_validate",
+            Self::TriggerReplay(_) => "trigger_replay",
+            Self::TriggerIngest(_) => "trigger_ingest",
         }
     }
 
@@ -328,6 +387,9 @@ impl CommandOutput {
             Self::Show(output) => serde_json::to_value(output),
             Self::RunCreate(output) => serde_json::to_value(output),
             Self::RunLifecycle(output) => serde_json::to_value(output),
+            Self::TriggerValidate(output) => serde_json::to_value(output),
+            Self::TriggerReplay(output) => serde_json::to_value(output),
+            Self::TriggerIngest(output) => serde_json::to_value(output),
         }
         .map_err(Into::into)
     }
@@ -350,6 +412,7 @@ impl CommandOutput {
             Self::Status(_) => vec![
                 "workgraph brief".to_owned(),
                 "workgraph query org".to_owned(),
+                "workgraph trigger replay --last 10".to_owned(),
             ],
             Self::Claim(output) => vec![
                 format!("workgraph show thread/{}", output.thread.id),
@@ -376,6 +439,7 @@ impl CommandOutput {
                 "workgraph schema".to_owned(),
                 "workgraph brief".to_owned(),
                 "workgraph create org --title \"<title>\"".to_owned(),
+                "workgraph trigger validate <trigger-id>".to_owned(),
             ],
             Self::Schema(_) => vec![
                 "workgraph capabilities".to_owned(),
@@ -425,6 +489,21 @@ impl CommandOutput {
                     "workgraph ledger --last 10".to_owned(),
                 ],
             },
+            Self::TriggerValidate(output) => vec![
+                format!("workgraph show trigger/{}", output.trigger.id),
+                "workgraph status".to_owned(),
+                "workgraph trigger replay --last 10".to_owned(),
+            ],
+            Self::TriggerReplay(_) => vec![
+                "workgraph status".to_owned(),
+                "workgraph query trigger_receipt".to_owned(),
+                "workgraph trigger replay --last 20".to_owned(),
+            ],
+            Self::TriggerIngest(_) => vec![
+                "workgraph status".to_owned(),
+                "workgraph query trigger_receipt".to_owned(),
+                "workgraph trigger replay --last 10".to_owned(),
+            ],
         }
     }
 }
diff --git a/crates/wg-cli/src/services/discovery.rs b/crates/wg-cli/src/services/discovery.rs
index 01bb0b9..50f3bb7 100644
--- a/crates/wg-cli/src/services/discovery.rs
+++ b/crates/wg-cli/src/services/discovery.rs
@@ -225,6 +225,45 @@ pub fn capabilities_catalog() -> CapabilitiesCatalog {
                 "workgraph run cancel cursor-pass --summary \"Superseded by newer run\"",
             ],
         ),
+        capability(
+            "trigger validate",
+            "Validate a trigger definition by reference against the normalized event-plane contract.",
+            vec!["<trigger-ref>"],
+            &global_flags,
+            vec![
+                "workgraph trigger validate trigger/thread-done --json",
+                "workgraph trigger validate trigger/thread-done",
+            ],
+        ),
+        capability(
+            "trigger replay",
+            "Replay recent ledger entries through the trigger plane and persist durable trigger receipts.",
+            vec![],
+            &[global_flags[0], global_flags[1], "--last <n>"],
+            vec![
+                "workgraph trigger replay --json",
+                "workgraph trigger replay --last 20",
+            ],
+        ),
+        capability(
+            "trigger ingest",
+            "Ingest one normalized internal or webhook event payload into the trigger plane.",
+            vec![],
+            &[
+                global_flags[0],
+                global_flags[1],
+                "--source <ledger|internal|webhook>",
+                "--event-id <event-id>",
+                "--event-name <event-name>",
+                "--provider <provider>",
+                "--subject <type/id>",
+                "--field key=value",
+            ],
+            vec![
+                "workgraph trigger ingest --source internal --event-id event-1 --event-name handoff.ready --subject thread/thread-1 --json",
+                "workgraph trigger ingest --source webhook --event-id gh-123 --event-name pull_request.merged --provider github --subject project/dealer-portal",
+            ],
+        ),
         capability(
             "capabilities",
             "List command contracts for autonomous self-discovery.",
diff --git a/crates/wg-cli/src/services/mutation.rs b/crates/wg-cli/src/services/mutation.rs
index 01bc834..d024619 100644
--- a/crates/wg-cli/src/services/mutation.rs
+++ b/crates/wg-cli/src/services/mutation.rs
@@ -4,6 +4,7 @@ use anyhow::{Context, bail};
 use wg_clock::RealClock;
 use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::{AuditedWriteRequest, StoredPrimitive, write_primitive_audited};
+use wg_trigger::ingest_ledger_entry;
 use wg_types::{ActorId, LedgerEntry, LedgerOp, Registry};
 
 use crate::app::AppContext;
@@ -61,13 +62,19 @@ impl<'a> PrimitiveMutationService<'a> {
             .await
             .with_context(|| format!("failed to create {primitive_type}/{primitive_id}"))?;
 
-        self.after_mutation(primitive).await?;
+        self.after_mutation(primitive, &ledger_entry).await?;
 
         Ok((path.as_path().display().to_string(), ledger_entry))
     }
 
-    async fn after_mutation(&self, _primitive: &StoredPrimitive) -> anyhow::Result<()> {
-        // Reserved for future generic trigger-aware follow-up hooks.
+    async fn after_mutation(
+        &self,
+        _primitive: &StoredPrimitive,
+        ledger_entry: &LedgerEntry,
+    ) -> anyhow::Result<()> {
+        ingest_ledger_entry(self.app.workspace(), ledger_entry)
+            .await
+            .context("failed to ingest ledger entry into trigger plane")?;
         Ok(())
     }
 }
diff --git a/crates/wg-cli/src/services/orientation.rs b/crates/wg-cli/src/services/orientation.rs
index c039b6a..eace371 100644
--- a/crates/wg-cli/src/services/orientation.rs
+++ b/crates/wg-cli/src/services/orientation.rs
@@ -25,6 +25,7 @@ pub async fn build_workspace_brief(app: &AppContext) -> anyhow::Result<Workspace
         ),
         primitive_counts_section(&workspace_status),
         recent_ledger_section(&workspace_status),
+        trigger_plane_section(&workspace_status),
         next_actions_section(),
     ];
 
@@ -42,6 +43,16 @@ pub async fn build_workspace_brief(app: &AppContext) -> anyhow::Result<Workspace
             .take(10)
             .cloned()
             .collect(),
+        trigger_health: workspace_status.trigger_health.clone(),
+        trigger_receipts: workspace_status.recent_trigger_receipts.clone(),
+        trigger_planned_actions: wg_orientation::TriggerPlannedActionSummary {
+            pending_count: workspace_status.pending_trigger_actions,
+            suppressed_count: workspace_status
+                .recent_trigger_receipts
+                .iter()
+                .map(|receipt| receipt.suppressed_plans)
+                .sum(),
+        },
         warnings: build_warnings(&workspace_status),
     })
 }
@@ -117,6 +128,41 @@ fn next_actions_section() -> BriefSection {
     )
 }
 
+fn trigger_plane_section(workspace_status: &WorkspaceStatus) -> BriefSection {
+    let mut items = workspace_status
+        .trigger_health
+        .iter()
+        .map(|trigger| BriefItem {
+            kind: "trigger".to_owned(),
+            reference: Some(trigger.trigger_reference.clone()),
+            title: trigger.trigger_reference.clone(),
+            detail: Some(format!(
+                "status={} last_event={} last_receipt={}",
+                trigger.status,
+                trigger.last_event_id.as_deref().unwrap_or("none"),
+                trigger.last_receipt_id.as_deref().unwrap_or("none")
+            )),
+        })
+        .collect::<Vec<_>>();
+    if let Some(receipt) = workspace_status.recent_trigger_receipts.first() {
+        items.push(BriefItem {
+            kind: "trigger_receipt".to_owned(),
+            reference: Some(receipt.receipt_reference.clone()),
+            title: receipt.trigger_reference.clone(),
+            detail: Some(format!(
+                "event={} pending={} suppressed={}",
+                receipt
+                    .event_name
+                    .clone()
+                    .unwrap_or_else(|| receipt.event_source.clone()),
+                receipt.pending_plans,
+                receipt.suppressed_plans
+            )),
+        });
+    }
+    section("trigger_plane", "Trigger plane", items)
+}
+
 fn build_warnings(workspace_status: &WorkspaceStatus) -> Vec<String> {
     let mut warnings = Vec::new();
 
@@ -133,6 +179,14 @@ fn build_warnings(workspace_status: &WorkspaceStatus) -> Vec<String> {
             issue.source_reference, issue.target_reference, issue.kind, issue.provenance
         ));
     }
+    for trigger in workspace_status.trigger_health.iter().take(5) {
+        if trigger.status == "draft" || trigger.status == "paused" || trigger.status == "disabled" {
+            warnings.push(format!(
+                "Trigger status: {} is {}",
+                trigger.trigger_reference, trigger.status
+            ));
+        }
+    }
 
     warnings
 }
diff --git a/crates/wg-connector-api/Cargo.toml b/crates/wg-connector-api/Cargo.toml
index e788f5c..c7b13db 100644
--- a/crates/wg-connector-api/Cargo.toml
+++ b/crates/wg-connector-api/Cargo.toml
@@ -7,3 +7,5 @@ license.workspace = true
 authors.workspace = true
 
 [dependencies]
+chrono.workspace = true
+wg-types = { path = "../wg-types" }
diff --git a/crates/wg-connector-api/src/lib.rs b/crates/wg-connector-api/src/lib.rs
index 0e16877..5bda91e 100644
--- a/crates/wg-connector-api/src/lib.rs
+++ b/crates/wg-connector-api/src/lib.rs
@@ -2,13 +2,55 @@
 
 //! Contracts for external event sources and reconciliation flows.
 
-/// Minimal event envelope shared by connector placeholders.
+use chrono::{DateTime, Utc};
+use std::collections::BTreeMap;
+use wg_types::{ActorId, EventEnvelope, EventSourceKind};
+
+/// Normalized externally-originated event used by connector placeholders.
 #[derive(Clone, Debug, Eq, PartialEq)]
-pub struct ExternalEvent<'a> {
+pub struct ExternalEvent {
+    /// Stable event identifier used for replay-safe deduplication.
+    pub id: String,
     /// Stable source identifier, such as a provider name.
-    pub source: &'a str,
+    pub source: String,
+    /// Stable event name emitted by the provider.
+    pub event_name: String,
     /// Resource or subject associated with the event.
-    pub subject: &'a str,
+    pub subject: String,
+    /// Optional acting actor associated with the event.
+    pub actor_id: Option<ActorId>,
+    /// Time the external event occurred.
+    pub occurred_at: DateTime<Utc>,
+    /// Normalized payload values retained for trigger matching.
+    pub payload_fields: BTreeMap<String, String>,
+}
+
+impl ExternalEvent {
+    /// Converts the external event into the normalized WorkGraph trigger envelope.
+    #[must_use]
+    pub fn into_event_envelope(self) -> EventEnvelope {
+        let subject_reference = self.subject.contains('/').then_some(self.subject.clone());
+        EventEnvelope {
+            id: self.id,
+            source: EventSourceKind::Webhook,
+            event_name: Some(self.event_name),
+            provider: Some(self.source),
+            actor_id: self.actor_id,
+            occurred_at: self.occurred_at,
+            op: None,
+            primitive_type: subject_reference.as_deref().and_then(|reference| {
+                reference
+                    .split_once('/')
+                    .map(|(primitive_type, _)| primitive_type.to_owned())
+            }),
+            primitive_id: subject_reference
+                .as_deref()
+                .and_then(|reference| reference.split_once('/').map(|(_, id)| id.to_owned())),
+            subject_reference,
+            field_names: self.payload_fields.keys().cloned().collect(),
+            payload_fields: self.payload_fields,
+        }
+    }
 }
 
 /// Polling result returned by a placeholder event source.
@@ -44,5 +86,5 @@ pub trait Reconciler {
     fn reconciler_kind(&self) -> &'static str;
 
     /// Applies connector-specific reconciliation logic.
-    fn reconcile(&self, event: ExternalEvent<'_>) -> ReconcileStatus;
+    fn reconcile(&self, event: ExternalEvent) -> ReconcileStatus;
 }
diff --git a/crates/wg-connector-github/src/lib.rs b/crates/wg-connector-github/src/lib.rs
index 38bb4bf..764d635 100644
--- a/crates/wg-connector-github/src/lib.rs
+++ b/crates/wg-connector-github/src/lib.rs
@@ -1,10 +1,10 @@
 #![forbid(unsafe_code)]
 
-//! GitHub connector placeholder for webhooks and reconciliation.
+//! GitHub connector placeholder for trigger-plane event normalization.
 
 use wg_connector_api::{EventSource, ExternalEvent, PollStatus, ReconcileStatus, Reconciler};
 
-/// Placeholder connector for GitHub-originated events.
+/// Placeholder connector for GitHub-originated trigger events.
 #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 pub struct GithubConnector;
 
@@ -34,7 +34,10 @@ impl Reconciler for GithubConnector {
         Self::KIND
     }
 
-    fn reconcile(&self, _event: ExternalEvent<'_>) -> ReconcileStatus {
-        ReconcileStatus::Skipped
+    fn reconcile(&self, event: ExternalEvent) -> ReconcileStatus {
+        match event.event_name.as_str() {
+            "pull_request" | "push" => ReconcileStatus::Applied,
+            _ => ReconcileStatus::Skipped,
+        }
     }
 }
diff --git a/crates/wg-dispatch/Cargo.toml b/crates/wg-dispatch/Cargo.toml
index a8c2c11..95e8ac0 100644
--- a/crates/wg-dispatch/Cargo.toml
+++ b/crates/wg-dispatch/Cargo.toml
@@ -14,6 +14,7 @@ wg-error = { path = "../wg-error" }
 wg-paths = { path = "../wg-paths" }
 wg-policy = { path = "../wg-policy" }
 wg-store = { path = "../wg-store" }
+wg-trigger = { path = "../wg-trigger" }
 wg-types = { path = "../wg-types" }
 
 [dev-dependencies]
diff --git a/crates/wg-dispatch/src/lib.rs b/crates/wg-dispatch/src/lib.rs
index f9209b5..a466a19 100644
--- a/crates/wg-dispatch/src/lib.rs
+++ b/crates/wg-dispatch/src/lib.rs
@@ -242,10 +242,11 @@ pub(crate) async fn save_run_with_audit(
     workspace: &WorkspacePath,
     run: &Run,
     audit: AuditedWriteRequest,
-) -> Result<()> {
+) -> Result<wg_types::LedgerEntry> {
     let primitive = run_to_primitive(run)?;
-    write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
-    Ok(())
+    let (_, ledger_entry) =
+        write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
+    Ok(ledger_entry)
 }
 
 fn run_to_primitive(run: &Run) -> Result<StoredPrimitive> {
diff --git a/crates/wg-dispatch/src/mutation.rs b/crates/wg-dispatch/src/mutation.rs
index 6f6ef11..083c469 100644
--- a/crates/wg-dispatch/src/mutation.rs
+++ b/crates/wg-dispatch/src/mutation.rs
@@ -3,6 +3,7 @@ use wg_error::{Result, WorkgraphError};
 use wg_paths::WorkspacePath;
 use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::AuditedWriteRequest;
+use wg_trigger::ingest_ledger_entry;
 use wg_types::{LedgerOp, RunStatus};
 
 use crate::{DispatchRequest, RUN_TYPE, Run, load_run, save_run_with_audit};
@@ -207,8 +208,8 @@ impl<'a> RunMutationService<'a> {
     async fn persist(self, run: &Run, audit: AuditedWriteRequest) -> Result<()> {
         self.authorize(run.id.as_str(), &audit).await?;
-        save_run_with_audit(self.workspace, run, audit.clone()).await?;
-        self.after_mutation(run, &audit).await
+        let ledger_entry = save_run_with_audit(self.workspace, run, audit.clone()).await?;
+        self.after_mutation(run, &audit, &ledger_entry).await
     }
 
     async fn authorize(self, run_id: &str, audit: &AuditedWriteRequest) -> Result<()> {
@@ -231,8 +232,13 @@ impl<'a> RunMutationService<'a> {
         Ok(())
     }
 
-    async fn after_mutation(self, _run: &Run, _audit: &AuditedWriteRequest) -> Result<()> {
-        // Reserved for future trigger-aware follow-up hooks.
+    async fn after_mutation(
+        self,
+        _run: &Run,
+        _audit: &AuditedWriteRequest,
+        ledger_entry: &wg_types::LedgerEntry,
+    ) -> Result<()> {
+        ingest_ledger_entry(self.workspace, ledger_entry).await?;
         Ok(())
     }
 }
diff --git a/crates/wg-graph/src/build.rs b/crates/wg-graph/src/build.rs
index a536f53..c33cf2b 100644
--- a/crates/wg-graph/src/build.rs
+++ b/crates/wg-graph/src/build.rs
@@ -10,7 +10,8 @@ use wg_fs::list_md_files;
 use wg_paths::WorkspacePath;
 use wg_store::{PrimitiveFrontmatter, StoredPrimitive, list_primitives};
 use wg_types::{
-    EventPattern, EvidenceItem, GraphEdgeKind, GraphEdgeSource, Registry, TriggerActionPlan,
+    EventPattern, EvidenceItem, GraphEdgeKind, GraphEdgeSource, Registry, TriggerActionOutcome,
+    TriggerActionPlan,
 };
 
 use crate::model::{BrokenLink, Edge, GraphSnapshot, NodeRef};
@@ -140,6 +141,9 @@ fn emit_structured_edges(
         "mission" => emit_mission_edges(source, primitive, nodes, id_index, edges, broken_links),
         "run" => emit_run_edges(source, primitive, nodes, id_index, edges, broken_links),
         "trigger" => emit_trigger_edges(source, primitive, nodes, id_index, edges, broken_links),
+        "trigger_receipt" => {
+            emit_trigger_receipt_edges(source, primitive, nodes, id_index, edges, broken_links)
+        }
         _ => {}
     }
 }
@@ -508,6 +512,76 @@ fn emit_trigger_edges(
     }
 }
 
+fn emit_trigger_receipt_edges(
+    source: &NodeRef,
+    primitive: &StoredPrimitive,
+    nodes: &BTreeSet<NodeRef>,
+    id_index: &BTreeMap<String, Vec<NodeRef>>,
+    edges: &mut BTreeSet<Edge>,
+    broken_links: &mut BTreeSet<BrokenLink>,
+) {
+    if let Some(trigger_id) = primitive
+        .frontmatter
+        .extra_fields
+        .get("trigger_id")
+        .and_then(string_value)
+    {
+        resolve_and_record_edge(
+            source,
+            &NodeRef::new("trigger", trigger_id),
+            source.id.as_str(),
+            Some("trigger_receipt"),
+            GraphEdgeKind::Trigger,
+            GraphEdgeSource::Field,
+            nodes,
+            id_index,
+            edges,
+            broken_links,
+        );
+    }
+    if let Some(subject_reference) = primitive
+        .frontmatter
+        .extra_fields
+        .get("subject_reference")
+        .and_then(string_value)
+    {
+        resolve_and_record_edge(
+            source,
+            source,
+            subject_reference,
+            None,
+            GraphEdgeKind::Trigger,
+            GraphEdgeSource::Field,
+            nodes,
+            id_index,
+            edges,
+            broken_links,
+        );
+    }
+    if let Some(value) = primitive.frontmatter.extra_fields.get("action_outcomes") {
+        if let Ok(action_outcomes) =
+            serde_yaml::from_value::<Vec<TriggerActionOutcome>>(value.clone())
+        {
+            for action_outcome in action_outcomes {
+                if let Some(target_reference) = action_outcome.plan.target_reference {
+                    resolve_and_record_edge(
+                        source,
+                        source,
+                        &target_reference,
+                        None,
+                        GraphEdgeKind::Trigger,
+                        GraphEdgeSource::TriggerRule,
+                        nodes,
+                        id_index,
+                        edges,
+                        broken_links,
+                    );
+                }
+            }
+        }
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn resolve_and_record_edge(
     declared_by: &NodeRef,
diff --git a/crates/wg-mission/Cargo.toml b/crates/wg-mission/Cargo.toml
index b1c904f..83ecf46 100644
--- a/crates/wg-mission/Cargo.toml
+++ b/crates/wg-mission/Cargo.toml
@@ -15,6 +15,7 @@ wg-paths = { path = "../wg-paths" }
 wg-policy = { path = "../wg-policy" }
 wg-store = { path = "../wg-store" }
 wg-thread = { path = "../wg-thread" }
+wg-trigger = { path = "../wg-trigger" }
 wg-types = { path = "../wg-types" }
 
 [dev-dependencies]
diff --git a/crates/wg-mission/src/lib.rs b/crates/wg-mission/src/lib.rs
index 5f8ff3b..f792175 100644
--- a/crates/wg-mission/src/lib.rs
+++ b/crates/wg-mission/src/lib.rs
@@ -239,10 +239,11 @@ pub(crate) async fn save_mission_with_audit(
     workspace: &WorkspacePath,
     mission: &Mission,
     audit: AuditedWriteRequest,
-) -> Result<()> {
+) -> Result<wg_types::LedgerEntry> {
     let primitive = mission_to_primitive(mission)?;
-    write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
-    Ok(())
+    let (_, ledger_entry) =
+        write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
+    Ok(ledger_entry)
 }
 
 pub(crate) fn system_actor() -> ActorId {
diff --git a/crates/wg-mission/src/mutation.rs b/crates/wg-mission/src/mutation.rs
index a4e6d5d..968f38b 100644
--- a/crates/wg-mission/src/mutation.rs
+++ b/crates/wg-mission/src/mutation.rs
@@ -5,6 +5,7 @@ use wg_error::{Result, WorkgraphError};
 use wg_paths::WorkspacePath;
 use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::AuditedWriteRequest;
+use wg_trigger::ingest_ledger_entry;
 use wg_types::{LedgerOp, MissionMilestone, MissionStatus, ThreadStatus};
 
 use crate::{
@@ -397,8 +398,8 @@ impl<'a> MissionMutationService<'a> {
     async fn persist(self, mission: &Mission, audit: AuditedWriteRequest) -> Result<()> {
         self.authorize(mission.id.as_str(), &audit).await?;
-        save_mission_with_audit(self.workspace, mission, audit.clone()).await?;
-        self.after_mutation(mission, &audit).await
+        let ledger_entry = save_mission_with_audit(self.workspace, mission, audit.clone()).await?;
+        self.after_mutation(mission, &audit, &ledger_entry).await
     }
 
     async fn authorize(self, mission_id: &str, audit: &AuditedWriteRequest) -> Result<()> {
@@ -421,8 +422,13 @@ impl<'a> MissionMutationService<'a> {
         Ok(())
     }
 
-    async fn after_mutation(self, _mission: &Mission, _audit: &AuditedWriteRequest) -> Result<()> {
-        // Reserved for future trigger-aware follow-up hooks.
+    async fn after_mutation(
+        self,
+        _mission: &Mission,
+        _audit: &AuditedWriteRequest,
+        ledger_entry: &wg_types::LedgerEntry,
+    ) -> Result<()> {
+        ingest_ledger_entry(self.workspace, ledger_entry).await?;
         Ok(())
     }
diff --git a/crates/wg-orientation/Cargo.toml b/crates/wg-orientation/Cargo.toml
index 8df2668..f465694 100644
--- a/crates/wg-orientation/Cargo.toml
+++ b/crates/wg-orientation/Cargo.toml
@@ -19,6 +19,7 @@ wg-paths = { path = "../wg-paths" }
 wg-policy = { path = "../wg-policy" }
 wg-store = { path = "../wg-store" }
 wg-thread = { path = "../wg-thread" }
+wg-trigger = { path = "../wg-trigger" }
 wg-types = { path = "../wg-types" }
 
 [dev-dependencies]
diff --git a/crates/wg-orientation/src/lib.rs b/crates/wg-orientation/src/lib.rs
index 7f746e4..7544586 100644
--- a/crates/wg-orientation/src/lib.rs
+++ b/crates/wg-orientation/src/lib.rs
@@ -135,6 +135,59 @@ pub struct ThreadEvidenceGap {
     pub missing_criteria: Vec<String>,
 }
 
+/// Health and replay summary for one trigger subscription.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TriggerHealth {
+    /// Trigger reference in `type/id` form.
+    pub trigger_reference: String,
+    /// Trigger lifecycle status.
+    pub status: String,
+    /// Most recent evaluated event id, when any.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub last_event_id: Option<String>,
+    /// Most recent receipt id, when any.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub last_receipt_id: Option<String>,
+    /// Most recent event timestamp, when any.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub last_evaluated_at: Option<String>,
+    /// Most recent match timestamp, when any.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub last_matched_at: Option<String>,
+}
+
+/// Summary of durable planned actions emitted by trigger receipts.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TriggerPlannedActionSummary {
+    /// Number of pending/allowed plans across receipts.
+    pub pending_count: usize,
+    /// Number of policy-suppressed plans across receipts.
+    pub suppressed_count: usize,
+}
+
+/// Recent durable trigger receipt surfaced for orientation and status output.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TriggerReceiptSummary {
+    /// Receipt reference in `type/id` form.
+    pub receipt_reference: String,
+    /// Trigger reference in `type/id` form.
+    pub trigger_reference: String,
+    /// Event source for the matched event.
+    pub event_source: String,
+    /// Event name when known.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub event_name: Option<String>,
+    /// Subject reference when known.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub subject_reference: Option<String>,
+    /// Receipt timestamp.
+    pub occurred_at: String,
+    /// Count of allowed/pending plans in this receipt.
+    pub pending_plans: usize,
+    /// Count of suppressed plans in this receipt.
+    pub suppressed_plans: usize,
+}
+
 /// A structured, reusable workspace brief suitable for humans, agents, and future MCP resources.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct WorkspaceBrief {
@@ -158,6 +211,14 @@ pub struct WorkspaceBrief {
     /// Recent immutable ledger activity.
     #[serde(default)]
     pub recent_activity: Vec<RecentActivity>,
+    /// Trigger subscription health summaries.
+    #[serde(default)]
+    pub trigger_health: Vec<TriggerHealth>,
+    /// Recent durable trigger receipts.
+    #[serde(default)]
+    pub trigger_receipts: Vec<TriggerReceiptSummary>,
+    /// Aggregate trigger-planned action summary.
+    pub trigger_planned_actions: TriggerPlannedActionSummary,
     /// Warnings or gaps an entering agent should notice immediately.
     #[serde(default)]
     pub warnings: Vec<String>,
@@ -197,7 +258,10 @@ impl Briefing {
 mod tests {
     use std::collections::BTreeMap;
 
-    use super::{BriefItem, BriefSection, ContextLens, RecentActivity, WorkspaceBrief};
+    use super::{
+        BriefItem, BriefSection, ContextLens, RecentActivity, TriggerHealth,
+        TriggerPlannedActionSummary, TriggerReceiptSummary, WorkspaceBrief,
+    };
 
     #[test]
     fn context_lens_parses_supported_values() {
@@ -238,6 +302,28 @@ mod tests {
                 op: "create".to_owned(),
                 reference: "org/versatly".to_owned(),
             }],
+            trigger_health: vec![TriggerHealth {
+                trigger_reference: "trigger/demo".to_owned(),
+                status: "active".to_owned(),
+                last_event_id: Some("event-1".to_owned()),
+                last_receipt_id: Some("trigger_receipt/demo".to_owned()),
+                last_evaluated_at: Some("2026-03-15T16:37:24Z".to_owned()),
+                last_matched_at: Some("2026-03-15T16:37:24Z".to_owned()),
+            }],
+            trigger_receipts: vec![TriggerReceiptSummary {
+                receipt_reference: "trigger_receipt/demo".to_owned(),
+                trigger_reference: "trigger/demo".to_owned(),
+                event_source: "ledger".to_owned(),
+                event_name: Some("thread.done".to_owned()),
+                subject_reference: Some("thread/thread-1".to_owned()),
+                occurred_at: "2026-03-15T16:37:24Z".to_owned(),
+                pending_plans: 1,
+                suppressed_plans: 0,
+            }],
+            trigger_planned_actions: TriggerPlannedActionSummary {
+                pending_count: 1,
+                suppressed_count: 0,
+            },
             warnings: vec!["No policies recorded yet".to_owned()],
         };
diff --git a/crates/wg-orientation/src/mutation.rs b/crates/wg-orientation/src/mutation.rs
index 5677557..66f1d90 100644
--- a/crates/wg-orientation/src/mutation.rs
+++ b/crates/wg-orientation/src/mutation.rs
@@ -9,6 +9,7 @@ use wg_store::{
     AuditedWriteRequest, PrimitiveFrontmatter, StoredPrimitive, read_primitive,
     write_primitive_audited_now,
 };
+use wg_trigger::ingest_ledger_entry;
 use wg_types::{ActorId, FieldDefinition, LedgerOp, PrimitiveType, Registry};
 
 /// Domain mutation service for checkpoint persistence.
@@ -73,14 +74,15 @@ impl<'a> CheckpointMutationService<'a> {
             .with_note(format!("Saved checkpoint '{}'", id));
 
         self.authorize(&id, &audit).await?;
-        write_primitive_audited_now(
+        let (_, ledger_entry) = write_primitive_audited_now(
             self.workspace,
             &checkpoint_registry(),
             &primitive,
             audit.clone(),
         )
        .await?;
-        self.after_mutation(&primitive, &audit).await?;
+        self.after_mutation(&primitive, &audit, &ledger_entry)
+            .await?;
 
         read_primitive(self.workspace, "checkpoint", &id).await
     }
@@ -106,8 +108,9 @@ impl<'a> CheckpointMutationService<'a> {
         self,
         _primitive: &StoredPrimitive,
         _audit: &AuditedWriteRequest,
+        ledger_entry: &wg_types::LedgerEntry,
     ) -> Result<()> {
-        // Reserved for future trigger-aware follow-up hooks.
+        ingest_ledger_entry(self.workspace, ledger_entry).await?;
         Ok(())
     }
 }
diff --git a/crates/wg-orientation/src/runtime.rs b/crates/wg-orientation/src/runtime.rs
index d2fa4ad..d058351 100644
--- a/crates/wg-orientation/src/runtime.rs
+++ b/crates/wg-orientation/src/runtime.rs
@@ -9,7 +9,7 @@ use wg_types::ActorId;
 
 use crate::{
     BriefItem, CheckpointMutationService, GraphIssue, GraphOrphan, RecentActivity,
-    ThreadEvidenceGap, brief_runtime, status_runtime,
+    ThreadEvidenceGap, TriggerHealth, TriggerReceiptSummary, brief_runtime, status_runtime,
 };
 
 /// Workspace orientation summary derived from real persisted data.
@@ -25,6 +25,12 @@ pub struct WorkspaceStatus {
     pub orphan_nodes: Vec<GraphOrphan>,
     /// Threads with unsatisfied required exit criteria.
     pub thread_evidence_gaps: Vec<ThreadEvidenceGap>,
+    /// Trigger health summaries recorded for active and draft trigger subscriptions.
+    pub trigger_health: Vec<TriggerHealth>,
+    /// Recent durable trigger receipts emitted by the trigger plane.
+    pub recent_trigger_receipts: Vec<TriggerReceiptSummary>,
+    /// Count of planned trigger actions that remain pending policy allow.
+    pub pending_trigger_actions: usize,
 }
 
 /// Actor-specific orientation brief based on assignment and recent activity.
@@ -317,6 +323,9 @@ mod tests {
                     primitive_id: None,
                     field_names: vec!["evidence".to_owned()],
                     provider: None,
+                    actor_id: None,
+                    subject_reference: None,
+                    payload_fields: BTreeMap::new(),
                 })
                 .expect("event pattern should serialize"),
             ),
diff --git a/crates/wg-orientation/src/runtime_support.rs b/crates/wg-orientation/src/runtime_support.rs
index 48477de..e203d20 100644
--- a/crates/wg-orientation/src/runtime_support.rs
+++ b/crates/wg-orientation/src/runtime_support.rs
@@ -4,9 +4,13 @@ use wg_ledger::{LedgerCursor, LedgerReader};
 use wg_mission::Mission;
 use wg_paths::WorkspacePath;
 use wg_thread::Thread;
-use wg_types::{GraphEdgeKind, GraphEdgeSource, LedgerEntry};
+use wg_trigger::{Trigger, TriggerReceipt};
+use wg_types::{GraphEdgeKind, GraphEdgeSource, LedgerEntry, TriggerPlanDecision};
 
-use crate::{GraphOrphan, RecentActivity, ThreadEvidenceGap};
+use crate::{
+    GraphOrphan, RecentActivity, ThreadEvidenceGap, TriggerHealth, TriggerPlannedActionSummary,
+    TriggerReceiptSummary,
+};
 
 pub(crate) async fn load_recent_activity(
     workspace: &WorkspacePath,
@@ -55,6 +59,97 @@ pub(crate) async fn load_runs(workspace: &WorkspacePath) -> Result<Vec<Run>> {
     wg_dispatch::list_runs(workspace).await
 }
 
+pub(crate) async fn load_triggers(workspace: &WorkspacePath) -> Result<Vec<Trigger>> {
+    wg_trigger::list_triggers(workspace).await
+}
+
+pub(crate) async fn load_trigger_receipts(
+    workspace: &WorkspacePath,
+) -> Result<Vec<TriggerReceipt>> {
+    wg_trigger::list_trigger_receipts(workspace).await
+}
+
+pub(crate) async fn load_trigger_health(workspace: &WorkspacePath) -> Result<Vec<TriggerHealth>> {
+    Ok(load_triggers(workspace)
+        .await?
+        .into_iter()
+        .map(|trigger| TriggerHealth {
+            trigger_reference: format!("trigger/{}", trigger.id),
+            status: trigger.status.as_str().to_owned(),
+            last_evaluated_at: trigger.subscription_state.as_ref().and_then(|state| {
+                state
+                    .last_evaluated_at
+                    .map(|timestamp| timestamp.to_rfc3339())
+            }),
+            last_matched_at: trigger.subscription_state.as_ref().and_then(|state| {
+                state
+                    .last_matched_at
+                    .map(|timestamp| timestamp.to_rfc3339())
+            }),
+            last_event_id: trigger
+                .subscription_state
+                .as_ref()
+                .and_then(|state| state.last_event_id.clone()),
+            last_receipt_id: trigger
+                .subscription_state
+                .as_ref()
+                .and_then(|state| state.last_receipt_id.clone()),
+        })
+        .collect())
+}
+
+pub(crate) async fn load_recent_trigger_receipts(
+    workspace: &WorkspacePath,
+    limit: usize,
+) -> Result<Vec<TriggerReceiptSummary>> {
+    Ok(load_trigger_receipts(workspace)
+        .await?
+        .into_iter()
+        .rev()
+        .take(limit)
+        .map(|receipt| {
+            let pending_plans = receipt
+                .action_outcomes
+                .iter()
+                .filter(|outcome| outcome.decision == TriggerPlanDecision::Allow)
+                .count();
+            let suppressed_plans = receipt
+                .action_outcomes
+                .iter()
+                .filter(|outcome| outcome.decision == TriggerPlanDecision::Deny)
+                .count();
+            TriggerReceiptSummary {
+                receipt_reference: format!("trigger_receipt/{}", receipt.id),
+                trigger_reference: format!("trigger/{}", receipt.trigger_id),
+                event_source: receipt.event_source.as_str().to_owned(),
+                event_name: receipt.event_name,
+                subject_reference: receipt.subject_reference,
+                occurred_at: receipt.occurred_at.to_rfc3339(),
+                pending_plans,
+                suppressed_plans,
+            }
+        })
+        .collect())
+}
+
+pub(crate) async fn pending_trigger_actions(
+    workspace: &WorkspacePath,
+) -> Result<TriggerPlannedActionSummary> {
+    let mut summary = TriggerPlannedActionSummary {
+        pending_count: 0,
+        suppressed_count: 0,
+    };
+    for receipt in load_trigger_receipts(workspace).await? {
+        for outcome in receipt.action_outcomes {
+            match outcome.decision {
+                TriggerPlanDecision::Allow => summary.pending_count += 1,
+                TriggerPlanDecision::Deny => summary.suppressed_count += 1,
+            }
+        }
+    }
+    Ok(summary)
+}
+
 pub(crate) fn entry_to_recent_activity(entry: LedgerEntry) -> RecentActivity {
     RecentActivity {
         ts: entry.ts.to_rfc3339(),
diff --git a/crates/wg-orientation/src/status_runtime.rs b/crates/wg-orientation/src/status_runtime.rs
index 2b13189..7a84d70 100644
--- a/crates/wg-orientation/src/status_runtime.rs
+++ b/crates/wg-orientation/src/status_runtime.rs
@@ -7,8 +7,8 @@ use wg_paths::WorkspacePath;
 use crate::{GraphIssue, WorkspaceStatus};
 
 use super::runtime_support::{
-    edge_kind_label, edge_source_label, load_recent_activity, load_thread_evidence_gaps,
-    orphan_nodes,
+    edge_kind_label, edge_source_label, load_recent_activity, load_recent_trigger_receipts,
+    load_thread_evidence_gaps, load_trigger_health, orphan_nodes, pending_trigger_actions,
 };
 
 /// Builds a workspace status summary from persisted primitives and ledger entries.
@@ -43,5 +43,8 @@ pub async fn status(workspace: &WorkspacePath) -> Result<WorkspaceStatus> {
         graph_issues,
         orphan_nodes,
         thread_evidence_gaps: load_thread_evidence_gaps(workspace).await?,
+        trigger_health: load_trigger_health(workspace).await?,
+        recent_trigger_receipts: load_recent_trigger_receipts(workspace, 10).await?,
+        pending_trigger_actions: pending_trigger_actions(workspace).await?.pending_count,
     })
 }
diff --git a/crates/wg-registry/src/lib.rs b/crates/wg-registry/src/lib.rs
index 52f95b8..62b658d 100644
--- a/crates/wg-registry/src/lib.rs
+++ b/crates/wg-registry/src/lib.rs
@@ -98,7 +98,7 @@ mod tests {
         let registry = RuntimeRegistry::with_builtins();
         let listed = registry.list_types();
 
-        assert_eq!(listed.len(), 17);
+        assert_eq!(listed.len(), 18);
         assert_eq!(
             registry
                .get_type("person")
                .
            "people"
        );
         assert!(registry.get_type("run").is_some());
+        assert!(registry.get_type("trigger_receipt").is_some());
         assert!(registry.get_type("checkpoint").is_some());
         assert!(listed.windows(2).all(|pair| pair[0].name <= pair[1].name));
     }
diff --git a/crates/wg-thread/Cargo.toml b/crates/wg-thread/Cargo.toml
index 685a70f..a708734 100644
--- a/crates/wg-thread/Cargo.toml
+++ b/crates/wg-thread/Cargo.toml
@@ -14,6 +14,7 @@ wg-error = { path = "../wg-error" }
 wg-paths = { path = "../wg-paths" }
 wg-policy = { path = "../wg-policy" }
 wg-store = { path = "../wg-store" }
+wg-trigger = { path = "../wg-trigger" }
 wg-types = { path = "../wg-types" }
 
 [dev-dependencies]
diff --git a/crates/wg-thread/src/lib.rs b/crates/wg-thread/src/lib.rs
index 7cd2f80..b3a16f3 100644
--- a/crates/wg-thread/src/lib.rs
+++ b/crates/wg-thread/src/lib.rs
@@ -243,16 +243,16 @@ async fn save_thread_with_audit(
     workspace: &WorkspacePath,
     thread: &Thread,
     audit: AuditedWriteRequest,
-) -> Result<()> {
+) -> Result<wg_types::LedgerEntry> {
     let primitive = thread_to_primitive(thread)?;
-    wg_store::write_primitive_audited_now(
+    let (_, ledger_entry) = wg_store::write_primitive_audited_now(
         workspace,
         &wg_types::Registry::builtins(),
         &primitive,
         audit,
     )
     .await?;
-    Ok(())
+    Ok(ledger_entry)
 }
 
 #[cfg(test)]
diff --git a/crates/wg-thread/src/mutation_support.rs b/crates/wg-thread/src/mutation_support.rs
index dae5f89..d9c72f9 100644
--- a/crates/wg-thread/src/mutation_support.rs
+++ b/crates/wg-thread/src/mutation_support.rs
@@ -2,6 +2,7 @@ use wg_error::{Result, WorkgraphError};
 use wg_paths::WorkspacePath;
 use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::AuditedWriteRequest;
+use wg_trigger::ingest_ledger_entry;
 use wg_types::LedgerOp;
 
 use crate::{THREAD_TYPE, Thread, save_thread_with_audit};
@@ -12,8 +13,8 @@ pub(super) async fn persist_thread(
     audit: AuditedWriteRequest,
 ) -> Result<()> {
     authorize_thread_mutation(workspace, thread.id.as_str(), &audit).await?;
-    save_thread_with_audit(workspace, thread, audit.clone()).await?;
-    after_thread_mutation(thread, &audit).await
+    let ledger_entry = save_thread_with_audit(workspace, thread, audit.clone()).await?;
+    after_thread_mutation(workspace, thread, &audit, &ledger_entry).await
 }
 
 async fn authorize_thread_mutation(
@@ -40,8 +41,13 @@ async fn authorize_thread_mutation(
     Ok(())
 }
 
-async fn after_thread_mutation(_thread: &Thread, _audit: &AuditedWriteRequest) -> Result<()> {
-    // Reserved for future trigger-aware follow-up hooks.
+async fn after_thread_mutation(
+    workspace: &WorkspacePath,
+    _thread: &Thread,
+    _audit: &AuditedWriteRequest,
+    ledger_entry: &wg_types::LedgerEntry,
+) -> Result<()> {
+    ingest_ledger_entry(workspace, ledger_entry).await?;
     Ok(())
 }
diff --git a/crates/wg-trigger/Cargo.toml b/crates/wg-trigger/Cargo.toml
index 01da233..b4e4837 100644
--- a/crates/wg-trigger/Cargo.toml
+++ b/crates/wg-trigger/Cargo.toml
@@ -7,8 +7,10 @@ license.workspace = true
 authors.workspace = true
 
 [dependencies]
+chrono.workspace = true
 serde.workspace = true
 serde_yaml.workspace = true
+sha2.workspace = true
 wg-error = { path = "../wg-error" }
 wg-paths = { path = "../wg-paths" }
 wg-policy = { path = "../wg-policy" }
diff --git a/crates/wg-trigger/src/lib.rs b/crates/wg-trigger/src/lib.rs
index 0d4b11e..4b50781 100644
--- a/crates/wg-trigger/src/lib.rs
+++ b/crates/wg-trigger/src/lib.rs
@@ -1,26 +1,36 @@
 #![forbid(unsafe_code)]
 #![deny(missing_docs)]
 
-//! Durable trigger parsing, validation, and ledger-event matching for WorkGraph.
+//! Durable trigger parsing, validation, and event-plane matching for WorkGraph.
+use std::collections::BTreeMap;
+
+use chrono::{DateTime, Utc};
 use serde_yaml::Value;
+use sha2::{Digest, Sha256};
 use wg_error::{Result, WorkgraphError};
 use wg_paths::WorkspacePath;
+use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::{
     AuditedWriteRequest, PrimitiveFrontmatter, StoredPrimitive, list_primitives, read_primitive,
     write_primitive_audited_now,
 };
 use wg_types::{
-    EventPattern, EventSourceKind, LedgerEntry, Registry, TriggerActionPlan, TriggerPrimitive,
+    ActorId, EventEnvelope, EventPattern, EventSourceKind, LedgerEntry, Registry,
+    TriggerActionOutcome, TriggerPlanDecision, TriggerPrimitive, TriggerReceiptPrimitive,
     TriggerStatus,
 };
 
 mod mutation;
 
 const TRIGGER_TYPE: &str = "trigger";
+const TRIGGER_RECEIPT_TYPE: &str = "trigger_receipt";
+const TRIGGER_ACTION_PLAN_TYPE: &str = "trigger_action_plan";
 
 /// Typed trigger model persisted by this crate.
 pub type Trigger = TriggerPrimitive;
+/// Typed trigger receipt model persisted by this crate.
+pub type TriggerReceipt = TriggerReceiptPrimitive;
 
 pub use mutation::TriggerMutationService;
 
@@ -31,8 +41,12 @@ pub struct MatchedTrigger {
     pub trigger_id: String,
     /// Matched trigger title.
     pub title: String,
-    /// Action plans emitted by the trigger.
-    pub action_plans: Vec<TriggerActionPlan>,
+    /// Trigger event that matched the rule.
+    pub event: EventEnvelope,
+    /// Replay-safe trigger/event deduplication key.
+    pub dedup_key: String,
+    /// Action outcomes emitted by the trigger.
+    pub action_outcomes: Vec<TriggerActionOutcome>,
 }
 
 /// Persists a trigger after validating its contract.
@@ -69,6 +83,58 @@ pub async fn list_triggers(workspace: &WorkspacePath) -> Result<Vec<Trigger>> {
         .collect()
 }
 
+/// Loads a persisted trigger receipt by identifier.
+///
+/// # Errors
+///
+/// Returns an error when the trigger receipt cannot be loaded or decoded.
+pub async fn load_trigger_receipt(
+    workspace: &WorkspacePath,
+    receipt_id: &str,
+) -> Result<TriggerReceipt> {
+    let primitive = read_primitive(workspace, TRIGGER_RECEIPT_TYPE, receipt_id).await?;
+    trigger_receipt_from_primitive(&primitive)
+}
+
+/// Lists all persisted trigger receipts.
+///
+/// # Errors
+///
+/// Returns an error when trigger receipt primitives cannot be loaded or decoded.
+pub async fn list_trigger_receipts(workspace: &WorkspacePath) -> Result<Vec<TriggerReceipt>> {
+    list_primitives(workspace, TRIGGER_RECEIPT_TYPE)
+        .await?
+        .iter()
+        .map(trigger_receipt_from_primitive)
+        .collect()
+}
+
+/// Ingests one normalized event, evaluates matching triggers, and persists durable receipts.
+///
+/// # Errors
+///
+/// Returns an error when evaluation or receipt persistence fails.
+pub async fn ingest_event(
+    workspace: &WorkspacePath,
+    event: &EventEnvelope,
+) -> Result<Vec<TriggerReceipt>> {
+    TriggerMutationService::new(workspace)
+        .ingest_event(event)
+        .await
+}
+
+/// Ingests one ledger entry into the trigger plane, persisting any resulting receipts.
+///
+/// # Errors
+///
+/// Returns an error when evaluation or receipt persistence fails.
+pub async fn ingest_ledger_entry(
+    workspace: &WorkspacePath,
+    entry: &LedgerEntry,
+) -> Result<Vec<TriggerReceipt>> {
+    ingest_event(workspace, &event_from_ledger_entry(entry)).await
+}
+
 /// Validates a trigger definition without persisting it.
 ///
 /// # Errors
@@ -94,6 +160,64 @@ pub fn validate_trigger_definition(trigger: &Trigger) -> Result<()> {
     validate_event_pattern(&trigger.id, &trigger.event_pattern)
 }
 
+/// Converts a ledger entry into the normalized event envelope used by the trigger plane.
+#[must_use]
+pub fn event_from_ledger_entry(entry: &LedgerEntry) -> EventEnvelope {
+    let reference = format!("{}/{}", entry.primitive_type, entry.primitive_id);
+    let event_name = format!(
+        "{}.{}",
+        entry.primitive_type,
+        format!("{:?}", entry.op).to_lowercase()
+    );
+    EventEnvelope {
+        id: entry.hash.clone(),
+        source: EventSourceKind::Ledger,
+        event_name: Some(event_name),
+        provider: None,
+        actor_id: Some(entry.actor.clone()),
+        occurred_at: entry.ts,
+        op: Some(entry.op),
+        primitive_type: Some(entry.primitive_type.clone()),
+        primitive_id: Some(entry.primitive_id.clone()),
+        subject_reference: Some(reference),
+        field_names: entry.fields_changed.clone(),
+        payload_fields: BTreeMap::from([
+            ("op".to_owned(), format!("{:?}", entry.op).to_lowercase()),
+            ("primitive_type".to_owned(), entry.primitive_type.clone()),
+            ("primitive_id".to_owned(), entry.primitive_id.clone()),
+        ]),
+    }
+}
+
+/// Evaluates all active triggers against a normalized event envelope.
+///
+/// # Errors
+///
+/// Returns an error when persisted triggers cannot be loaded or decoded.
+pub async fn evaluate_event(
+    workspace: &WorkspacePath,
+    event: &EventEnvelope,
+) -> Result<Vec<MatchedTrigger>> {
+    let triggers = list_triggers(workspace).await?;
+    let mut matches = Vec::new();
+    for trigger in triggers {
+        if trigger.status != TriggerStatus::Active {
+            continue;
+        }
+        if !event_pattern_matches(&trigger.event_pattern, event) {
+            continue;
+        }
+        matches.push(MatchedTrigger {
+            trigger_id: trigger.id.clone(),
+            title: trigger.title.clone(),
+            event: event.clone(),
+            dedup_key: dedup_key(&trigger.id, &event.id),
+            action_outcomes: build_action_outcomes(workspace, &trigger, event).await?,
+        });
+    }
+    Ok(matches)
+}
+
 /// Evaluates all active triggers against a ledger entry.
 ///
 /// # Errors
@@ -103,47 +227,110 @@ pub async fn evaluate_ledger_entry(
     workspace: &WorkspacePath,
     entry: &LedgerEntry,
 ) -> Result<Vec<MatchedTrigger>> {
-    let triggers = list_triggers(workspace).await?;
-    Ok(triggers
-        .into_iter()
-        .filter(|trigger| {
-            trigger.status == TriggerStatus::Active
-                && trigger.event_pattern.source == EventSourceKind::Ledger
-                && ledger_pattern_matches(&trigger.event_pattern, entry)
-        })
-        .map(|trigger| MatchedTrigger {
-            trigger_id: trigger.id,
-            title: trigger.title,
-            action_plans: trigger.action_plans,
-        })
-        .collect())
+    evaluate_event(workspace, &event_from_ledger_entry(entry)).await
+}
+
+async fn build_action_outcomes(
+    workspace: &WorkspacePath,
+    trigger: &Trigger,
+    event: &EventEnvelope,
+) -> Result<Vec<TriggerActionOutcome>> {
+    let planning_actor = event.actor_id.clone().unwrap_or_else(system_actor);
+    let mut action_outcomes = Vec::with_capacity(trigger.action_plans.len());
+    for plan in &trigger.action_plans {
+        let context = action_policy_context(trigger, event, plan);
+        let decision = evaluate_policy(
+            workspace,
+            &planning_actor,
+            PolicyAction::Create,
+            TRIGGER_ACTION_PLAN_TYPE,
+            &context,
+        )
+        .await?;
+        let (decision, reason) = match decision {
+            PolicyDecision::Allow => (TriggerPlanDecision::Allow, None),
+            PolicyDecision::Deny => (
+                TriggerPlanDecision::Deny,
+                Some(format!(
+                    "policy denied create of {TRIGGER_ACTION_PLAN_TYPE} for actor '{}'",
+                    planning_actor
+                )),
+            ),
+        };
+        action_outcomes.push(TriggerActionOutcome {
+            plan: plan.clone(),
+            decision,
+            reason,
+        });
+    }
+    Ok(action_outcomes)
+}
+
+fn action_policy_context(
+    trigger: &Trigger,
+    event: &EventEnvelope,
+    plan: &wg_types::TriggerActionPlan,
+) -> PolicyContext {
+    let mut fields = BTreeMap::new();
+    fields.insert("trigger_id".to_owned(), Value::String(trigger.id.clone()));
+    fields.insert(
+        "event_source".to_owned(),
+        Value::String(event.source.as_str().to_owned()),
+    );
+    if let Some(event_name) = &event.event_name {
+        fields.insert("event_name".to_owned(), Value::String(event_name.clone()));
+    }
+    fields.insert("action_kind".to_owned(), Value::String(plan.kind.clone()));
+    if let Some(target_reference) = &plan.target_reference {
+        fields.insert(
+            "target_reference".to_owned(),
+            Value::String(target_reference.clone()),
+        );
+    }
+    PolicyContext { fields }
 }
 
 async fn save_trigger_with_audit(
     workspace: &WorkspacePath,
     trigger: &Trigger,
     audit: AuditedWriteRequest,
-) -> Result<()> {
+) -> Result<LedgerEntry> {
     let primitive = trigger_to_primitive(trigger)?;
-    write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
-    Ok(())
+    let (_, ledger_entry) =
+        write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
+    Ok(ledger_entry)
+}
+
+async fn save_trigger_receipt_with_audit(
+    workspace: &WorkspacePath,
+    receipt: &TriggerReceipt,
+    audit: AuditedWriteRequest,
+) -> Result<LedgerEntry> {
+    let primitive = trigger_receipt_to_primitive(receipt)?;
+    let (_, ledger_entry) =
+        write_primitive_audited_now(workspace, &Registry::builtins(), &primitive, audit).await?;
+    Ok(ledger_entry)
 }
 
 fn validate_event_pattern(trigger_id: &str, pattern: &EventPattern) -> Result<()> {
     match pattern.source {
         EventSourceKind::Ledger => {
-            if pattern.event_name.is_some() || pattern.provider.is_some() {
+            if pattern.provider.is_some() {
                 return Err(WorkgraphError::ValidationError(format!(
-                    "trigger '{trigger_id}' ledger patterns may not set provider or event_name"
+                    "trigger '{trigger_id}' ledger patterns may not set provider"
                 )));
             }
-            if pattern.ops.is_empty()
+            if pattern.event_name.is_none()
+                && pattern.ops.is_empty()
                 && pattern.primitive_types.is_empty()
                 && pattern.primitive_id.is_none()
                 && pattern.field_names.is_empty()
+                && pattern.actor_id.is_none()
+                && pattern.subject_reference.is_none()
+                && pattern.payload_fields.is_empty()
            {
                 return Err(WorkgraphError::ValidationError(format!(
-                    "trigger '{trigger_id}' ledger pattern must constrain at least one of ops, primitive_types, primitive_id, or field_names"
+                    "trigger '{trigger_id}' ledger pattern must constrain at least one matcher"
                 )));
             }
         }
@@ -165,17 +352,43 @@ fn validate_event_pattern(trigger_id: &str, pattern: &EventPattern) -> Result<()
     Ok(())
 }
 
-fn ledger_pattern_matches(pattern: &EventPattern, entry: &LedgerEntry) -> bool {
-    if !pattern.ops.is_empty() && !pattern.ops.contains(&entry.op) {
+fn event_pattern_matches(pattern: &EventPattern, event: &EventEnvelope) -> bool {
+    if pattern.source != event.source {
+        return false;
+    }
+    if let Some(event_name) = &pattern.event_name {
+        if event.event_name.as_ref() != Some(event_name) {
+            return false;
+        }
+    }
+    if let Some(provider) = &pattern.provider {
+        if event.provider.as_ref() != Some(provider) {
+            return false;
+        }
+    }
+    if let Some(actor_id) = &pattern.actor_id {
+        if event.actor_id.as_ref() != Some(actor_id) {
+            return false;
+        }
+    }
+    if let Some(subject_reference) = &pattern.subject_reference {
+        if event.subject_reference.as_ref() != Some(subject_reference) {
+            return false;
+        }
+    }
+    if !pattern.ops.is_empty() && !event.op.is_some_and(|op| pattern.ops.contains(&op)) {
         return false;
     }
     if !pattern.primitive_types.is_empty()
-        && !pattern.primitive_types.contains(&entry.primitive_type)
+        && event
+            .primitive_type
+            .as_ref()
+            .is_none_or(|primitive_type| !pattern.primitive_types.contains(primitive_type))
     {
         return false;
     }
     if let Some(primitive_id) = &pattern.primitive_id {
-        if primitive_id != &entry.primitive_id {
+        if event.primitive_id.as_ref() != Some(primitive_id) {
             return false;
         }
     }
@@ -183,15 +396,20 @@ fn ledger_pattern_matches(pattern: &EventPattern, entry: &LedgerEntry) -> bool {
         && !pattern
             .field_names
             .iter()
-            .all(|field_name| entry.fields_changed.contains(field_name))
+            .all(|field_name| event.field_names.contains(field_name))
     {
         return false;
     }
+    for (field_name, field_value) in &pattern.payload_fields {
+        if event.payload_fields.get(field_name) != Some(field_value) {
+            return false;
+        }
+    }
     true
 }
 
 fn trigger_to_primitive(trigger: &Trigger) -> Result<StoredPrimitive> {
-    let mut extra_fields = std::collections::BTreeMap::new();
+    let mut extra_fields = BTreeMap::new();
     extra_fields.insert(
         "status".to_owned(),
         serde_yaml::to_value(trigger.status).map_err(encoding_error)?,
@@ -206,6 +424,12 @@ fn trigger_to_primitive(trigger: &Trigger) -> Result<StoredPrimitive> {
             serde_yaml::to_value(&trigger.action_plans).map_err(encoding_error)?,
         );
     }
+    if let Some(subscription_state) = &trigger.subscription_state {
+        extra_fields.insert(
+            "subscription_state".to_owned(),
+            serde_yaml::to_value(subscription_state).map_err(encoding_error)?,
+        );
+    }
 
     Ok(StoredPrimitive {
         frontmatter: PrimitiveFrontmatter {
@@ -218,7 +442,12 @@ fn trigger_to_primitive(trigger: &Trigger) -> Result<StoredPrimitive> {
     })
 }
 
-fn trigger_from_primitive(primitive: &StoredPrimitive) -> Result<Trigger> {
+/// Decodes a stored primitive into a typed trigger.
+///
+/// # Errors
+///
+/// Returns an error when the stored primitive is not a valid trigger payload.
+pub fn trigger_from_primitive(primitive: &StoredPrimitive) -> Result<Trigger> {
     if primitive.frontmatter.r#type != TRIGGER_TYPE {
         return Err(WorkgraphError::ValidationError(format!(
             "expected trigger primitive, found '{}'",
             primitive.frontmatter.r#type
@@ -251,11 +480,173 @@ fn trigger_from_primitive(primitive: &StoredPrimitive) -> Result<Trigger> {
             .extra_fields
             .get("action_plans")
             .map_or(Ok(Vec::new()), parse_yaml_value)?,
+        subscription_state: primitive
+            .frontmatter
+            .extra_fields
+            .get("subscription_state")
+            .map_or(Ok(None), parse_optional_yaml_value)?,
     };
     validate_trigger_definition(&trigger)?;
     Ok(trigger)
 }
 
+fn trigger_receipt_to_primitive(receipt: &TriggerReceipt) -> Result<StoredPrimitive> {
+    let mut extra_fields = BTreeMap::new();
+    extra_fields.insert(
+        "trigger_id".to_owned(),
+        Value::String(receipt.trigger_id.clone()),
+    );
+    extra_fields.insert(
+        "trigger_title".to_owned(),
+        Value::String(receipt.trigger_title.clone()),
+    );
+    extra_fields.insert(
+        "event_id".to_owned(),
+        Value::String(receipt.event_id.clone()),
+    );
+    extra_fields.insert(
+        "event_source".to_owned(),
+        serde_yaml::to_value(receipt.event_source).map_err(encoding_error)?,
+    );
+    if let Some(event_name) = &receipt.event_name {
+        extra_fields.insert("event_name".to_owned(), Value::String(event_name.clone()));
+    }
+    if let Some(provider) = &receipt.provider {
+        extra_fields.insert("provider".to_owned(), Value::String(provider.clone()));
+    }
+    if let Some(actor_id) = &receipt.actor_id {
+        extra_fields.insert("actor_id".to_owned(), Value::String(actor_id.to_string()));
+    }
+    if let Some(subject_reference) = &receipt.subject_reference {
+        extra_fields.insert(
+            "subject_reference".to_owned(),
+            Value::String(subject_reference.clone()),
+        );
+    }
+    extra_fields.insert(
+        "occurred_at".to_owned(),
+        Value::String(receipt.occurred_at.to_rfc3339()),
+    );
+    extra_fields.insert(
+        "dedup_key".to_owned(),
+        Value::String(receipt.dedup_key.clone()),
+    );
+    if !receipt.field_names.is_empty() {
+        extra_fields.insert(
+            "field_names".to_owned(),
+            serde_yaml::to_value(&receipt.field_names).map_err(encoding_error)?,
+        );
+    }
+    if !receipt.payload_fields.is_empty() {
+        extra_fields.insert(
+            "payload_fields".to_owned(),
+            serde_yaml::to_value(&receipt.payload_fields).map_err(encoding_error)?,
+        );
+    }
+    if !receipt.action_outcomes.is_empty() {
+        extra_fields.insert(
+            "action_outcomes".to_owned(),
+            serde_yaml::to_value(&receipt.action_outcomes).map_err(encoding_error)?,
+        );
+    }
+
+    Ok(StoredPrimitive {
+        frontmatter: PrimitiveFrontmatter {
+            r#type: TRIGGER_RECEIPT_TYPE.to_owned(),
+            id: receipt.id.clone(),
+            title: receipt.title.clone(),
+            extra_fields,
+        },
+        body: String::new(),
+    })
+}
+
+/// Decodes a stored primitive into a typed trigger receipt.
+///
+/// # Errors
+///
+/// Returns an error when the stored primitive is not a valid trigger receipt payload.
+pub fn trigger_receipt_from_primitive(primitive: &StoredPrimitive) -> Result<TriggerReceipt> {
+    if primitive.frontmatter.r#type != TRIGGER_RECEIPT_TYPE {
+        return Err(WorkgraphError::ValidationError(format!(
+            "expected trigger receipt primitive, found '{}'",
+            primitive.frontmatter.r#type
+        )));
+    }
+
+    Ok(TriggerReceiptPrimitive {
+        id: primitive.frontmatter.id.clone(),
+        title: primitive.frontmatter.title.clone(),
+        trigger_id: required_string_field(primitive, "trigger_id")?,
+        trigger_title: required_string_field(primitive, "trigger_title")?,
+        event_id: required_string_field(primitive, "event_id")?,
+        event_source: primitive
+            .frontmatter
+            .extra_fields
+            .get("event_source")
+            .map(parse_yaml_value)
+            .transpose()?
+            .ok_or_else(|| {
+                WorkgraphError::ValidationError(format!(
+                    "trigger receipt '{}' is missing required event_source",
+                    primitive.frontmatter.id
+                ))
+            })?,
+        event_name: primitive
+            .frontmatter
+            .extra_fields
+            .get("event_name")
+            .and_then(string_value)
+            .map(str::to_owned),
+        provider: primitive
+            .frontmatter
+            .extra_fields
+            .get("provider")
+            .and_then(string_value)
+            .map(str::to_owned),
+        actor_id: primitive
+            .frontmatter
+            .extra_fields
+            .get("actor_id")
+            .and_then(string_value)
+            .map(ActorId::new),
+        subject_reference: primitive
+            .frontmatter
+            .extra_fields
+            .get("subject_reference")
+            .and_then(string_value)
+            .map(str::to_owned),
+        occurred_at: primitive
+            .frontmatter
+            .extra_fields
+            .get("occurred_at")
+            .map(parse_datetime)
+            .transpose()?
+            .ok_or_else(|| {
+                WorkgraphError::ValidationError(format!(
+                    "trigger receipt '{}' is missing required occurred_at",
+                    primitive.frontmatter.id
+                ))
+            })?,
+        dedup_key: required_string_field(primitive, "dedup_key")?,
+        field_names: primitive
+            .frontmatter
+            .extra_fields
+            .get("field_names")
+            .map_or(Ok(Vec::new()), parse_yaml_value)?,
+        payload_fields: primitive
+            .frontmatter
+            .extra_fields
+            .get("payload_fields")
+            .map_or(Ok(BTreeMap::new()), parse_yaml_value)?,
+        action_outcomes: primitive
+            .frontmatter
+            .extra_fields
+            .get("action_outcomes")
+            .map_or(Ok(Vec::new()), parse_yaml_value)?,
+    })
+}
+
 fn parse_yaml_value<T>(value: &Value) -> Result<T>
 where
     T: serde::de::DeserializeOwned,
@@ -263,22 +654,120 @@ where
     serde_yaml::from_value::<T>(value.clone()).map_err(encoding_error)
 }
 
+fn parse_optional_yaml_value<T>(value: &Value) -> Result<Option<T>>
+where
+    T: serde::de::DeserializeOwned,
+{
+    serde_yaml::from_value::<T>(value.clone())
+        .map(Some)
+        .map_err(encoding_error)
+}
+
+fn required_string_field(primitive: &StoredPrimitive, field_name: &str) -> Result<String> {
+    primitive
+        .frontmatter
+        .extra_fields
+        .get(field_name)
+        .and_then(string_value)
+        .map(str::to_owned)
+        .ok_or_else(|| {
+            WorkgraphError::ValidationError(format!(
+                "{} '{}' is missing required {}",
+                primitive.frontmatter.r#type, primitive.frontmatter.id, field_name
+            ))
+        })
+}
+
+fn string_value(value: &Value) -> Option<&str> {
+    match value {
+        Value::String(value) => Some(value.as_str()),
+        Value::Tagged(tagged) => string_value(&tagged.value),
+        Value::Null
+        | Value::Bool(_)
+        | Value::Number(_)
+        | Value::Sequence(_)
+        | Value::Mapping(_) => None,
+    }
+}
+
+fn parse_datetime(value: &Value) -> Result<DateTime<Utc>> {
+    match value {
+        Value::String(value) => DateTime::parse_from_rfc3339(value)
+            .map(|timestamp| timestamp.with_timezone(&Utc))
+            .map_err(encoding_error),
+        Value::Tagged(tagged) => parse_datetime(&tagged.value),
+        Value::Null
+        | Value::Bool(_)
+        | Value::Number(_)
+        | Value::Sequence(_)
+        | Value::Mapping(_) => Err(WorkgraphError::ValidationError(
+            "expected RFC3339 datetime string".to_owned(),
+        )),
+    }
+}
+
+pub(crate) fn dedup_key(trigger_id: &str, event_id: &str) -> String {
+    format!("{trigger_id}::{event_id}")
+}
+
+pub(crate) fn trigger_receipt_id(trigger_id: &str, event_id: &str) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(trigger_id.as_bytes());
+    hasher.update(b"::");
+    hasher.update(event_id.as_bytes());
+    let digest = hasher.finalize();
+    let hash = digest[..12]
+        .iter()
+        .map(|byte| format!("{byte:02x}"))
+        .collect::<String>();
+    format!("{}-{hash}", slug_component(trigger_id))
+}
+
 fn encoding_error(error: impl std::fmt::Display) -> WorkgraphError {
     WorkgraphError::EncodingError(error.to_string())
 }
 
+fn system_actor() -> ActorId {
+    ActorId::new("system:workgraph")
+}
+
+fn slug_component(input: &str) -> String {
+    let slug = input
+        .chars()
+        .map(|character| {
+            if character.is_ascii_alphanumeric() {
+                character.to_ascii_lowercase()
+            } else {
+                '-'
+            }
+        })
+        .collect::<String>();
+    let trimmed = slug.trim_matches('-');
+    if trimmed.is_empty() {
+        "trigger".to_owned()
+    } else {
+        trimmed.to_owned()
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::collections::BTreeMap;
+
     use tempfile::tempdir;
     use wg_clock::MockClock;
     use wg_ledger::{LedgerEntryDraft, LedgerReader, LedgerWriter};
     use wg_paths::WorkspacePath;
+
+    use chrono::{TimeZone, Utc};
     use wg_types::{
-        ActorId, EventPattern, EventSourceKind, LedgerOp, TriggerActionPlan, TriggerPrimitive,
-        TriggerStatus,
+        ActorId, EventEnvelope, EventPattern, EventSourceKind, LedgerOp, TriggerActionPlan,
+        TriggerPlanDecision, TriggerPrimitive, TriggerStatus,
     };
 
-    use crate::{evaluate_ledger_entry, save_trigger, validate_trigger_definition};
+    use crate::{
+        evaluate_event, evaluate_ledger_entry, event_from_ledger_entry, save_trigger,
+        validate_trigger_definition,
+    };
 
     fn ledger_trigger(id: &str) -> TriggerPrimitive {
         TriggerPrimitive {
@@ -293,12 +782,16 @@ mod tests {
                 primitive_id: None,
                 field_names: vec!["evidence".to_owned()],
                 provider: None,
+                actor_id: None,
+                subject_reference: None,
+                payload_fields: BTreeMap::new(),
             },
             action_plans: vec![TriggerActionPlan {
                 kind: "rebrief_actor".to_owned(),
                 target_reference: Some("agent/cursor".to_owned()),
                 instruction: "Refresh the actor brief".to_owned(),
             }],
+            subscription_state: None,
         }
     }
 
@@ -316,12 +809,16 @@ mod tests {
                 primitive_id: None,
                 field_names: Vec::new(),
                 provider: None,
+                actor_id: None,
+                subject_reference: None,
+                payload_fields: BTreeMap::new(),
             },
             action_plans: vec![TriggerActionPlan {
                 kind: "notify".to_owned(),
                 target_reference: None,
                 instruction: "Notify someone".to_owned(),
             }],
+            subscription_state: None,
         };
 
         let error = validate_trigger_definition(&invalid)
@@ -365,8 +862,13 @@ mod tests {
            .await
            .expect("matching should succeed");
         assert_eq!(matches.len(), 1);
-        assert_eq!(matches[0].action_plans.len(), 1);
-        assert_eq!(matches[0].action_plans[0].kind, "rebrief_actor");
+        assert_eq!(matches[0].event.source, EventSourceKind::Ledger);
+        assert_eq!(matches[0].action_outcomes.len(), 1);
+        assert_eq!(matches[0].action_outcomes[0].plan.kind, "rebrief_actor");
+        assert_eq!(
+            matches[0].action_outcomes[0].decision,
+            TriggerPlanDecision::Allow
+        );
     }
 
     #[tokio::test]
@@ -385,12 +887,16 @@ mod tests {
                 primitive_id: None,
                 field_names: Vec::new(),
                 provider: Some("github".to_owned()),
+                actor_id: None,
+                subject_reference: None,
+                payload_fields: BTreeMap::new(),
             },
             action_plans: vec![TriggerActionPlan {
                 kind: "create_thread".to_owned(),
                 target_reference: Some("project/dealer-portal".to_owned()),
                 instruction: "Create a follow-up thread".to_owned(),
             }],
+            subscription_state: None,
         };
 
         save_trigger(&workspace, &webhook)
             .await
@@ -418,4 +924,90 @@ mod tests {
             .expect("matching should succeed");
         assert!(matches.is_empty());
     }
+
+    #[test]
+    fn event_from_ledger_entry_populates_normalized_fields() {
+        let entry = wg_types::LedgerEntry {
+            ts: Utc
+                .with_ymd_and_hms(2026, 3, 22, 10, 0, 0)
+                .single()
+                .expect("timestamp should be valid"),
+            actor: ActorId::new("agent:cursor"),
+            op: LedgerOp::Done,
+            primitive_type: "thread".to_owned(),
+            primitive_id: "thread-1".to_owned(),
+            fields_changed: vec!["status".to_owned(), "evidence".to_owned()],
+            hash: "hash-123".to_owned(),
+            prev_hash: None,
+            note: None,
+        };
+
+        let event = event_from_ledger_entry(&entry);
+
+        assert_eq!(event.id, "hash-123");
+        assert_eq!(event.source, EventSourceKind::Ledger);
+        assert_eq!(event.event_name.as_deref(), Some("thread.done"));
+        assert_eq!(event.subject_reference.as_deref(), Some("thread/thread-1"));
+        assert_eq!(
+            event.payload_fields.get("primitive_type"),
+            Some(&"thread".to_owned())
+        );
+    }
+
+    #[tokio::test]
+    async fn internal_event_matching_uses_event_envelope_fields() {
+        let temp_dir = tempdir().expect("temporary directory should be created");
+        let workspace = WorkspacePath::new(temp_dir.path());
+        let trigger = TriggerPrimitive {
+            id: "internal-trigger".to_owned(),
+            title: "Internal trigger".to_owned(),
+            status: TriggerStatus::Active,
+            event_pattern: EventPattern {
+                source: EventSourceKind::Internal,
+                event_name: Some("checkpoint.saved".to_owned()),
+                ops: Vec::new(),
+                primitive_types: vec!["checkpoint".to_owned()],
+                primitive_id: None,
+                field_names: vec!["focus".to_owned()],
+                provider: Some("signal-bus".to_owned()),
+                actor_id: Some(ActorId::new("agent:cursor")),
+                subject_reference: Some("checkpoint/checkpoint-1".to_owned()),
+                payload_fields: BTreeMap::from([("focus".to_owned(), "Phase 3".to_owned())]),
+            },
+            action_plans: vec![TriggerActionPlan {
+                kind: "rebrief_actor".to_owned(),
+                target_reference: Some("agent/cursor".to_owned()),
+                instruction: "Refresh trigger plane summary".to_owned(),
+            }],
+            subscription_state: None,
+        };
+        save_trigger(&workspace, &trigger)
+            .await
+            .expect("internal trigger should persist");
+
+        let event = EventEnvelope {
+            id: "internal-event-1".to_owned(),
+            source: EventSourceKind::Internal,
+            event_name: Some("checkpoint.saved".to_owned()),
+            provider: Some("signal-bus".to_owned()),
+            actor_id: Some(ActorId::new("agent:cursor")),
+            occurred_at: Utc
+                .with_ymd_and_hms(2026, 3, 22, 10, 0, 0)
+                .single()
+                .expect("timestamp should be valid"),
+            op: None,
+            primitive_type: Some("checkpoint".to_owned()),
+            primitive_id: Some("checkpoint-1".to_owned()),
+            subject_reference: Some("checkpoint/checkpoint-1".to_owned()),
+            field_names: vec!["focus".to_owned()],
+            payload_fields: BTreeMap::from([("focus".to_owned(), "Phase 3".to_owned())]),
+        };
+
+        let matches = evaluate_event(&workspace, &event)
+            .await
+            .expect("internal event should evaluate");
+        assert_eq!(matches.len(), 1);
+        assert_eq!(
+            matches[0].event.event_name.as_deref(),
+            Some("checkpoint.saved")
+        );
+    }
 }
diff --git a/crates/wg-trigger/src/mutation.rs b/crates/wg-trigger/src/mutation.rs
index 7c381e6..4dde852 100644
--- a/crates/wg-trigger/src/mutation.rs
+++ b/crates/wg-trigger/src/mutation.rs
@@ -2,9 +2,15 @@ use wg_error::{Result, WorkgraphError};
 use wg_paths::WorkspacePath;
 use wg_policy::{PolicyAction, PolicyContext, PolicyDecision, evaluate as evaluate_policy};
 use wg_store::AuditedWriteRequest;
-use wg_types::{LedgerOp, TriggerPrimitive};
+use wg_types::{
+    ActorId, EventEnvelope, LedgerEntry, LedgerOp, TriggerPrimitive, TriggerReceiptPrimitive,
+    TriggerSubscriptionState,
+};
 
-use crate::{TRIGGER_TYPE, Trigger, save_trigger_with_audit, validate_trigger_definition};
+use crate::{
+    TRIGGER_TYPE, Trigger, TriggerReceipt, dedup_key, save_trigger_receipt_with_audit,
+    save_trigger_with_audit, trigger_receipt_id, validate_trigger_definition,
+};
 
 /// Domain mutation service for durable trigger definitions.
 ///
@@ -29,21 +35,72 @@ impl<'a> TriggerMutationService<'a> {
     ///
     /// Returns an error when validation or persistence fails.
     pub async fn save_trigger(self, trigger: &Trigger) -> Result<()> {
+        self.save_trigger_as(trigger, system_actor()).await?;
+        Ok(())
+    }
+
+    /// Persists a trigger after validating its contract and auditing the invoking actor.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when validation or persistence fails.
+    pub async fn save_trigger_as(self, trigger: &Trigger, audit_actor: ActorId) -> Result<()> {
         validate_trigger_definition(trigger)?;
         let op = if trigger_exists(self.workspace, &trigger.id).await? {
             LedgerOp::Update
         } else {
             LedgerOp::Create
         };
-        let audit = AuditedWriteRequest::new(system_actor(), op)
+        let audit = AuditedWriteRequest::new(audit_actor, op)
             .with_note(format!("Saved trigger '{}'", trigger.id));
         self.persist(trigger, audit).await
     }
 
+    /// Evaluates a normalized event envelope and persists any resulting trigger receipts.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when evaluation or persistence fails.
+ pub async fn ingest_event(self, event: &EventEnvelope) -> Result<Vec<TriggerReceipt>> { + let matches = crate::evaluate_event(self.workspace, event).await?; + let mut receipts = Vec::with_capacity(matches.len()); + for matched in matches { + let receipt_id = trigger_receipt_id(&matched.trigger_id, &matched.event.id); + let receipt = TriggerReceiptPrimitive { + id: receipt_id.clone(), + title: format!("Trigger receipt: {}", matched.title), + trigger_id: matched.trigger_id.clone(), + trigger_title: matched.title.clone(), + event_id: matched.event.id.clone(), + event_source: matched.event.source, + event_name: matched.event.event_name.clone(), + provider: matched.event.provider.clone(), + actor_id: matched.event.actor_id.clone(), + subject_reference: matched.event.subject_reference.clone(), + occurred_at: matched.event.occurred_at, + dedup_key: dedup_key(&matched.trigger_id, &matched.event.id), + field_names: matched.event.field_names.clone(), + payload_fields: matched.event.payload_fields.clone(), + action_outcomes: matched.action_outcomes.clone(), + }; + let persisted = if trigger_receipt_exists(self.workspace, &receipt.id).await? { + crate::load_trigger_receipt(self.workspace, &receipt.id).await? 
+ } else { + let audit = AuditedWriteRequest::new(system_actor(), LedgerOp::Create) + .with_note(format!("Recorded trigger receipt '{}'", receipt.id)); + save_trigger_receipt_with_audit(self.workspace, &receipt, audit).await?; + receipt + }; + self.update_subscription_state(&persisted).await?; + receipts.push(persisted); + } + Ok(receipts) + } + async fn persist(self, trigger: &TriggerPrimitive, audit: AuditedWriteRequest) -> Result<()> { self.authorize(trigger.id.as_str(), &audit).await?; - save_trigger_with_audit(self.workspace, trigger, audit.clone()).await?; - self.after_mutation(trigger, &audit).await + let ledger_entry = save_trigger_with_audit(self.workspace, trigger, audit.clone()).await?; + self.after_mutation(trigger, &audit, &ledger_entry).await } async fn authorize(self, trigger_id: &str, audit: &AuditedWriteRequest) -> Result<()> { @@ -70,8 +127,40 @@ impl<'a> TriggerMutationService<'a> { self, _trigger: &TriggerPrimitive, _audit: &AuditedWriteRequest, + ledger_entry: &LedgerEntry, ) -> Result<()> { - // Reserved for future trigger runtime update hooks. 
+ crate::ingest_ledger_entry(self.workspace, ledger_entry).await?; + Ok(()) + } + + async fn update_subscription_state(self, receipt: &TriggerReceipt) -> Result<()> { + let mut trigger = crate::load_trigger(self.workspace, &receipt.trigger_id).await?; + let subscription_state = + trigger + .subscription_state + .get_or_insert(TriggerSubscriptionState { + last_evaluated_at: None, + last_matched_at: None, + last_event_id: None, + last_event_name: None, + last_event_cursor: None, + last_receipt_id: None, + }); + subscription_state.last_evaluated_at = Some(receipt.occurred_at); + subscription_state.last_matched_at = Some(receipt.occurred_at); + subscription_state.last_event_id = Some(receipt.event_id.clone()); + subscription_state.last_event_name = receipt.event_name.clone(); + subscription_state.last_event_cursor = Some(receipt.event_id.clone()); + subscription_state.last_receipt_id = Some(receipt.id.clone()); + save_trigger_with_audit( + self.workspace, + &trigger, + AuditedWriteRequest::new(system_actor(), LedgerOp::Update).with_note(format!( + "Updated trigger subscription state for '{}'", + trigger.id + )), + ) + .await?; Ok(()) } } @@ -86,6 +175,16 @@ async fn trigger_exists(workspace: &WorkspacePath, trigger_id: &str) -> Result<b } } +async fn trigger_receipt_exists(workspace: &WorkspacePath, receipt_id: &str) -> Result<bool> { + match crate::load_trigger_receipt(workspace, receipt_id).await { + Ok(_) => Ok(true), + Err(WorkgraphError::IoError(error)) if error.kind() == std::io::ErrorKind::NotFound => { + Ok(false) + } + Err(other) => Err(other), + } +} + fn policy_action_for(op: LedgerOp) -> PolicyAction { match op { LedgerOp::Create => PolicyAction::Create, diff --git a/crates/wg-types/src/coordination.rs b/crates/wg-types/src/coordination.rs index 20b5e0c..3b3086d 100644 --- a/crates/wg-types/src/coordination.rs +++ b/crates/wg-types/src/coordination.rs @@ -1,5 +1,7 @@ //! Coordination, graph, and trigger contracts shared across WorkGraph crates. 
+use std::collections::BTreeMap; + use crate::{ActorId, ExternalRef, LedgerOp}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -318,6 +320,18 @@ pub enum EventSourceKind { Internal, } +impl EventSourceKind { + /// Returns the stable serialized value used in frontmatter and JSON. + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Ledger => "ledger", + Self::Webhook => "webhook", + Self::Internal => "internal", + } + } +} + /// Trigger event matching contract. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct EventPattern { @@ -341,6 +355,53 @@ pub struct EventPattern { /// Optional provider or emitter name for webhook/internal sources. #[serde(default, skip_serializing_if = "Option::is_none")] pub provider: Option<String>, + /// Optional actor identifier that must have emitted the event. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub actor_id: Option<ActorId>, + /// Optional subject reference that must match the event subject. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub subject_reference: Option<String>, + /// Optional payload field values that must match exactly. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub payload_fields: BTreeMap<String, String>, +} + +/// Normalized event envelope evaluated against trigger definitions. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EventEnvelope { + /// Stable event identifier used for replay-safe deduplication. + pub id: String, + /// Event source kind. + pub source: EventSourceKind, + /// Optional stable event name. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_name: Option<String>, + /// Optional provider or emitter name. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub provider: Option<String>, + /// Actor that emitted or initiated the event, when known. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub actor_id: Option<ActorId>, + /// Time the event occurred. + pub occurred_at: DateTime<Utc>, + /// Ledger operation for ledger-backed events, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub op: Option<LedgerOp>, + /// Primitive type associated with the event subject, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub primitive_type: Option<String>, + /// Primitive identifier associated with the event subject, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub primitive_id: Option<String>, + /// Durable subject reference in `type/id` form, when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub subject_reference: Option<String>, + /// Field names included in the event payload. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub field_names: Vec<String>, + /// Normalized payload values for richer trigger matching. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub payload_fields: BTreeMap<String, String>, } /// Action plan yielded by a matched trigger. @@ -355,6 +416,51 @@ pub struct TriggerActionPlan { pub instruction: String, } +/// Persistent health and replay state tracked on a trigger definition. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TriggerSubscriptionState { + /// Most recent time an event was evaluated against this trigger. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_evaluated_at: Option<DateTime<Utc>>, + /// Most recent time an event matched this trigger. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_matched_at: Option<DateTime<Utc>>, + /// Identifier of the last evaluated event, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_event_id: Option<String>, + /// Stable event name of the last evaluated event, when known. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_event_name: Option<String>, + /// Cursor or hash for replay-oriented sources, when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_event_cursor: Option<String>, + /// Most recent receipt emitted by this trigger, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_receipt_id: Option<String>, +} + +/// Policy decision attached to one trigger-emitted action plan. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TriggerPlanDecision { + /// The action plan is allowed to remain pending. + Allow, + /// The action plan is recorded but suppressed by policy. + Deny, +} + +/// Durable outcome for one action plan yielded by a trigger. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TriggerActionOutcome { + /// Planned follow-up action produced by the trigger. + pub plan: TriggerActionPlan, + /// Policy decision recorded for this plan. + pub decision: TriggerPlanDecision, + /// Optional explanation when the plan was suppressed. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, +} + /// Durable trigger document model. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TriggerPrimitive { @@ -369,6 +475,51 @@ pub struct TriggerPrimitive { /// Action plans emitted when the pattern matches. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub action_plans: Vec<TriggerActionPlan>, + /// Persistent subscription and replay metadata for this trigger. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub subscription_state: Option<TriggerSubscriptionState>, +} + +/// Durable trigger receipt recording one matched event and its planned actions. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TriggerReceiptPrimitive { + /// Stable trigger receipt identifier. + pub id: String, + /// Trigger receipt title. + pub title: String, + /// Trigger that matched the event. + pub trigger_id: String, + /// Trigger title at the time of receipt creation. + pub trigger_title: String, + /// Stable identifier of the matched event. + pub event_id: String, + /// Event source kind that produced the receipt. + pub event_source: EventSourceKind, + /// Stable event name, when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_name: Option<String>, + /// Provider or emitter for webhook/internal events, when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub provider: Option<String>, + /// Actor associated with the matched event, when any. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub actor_id: Option<ActorId>, + /// Durable subject reference associated with the event, when known. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub subject_reference: Option<String>, + /// Time the triggering event occurred. + pub occurred_at: DateTime<Utc>, + /// Replay-safe deduplication key for this trigger/event pair. + pub dedup_key: String, + /// Payload field names observed on the event. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub field_names: Vec<String>, + /// Normalized payload values retained for inspection and replay. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub payload_fields: BTreeMap<String, String>, + /// Durable action outcomes recorded for this receipt. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub action_outcomes: Vec<TriggerActionOutcome>, } /// Durable checkpoint document model. 
@@ -443,10 +594,12 @@ const fn default_true() -> bool { #[cfg(test)] mod tests { use super::{ - CheckpointPrimitive, ConversationMessage, CoordinationAction, EventPattern, + CheckpointPrimitive, ConversationMessage, CoordinationAction, EventEnvelope, EventPattern, EventSourceKind, EvidenceItem, GraphEdgeKind, GraphEdgeReference, GraphEdgeSource, LineageMode, MessageKind, MissionMilestone, MissionPrimitive, MissionStatus, RunPrimitive, - ThreadExitCriterion, ThreadPrimitive, TriggerActionPlan, TriggerPrimitive, TriggerStatus, + ThreadExitCriterion, ThreadPrimitive, TriggerActionOutcome, TriggerActionPlan, + TriggerPlanDecision, TriggerPrimitive, TriggerReceiptPrimitive, TriggerStatus, + TriggerSubscriptionState, }; use crate::{ActorId, ExternalRef, RunStatus, ThreadStatus}; use chrono::{TimeZone, Utc}; @@ -577,12 +730,76 @@ mod tests { primitive_id: None, field_names: vec!["evidence".into()], provider: None, + actor_id: Some(ActorId::new("agent:cursor")), + subject_reference: Some("thread/thread-1".into()), + payload_fields: BTreeMap::from([("status".to_owned(), "done".to_owned())]), }, action_plans: vec![TriggerActionPlan { kind: "rebrief_actor".into(), target_reference: Some("agent:cursor".into()), instruction: "Refresh the active brief with new evidence.".into(), }], + subscription_state: Some(TriggerSubscriptionState { + last_evaluated_at: Some( + Utc.with_ymd_and_hms(2026, 3, 22, 9, 30, 0) + .single() + .expect("valid timestamp"), + ), + last_matched_at: Some( + Utc.with_ymd_and_hms(2026, 3, 22, 9, 31, 0) + .single() + .expect("valid timestamp"), + ), + last_event_id: Some("event-1".into()), + last_event_name: Some("thread.done".into()), + last_event_cursor: Some("hash-1".into()), + last_receipt_id: Some("trigger-receipt-1".into()), + }), + }; + let event = EventEnvelope { + id: "event-1".into(), + source: EventSourceKind::Ledger, + event_name: Some("thread.done".into()), + provider: None, + actor_id: Some(ActorId::new("agent:cursor")), + occurred_at: Utc + 
.with_ymd_and_hms(2026, 3, 22, 9, 31, 0) + .single() + .expect("valid timestamp"), + op: Some(crate::LedgerOp::Done), + primitive_type: Some("thread".into()), + primitive_id: Some("thread-1".into()), + subject_reference: Some("thread/thread-1".into()), + field_names: vec!["evidence".into(), "status".into()], + payload_fields: BTreeMap::from([("status".to_owned(), "done".to_owned())]), + }; + let receipt = TriggerReceiptPrimitive { + id: "trigger-receipt-1".into(), + title: "Trigger receipt: React to completed deployments".into(), + trigger_id: "trigger-1".into(), + trigger_title: "React to completed deployments".into(), + event_id: "event-1".into(), + event_source: EventSourceKind::Ledger, + event_name: Some("thread.done".into()), + provider: None, + actor_id: Some(ActorId::new("agent:cursor")), + subject_reference: Some("thread/thread-1".into()), + occurred_at: Utc + .with_ymd_and_hms(2026, 3, 22, 9, 31, 0) + .single() + .expect("valid timestamp"), + dedup_key: "trigger-1::event-1".into(), + field_names: vec!["evidence".into(), "status".into()], + payload_fields: BTreeMap::from([("status".to_owned(), "done".to_owned())]), + action_outcomes: vec![TriggerActionOutcome { + plan: TriggerActionPlan { + kind: "rebrief_actor".into(), + target_reference: Some("agent:cursor".into()), + instruction: "Refresh the active brief with new evidence.".into(), + }, + decision: TriggerPlanDecision::Allow, + reason: None, + }], }; let checkpoint = CheckpointPrimitive { id: "checkpoint-1".into(), @@ -606,6 +823,8 @@ mod tests { roundtrip(&mission); roundtrip(&run); roundtrip(&trigger); + roundtrip(&event); + roundtrip(&receipt); roundtrip(&checkpoint); roundtrip(&edge); roundtrip(&LineageMode::Opaque); diff --git a/crates/wg-types/src/lib.rs b/crates/wg-types/src/lib.rs index c510db7..b420eb2 100644 --- a/crates/wg-types/src/lib.rs +++ b/crates/wg-types/src/lib.rs @@ -15,10 +15,12 @@ mod tier_two; pub use config::WorkgraphConfig; pub use coordination::{ - CheckpointPrimitive, 
ConversationMessage, CoordinationAction, EventPattern, EventSourceKind, - EvidenceItem, GraphEdgeKind, GraphEdgeReference, GraphEdgeSource, LineageMode, MessageKind, - MissionMilestone, MissionPrimitive, MissionStatus, RunPrimitive, ThreadExitCriterion, - ThreadPrimitive, TriggerActionPlan, TriggerPrimitive, TriggerStatus, + CheckpointPrimitive, ConversationMessage, CoordinationAction, EventEnvelope, EventPattern, + EventSourceKind, EvidenceItem, GraphEdgeKind, GraphEdgeReference, GraphEdgeSource, LineageMode, + MessageKind, MissionMilestone, MissionPrimitive, MissionStatus, RunPrimitive, + ThreadExitCriterion, ThreadPrimitive, TriggerActionOutcome, TriggerActionPlan, + TriggerPlanDecision, TriggerPrimitive, TriggerReceiptPrimitive, TriggerStatus, + TriggerSubscriptionState, }; pub use identity::{ActorId, NodeId, WorkspaceId}; pub use ledger::{LedgerEntry, LedgerOp}; diff --git a/crates/wg-types/src/registry.rs b/crates/wg-types/src/registry.rs index 77cb7e4..40179c5 100644 --- a/crates/wg-types/src/registry.rs +++ b/crates/wg-types/src/registry.rs @@ -534,7 +534,7 @@ fn builtin_types() -> Vec<PrimitiveType> { builtin_type( "trigger", "triggers", - "Reactive automation definition driven by ledger events.", + "Reactive automation definition driven by normalized event evaluation.", vec![ field("id", "string", "Stable trigger identifier", true, false), field("title", "string", "Trigger title", true, false), @@ -559,6 +559,119 @@ fn builtin_types() -> Vec<PrimitiveType> { false, true, ), + field( + "subscription_state", + "object", + "Persistent replay and health metadata for the trigger subscription", + false, + false, + ), + ], + ), + builtin_type( + "trigger_receipt", + "trigger_receipts", + "Durable receipt for one matched trigger event and its planned follow-up actions.", + vec![ + field( + "id", + "string", + "Stable trigger receipt identifier", + true, + false, + ), + field("title", "string", "Trigger receipt title", true, false), + field( + "trigger_id", + 
"string", + "Trigger identifier that matched the event", + true, + false, + ), + field( + "trigger_title", + "string", + "Trigger title at receipt creation time", + true, + false, + ), + field( + "event_id", + "string", + "Stable matched event identifier", + true, + false, + ), + field( + "event_source", + "event_source_kind", + "Source kind that produced the matched event", + true, + false, + ), + field( + "event_name", + "string", + "Stable matched event name when known", + false, + false, + ), + field( + "provider", + "string", + "Provider or emitter for webhook/internal events", + false, + false, + ), + field( + "actor_id", + "string", + "Actor associated with the matched event", + false, + false, + ), + field( + "subject_reference", + "string", + "Durable subject reference associated with the event", + false, + false, + ), + field( + "occurred_at", + "datetime", + "Timestamp when the matched event occurred", + true, + false, + ), + field( + "dedup_key", + "string", + "Replay-safe trigger/event deduplication key", + true, + false, + ), + field( + "field_names", + "string[]", + "Normalized field names observed on the event", + false, + true, + ), + field( + "payload_fields", + "object", + "Normalized payload values retained for inspection and replay", + false, + false, + ), + field( + "action_outcomes", + "object[]", + "Durable action outcomes recorded for this receipt", + false, + true, + ), ], ), builtin_type( @@ -643,7 +756,7 @@ mod tests { fn builtin_registry_contains_all_required_types() { let registry = Registry::builtins(); - assert_eq!(registry.types.len(), 17); + assert_eq!(registry.types.len(), 18); assert_eq!( registry .get_type("person") @@ -660,6 +773,7 @@ mod tests { ); assert!(registry.get_type("thread").is_some()); assert!(registry.get_type("trigger").is_some()); + assert!(registry.get_type("trigger_receipt").is_some()); assert!(registry.get_type("checkpoint").is_some()); } @@ -670,6 +784,6 @@ mod tests { let decoded: Registry = 
serde_json::from_str(&json).expect("registry should deserialize"); assert_eq!(decoded, registry); - assert_eq!(decoded.list_types().len(), 17); + assert_eq!(decoded.list_types().len(), 18); } } diff --git a/docs/context-graph.md b/docs/context-graph.md index 0b49c9d..bca359d 100644 --- a/docs/context-graph.md +++ b/docs/context-graph.md @@ -14,7 +14,7 @@ Important node classes include: - Tier 1 knowledge primitives: `decision`, `pattern`, `lesson`, `policy`, `relationship`, `strategic_note` - Tier 2 context primitives: `org`, `team`, `person`, `agent`, `client`, `project` -- Coordination primitives: `thread`, `mission`, `run`, `trigger`, `checkpoint` +- Coordination primitives: `thread`, `mission`, `run`, `trigger`, `trigger_receipt`, `checkpoint` External systems remain authoritative unless WorkGraph has an explicit reason to cache or model their state locally. @@ -27,7 +27,7 @@ The graph uses semantic edge kinds. Wiki-links are only one source. - `assignment` — actor-to-thread, actor-to-run, or equivalent ownership/assignment edges - `containment` — mission-to-thread, thread-to-run, or mission-to-run structure - `evidence` — evidence support edges from threads to supporting records -- `trigger` — trigger-rule edges to relevant targets or action targets +- `trigger` — trigger-rule edges to relevant targets, subjects, or action targets Agent lineage references such as `parent_actor_id` and `root_actor_id` are part of the durable assignment/lineage surface and should appear in graph-derived orientation outputs when present. @@ -75,7 +75,7 @@ Status surfaces should expose at least: ## Events And The Graph -Ledger events are graph-adjacent coordination facts. +Ledger events and normalized ingested events are graph-adjacent coordination facts. 
They are not themselves the graph, but they are durable signals that: @@ -83,15 +83,15 @@ They are not themselves the graph, but they are durable signals that: - orientation surfaces summarize - future automation planes react to -The graph holds state. The ledger records state change. Both matter. +The graph holds state. The ledger records state change. Trigger receipts preserve durable event-to-plan outcomes. All three matter. Durable primitive mutation paths should emit ledger events consistently so status, trigger -evaluation, and auditability observe the same state transitions. +evaluation, trigger receipt persistence, and auditability observe the same state transitions. Coordination-family writes should flow through explicit domain mutation services above the audited store layer. Those services own operation-specific semantics, policy checks, audited -writes, and future trigger hook integration so the graph and ledger observe one coherent mutation -contract per primitive family instead of scattered persistence helpers. +writes, and trigger hook integration so the graph, ledger, and trigger receipt plane observe one +coherent mutation contract per primitive family instead of scattered persistence helpers. ## Query Expectations @@ -102,5 +102,6 @@ A useful WorkGraph query should be able to answer not only “what links to this - what evidence supports completion? - what durable relationship makes this relevant? - what trigger rule reacts to this kind of change? +- what trigger receipts and pending planned actions were produced by recent events? If the graph cannot answer those questions, the semantics are not yet first-class enough. 
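The replay-safe deduplication that trigger receipt persistence relies on can be sketched in a few lines. This is a hedged illustration, not the real implementation: the test fixture in this patch shows the composite form `trigger-1::event-1`, but the actual `dedup_key` helper may differ (the patch also adds a `sha2` dependency, which suggests hashing).

```rust
use std::collections::HashSet;

// Hypothetical composite form mirroring the patch's test fixture
// ("trigger-1::event-1"); the real helper may hash these parts instead.
fn dedup_key(trigger_id: &str, event_id: &str) -> String {
    format!("{trigger_id}::{event_id}")
}

fn main() {
    let mut seen: HashSet<String> = HashSet::new();
    // Replaying "event-1" a second time must not yield a second receipt:
    // HashSet::insert returns false for an already-seen key.
    for event_id in ["event-1", "event-1", "event-2"] {
        if seen.insert(dedup_key("trigger-1", event_id)) {
            println!("persist receipt for {event_id}");
        }
    }
    assert_eq!(seen.len(), 2);
}
```

The key only needs to be stable per trigger/event pair; any injective, deterministic encoding of the two identifiers preserves the replay guarantee.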
diff --git a/docs/foundation.md b/docs/foundation.md index 770654f..03e1918 100644 --- a/docs/foundation.md +++ b/docs/foundation.md @@ -14,7 +14,7 @@ Execution tools can do work, but they do not naturally preserve the organization - durable actor lineage and delegation context - durable event history and planned downstream actions -In this repo, "context graph" means a typed graph built from durable primitives and provenanced edges. It does not mean a fuzzy wiki-link memory map. Likewise, the current trigger substrate evaluates durable events into planned follow-up actions; live execution loops are a later layer. +In this repo, "context graph" means a typed graph built from durable primitives and provenanced edges. It does not mean a fuzzy wiki-link memory map. Likewise, the current trigger substrate evaluates normalized durable events into durable trigger receipts and planned follow-up actions; live execution loops are a later layer. ## Product Boundary @@ -23,7 +23,7 @@ WorkGraph is: - the durable system of record for organizational context and agentic work - the durable coordination layer for missions, threads, runs, checkpoints, and evidence-backed handoffs - the typed graph and immutable ledger that heterogeneous agents can consult and act through -- the durable trigger evaluation substrate that turns events into planned actions +- the durable trigger evaluation substrate that turns events into planned actions and durable trigger receipts WorkGraph is not: @@ -59,6 +59,7 @@ Specifically: - explicit domain mutation services per primitive family so semantic checks, policy, audited writes, and future hooks share one contract - trigger evaluation over durable events +- durable trigger receipts with replay-safe deduplication - cross-agent continuity through missions, threads, runs, checkpoints, and action plans ## Deployment Modes diff --git a/docs/operating-model.md b/docs/operating-model.md index 9f6a446..bb52c50 100644 --- a/docs/operating-model.md +++ 
b/docs/operating-model.md @@ -557,7 +557,20 @@ This foundation pass supports event source contracts for: - `webhook` - `internal` -Concrete matching is implemented for ledger events in this pass. The other sources are part of the schema contract but not yet live runtime surfaces. +Phase 3 expands the substrate from ledger-only evaluation into a normalized event +plane. Ledger, internal, and externally ingested webhook-shaped events can all be +converted into the same event envelope and evaluated against active triggers, with +matches recorded as durable `trigger_receipt` primitives. + +Those receipts are important because WorkGraph still does not auto-execute trigger +effects in this phase. Instead it durably records: + +- which trigger matched +- which normalized event produced the match +- which planned follow-up actions were emitted +- which of those actions were suppressed by policy +- what replay-safe deduplication key prevents duplicate receipts for the same + trigger/event pair Kernel and CLI mutation paths append durable ledger entries for persisted coordination changes so trigger evaluation can observe real thread, mission, run, trigger, and checkpoint state transitions. Those mutation paths should flow through primitive-family domain mutation services that own lifecycle semantics, policy checks, audited writes, and future trigger hooks rather than composing store writes ad hoc at call sites. @@ -576,7 +589,9 @@ That means: - evidence references the criteria it satisfies - completion is validated, not assumed -Planned update actions and completion actions are durable follow-up intentions. They are not automatically executed in this foundation pass. +Planned update actions, completion actions, and trigger-emitted action plans are +durable follow-up intentions. They are not automatically +executed in this phase.
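The suppressed-versus-pending distinction recorded on receipts can be illustrated with a minimal sketch. `Decision` and `Outcome` here are simplified stand-ins for the patch's `TriggerPlanDecision` and `TriggerActionOutcome`, not the real types.

```rust
// Simplified stand-ins for TriggerPlanDecision / TriggerActionOutcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    Allow,
    Deny,
}

struct Outcome {
    instruction: String,
    decision: Decision,
}

// Denied plans stay on the receipt as a durable record, but only allowed
// plans count as pending follow-up intentions for orientation surfaces.
fn pending_instructions(outcomes: &[Outcome]) -> Vec<&str> {
    outcomes
        .iter()
        .filter(|o| o.decision == Decision::Allow)
        .map(|o| o.instruction.as_str())
        .collect()
}

fn main() {
    let outcomes = vec![
        Outcome { instruction: "Refresh the active brief.".into(), decision: Decision::Allow },
        Outcome { instruction: "Escalate to operator.".into(), decision: Decision::Deny },
    ];
    assert_eq!(pending_instructions(&outcomes), vec!["Refresh the active brief."]);
}
```

Because nothing is auto-executed in this phase, the filter runs at read time; the receipt itself never loses the denied outcomes or their policy reasons.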
## Surface Model @@ -601,9 +616,14 @@ Coordination commands now include: - `workgraph run start|complete|fail|cancel <run-id>` — transition run lifecycle state - `workgraph checkpoint --working-on ... --focus ...` — persist working context - `workgraph ledger [--last N]` — inspect recent immutable ledger entries +- `workgraph trigger validate <trigger-id>` — validate one persisted trigger +- `workgraph trigger replay [--last N]` — replay recent ledger entries through the trigger plane +- `workgraph trigger ingest --source ... --event-id ... --event-name ...` — ingest a normalized internal or webhook-shaped event through the CLI `workgraph status` also surfaces graph hygiene (`graph_issues`, `orphan_nodes`) -and thread evidence gaps in both human and JSON output. +and thread evidence gaps in both human and JSON output. Phase 3 also adds trigger +health, recent trigger receipts, and pending trigger action counts to those +orientation surfaces. ## Delegation And Handoff diff --git a/docs/roadmap.md b/docs/roadmap.md index f280443..753820a 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -41,6 +41,8 @@ Scope: - durable trigger subscriptions - policy-aware action planning - ingress from internal and external event sources +- durable trigger receipts and replay-safe deduplication +- CLI-first trigger validation, replay, ingest, and status surfaces ## Phase 4 — Remote Access Surfaces (MCP as Cloud Adapter)
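As a rough sketch of how normalized event evaluation plausibly behaves, the following models an `EventPattern`-style match over an envelope: every `Some(...)` constraint must equal the event's value, every listed payload field must match exactly, and unspecified fields match anything. These semantics are an assumption read off the field doc comments in this patch, not the actual `evaluate_event` implementation, and the structs are trimmed stand-ins.

```rust
use std::collections::BTreeMap;

// Trimmed stand-ins for the patch's EventPattern and EventEnvelope.
struct Pattern {
    event_name: Option<String>,
    provider: Option<String>,
    payload_fields: BTreeMap<String, String>,
}

struct Event {
    event_name: Option<String>,
    provider: Option<String>,
    payload_fields: BTreeMap<String, String>,
}

// A `None` constraint matches anything; a `Some(...)` constraint and every
// required payload field must match the event exactly.
fn pattern_matches(pattern: &Pattern, event: &Event) -> bool {
    fn constraint_ok(want: &Option<String>, got: &Option<String>) -> bool {
        want.as_ref().map_or(true, |w| got.as_ref() == Some(w))
    }
    constraint_ok(&pattern.event_name, &event.event_name)
        && constraint_ok(&pattern.provider, &event.provider)
        && pattern
            .payload_fields
            .iter()
            .all(|(k, v)| event.payload_fields.get(k) == Some(v))
}

fn main() {
    let pattern = Pattern {
        event_name: Some("checkpoint.saved".into()),
        provider: None, // unspecified: any provider matches
        payload_fields: BTreeMap::from([("focus".to_owned(), "Phase 3".to_owned())]),
    };
    let event = Event {
        event_name: Some("checkpoint.saved".into()),
        provider: Some("signal-bus".into()),
        payload_fields: BTreeMap::from([("focus".to_owned(), "Phase 3".to_owned())]),
    };
    assert!(pattern_matches(&pattern, &event));
}
```

This shape is what makes `workgraph trigger replay` safe to run repeatedly: matching is a pure function of the pattern and envelope, so replayed events reproduce the same matches and deduplication suppresses duplicate receipts.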