diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 90cbbac0c..8e7cfa7f2 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -805,7 +805,7 @@ Docker Compose provides the full local development stack. All services include h Search runs over the `events.search_tsv` generated `tsvector` column on the `events` table (no separate collection or service). The column is populated on insert — `to_tsvector('simple', content)` — and excludes privacy-sensitive -kinds via `CASE WHEN kind IN (1059, 30300, 30622) THEN NULL`, so those rows are +kinds via `CASE WHEN kind IN (1059, 30300, 30622, 44100, 44101, 44200) THEN NULL`, so those rows are storage-level unsearchable (a `NULL` tsvector never matches `@@`). A GIN index (`idx_events_search_tsv`) backs the `@@` probe; in multi-community mode the community-leading btree filters BitmapAnd with the GIN probe so every query is diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f3f00e7b3..bd836d2dd 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -381,8 +381,9 @@ for team access setup, onboarding, and the full repo inventory. See 6. **Index for search** (if applicable) — Postgres FTS indexes persisted events automatically via the `events.search_tsv` generated column. To exclude a privacy-sensitive kind from search, add it to the `CASE WHEN - kind IN (...)` exclusion in the `search_tsv` definition (see the initial - schema migration) rather than wiring a separate indexer. + kind IN (...)` exclusion in `schema/schema.sql` and add a new additive + migration (see `migrations/0004_agent_turn_metric_fts.sql` as the model) + rather than modifying the initial migration or wiring a separate indexer. 7. **Audit** — the audit log captures all events automatically; no changes needed unless you need custom audit metadata. 
diff --git a/Cargo.lock b/Cargo.lock index 5aa78909a..448797730 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,9 +780,11 @@ dependencies = [ "async-trait", "axum", "base64", + "chrono", "getrandom 0.4.2", "hex", "nix 0.31.3", + "nostr", "reqwest 0.13.3", "rmcp", "serde", @@ -794,6 +796,7 @@ dependencies = [ "tracing", "tracing-subscriber", "urlencoding", + "uuid", "webbrowser", ] diff --git a/crates/buzz-acp/src/acp.rs b/crates/buzz-acp/src/acp.rs index 0ba5a8c6a..ea11fce12 100644 --- a/crates/buzz-acp/src/acp.rs +++ b/crates/buzz-acp/src/acp.rs @@ -14,6 +14,7 @@ use tokio::process::{Child, ChildStdin, ChildStdout}; use tokio_util::codec::{FramedRead, LinesCodec, LinesCodecError}; use crate::observer::{ObserverContext, ObserverHandle}; +use crate::usage::{TurnUsage, UsageTracker}; /// Maximum allowed size of a single NDJSON line from the agent's stdout. /// Lines exceeding this limit are rejected to prevent OOM from rogue agents. @@ -167,6 +168,11 @@ pub struct AcpClient { /// outside of a goose-native turn — the read loop's steer arm is /// disabled in that case. steer_rx: Option>, + /// Usage tracker — accumulates cumulative token counts from + /// `_goose/unstable/session/update` notifications and computes per-turn + /// deltas. Both goose and buzz-agent emit this notification; goose gates + /// on client capability advertisement, buzz-agent emits unconditionally. + goose_usage: UsageTracker, } impl AcpClient { @@ -258,6 +264,7 @@ impl AcpClient { observer_context: ObserverContext::default(), active_run_id: None, steer_rx: None, + goose_usage: UsageTracker::default(), }) } @@ -303,7 +310,16 @@ impl AcpClient { // on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges. let params = serde_json::json!({ "protocolVersion": 2, - "clientCapabilities": {}, + "clientCapabilities": { + // Signal to goose that we handle `_goose/unstable/session/update` + // notifications. 
Without this the custom notification is suppressed + // on goose's side and usage data is never emitted. + "_meta": { + "goose": { + "customNotifications": true + } + } + }, "clientInfo": { "name": "buzz-acp", "version": env!("CARGO_PKG_VERSION") @@ -427,6 +443,11 @@ impl AcpClient { let hard_deadline = tokio::time::Instant::now() + max_duration; self.current_hard_deadline = Some(hard_deadline); + // Mark the usage tracker as in-flight for this turn BEFORE sending the + // prompt so that any setup notifications recorded earlier are not + // misattributed to this turn. + self.goose_usage.begin_turn(session_id); + self.last_prompt_id = Some(self.next_id); let id = self.next_id; self.next_id += 1; @@ -502,6 +523,20 @@ impl AcpClient { self.active_run_id.as_deref() } + /// Consume and return the per-turn usage record computed from the most + /// recent `_goose/unstable/session/update` notification. + /// + /// Returns `None` if no usage update arrived since the last call (i.e. + /// the harness did not emit one for this turn, or this is not a goose + /// agent). Must be called at most once per turn; subsequent calls return + /// `None` until the next `usage_update` notification is recorded. + /// + /// Intended for consumption by `publish_agent_turn_metric` in `pool.rs` to + /// publish a kind 44200 NIP-AM event. + pub fn take_turn_usage(&mut self) -> Option<TurnUsage> { + self.goose_usage.take() + } + /// Install a per-turn steer request channel for goose-native /// non-cancelling mid-turn delivery. 
/// @@ -840,6 +875,9 @@ impl AcpClient { "session/update" => { let _ = self.handle_session_update(&msg); } + "_goose/unstable/session/update" => { + self.handle_goose_usage_update(&msg); + } "session/request_permission" => { self.handle_permission_request(&msg).await?; } @@ -1170,6 +1208,9 @@ impl AcpClient { idle_deadline = Instant::now() + idle_timeout; } } + "_goose/unstable/session/update" => { + self.handle_goose_usage_update(&msg); + } "session/request_permission" => { self.handle_permission_request(&msg).await?; } @@ -1311,6 +1352,46 @@ impl AcpClient { } } + /// Parse a `_goose/unstable/session/update` notification and record the + /// usage snapshot in the per-session tracker. + /// + /// Silently ignores malformed or non-`usage_update` variants — the + /// notification is best-effort observability data, not a protocol + /// requirement. Failures are logged at debug level. + fn handle_goose_usage_update(&mut self, msg: &serde_json::Value) { + use crate::usage::{GooseSessionUpdateNotification, GooseSessionUpdateVariant}; + let params = match msg.get("params") { + Some(p) => p, + None => { + tracing::debug!( + target: "acp::usage", + "_goose/unstable/session/update: missing params" + ); + return; + } + }; + match serde_json::from_value::<GooseSessionUpdateNotification>(params.clone()) { + Ok(notif) => { + if let GooseSessionUpdateVariant::UsageUpdate(payload) = &notif.update { + tracing::debug!( + target: "acp::usage", + session_id = %notif.session_id, + input = payload.accumulated_input_tokens, + output = payload.accumulated_output_tokens, + "goose usage update" + ); + self.goose_usage.record(&notif.session_id, payload); + } + } + Err(e) => { + tracing::debug!( + target: "acp::usage", + "_goose/unstable/session/update: deserialization error: {e}" + ); + } + } + } + /// Auto-approve a `session/request_permission` request from the agent. /// /// Finds the option with `kind == "allow_once"` and responds with its `optionId`. 
@@ -1782,7 +1863,13 @@ mod tests { "method": "initialize", "params": { "protocolVersion": 2, - "clientCapabilities": {}, + "clientCapabilities": { + "_meta": { + "goose": { + "customNotifications": true + } + } + }, "clientInfo": { "name": "buzz-acp", "version": "0.1.0" @@ -1795,6 +1882,11 @@ mod tests { Some("buzz-acp") ); assert!(msg["params"]["clientCapabilities"].is_object()); + assert_eq!( + msg["params"]["clientCapabilities"]["_meta"]["goose"]["customNotifications"].as_bool(), + Some(true), + "goose customNotifications capability must be advertised" + ); } #[test] @@ -2825,4 +2917,94 @@ mod tests { other => panic!("expected SteerAck::Success, got {other:?}"), } } + + // ── Goose usage notification integration ────────────────────────────── + + /// Build a `_goose/unstable/session/update` JSON-RPC notification. + fn goose_usage_update_msg( + session_id: &str, + input: u64, + output: u64, + cost: Option, + ) -> serde_json::Value { + let mut update = serde_json::json!({ + "sessionUpdate": "usage_update", + "used": input + output, + "contextLimit": 200000u64, + "accumulatedInputTokens": input, + "accumulatedOutputTokens": output, + }); + if let Some(c) = cost { + update["accumulatedCost"] = serde_json::json!(c); + } + serde_json::json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { + "sessionId": session_id, + "update": update + } + }) + } + + #[tokio::test] + async fn goose_usage_notification_recorded_and_take_returns_usage() { + let mut client = spawn_inert_client().await; + assert!(client.take_turn_usage().is_none(), "starts empty"); + + // begin_turn before sending the prompt — mirrors the real call flow. 
+ client.goose_usage.begin_turn("s1"); + let msg = goose_usage_update_msg("s1", 1000, 200, Some(0.01)); + client.handle_goose_usage_update(&msg); + + let usage = client + .take_turn_usage() + .expect("usage should be present after notification"); + assert_eq!(usage.session_id, "s1"); + assert_eq!(usage.turn_seq, 1); + assert!(!usage.delta_reliable, "first turn must be unreliable"); + assert_eq!(usage.cumulative_input_tokens, 1000); + assert_eq!(usage.cumulative_output_tokens, 200); + assert_eq!(usage.cumulative_cost_usd, Some(0.01)); + + // Second take must be None. + assert!( + client.take_turn_usage().is_none(), + "take after drain is None" + ); + } + + #[tokio::test] + async fn goose_usage_second_turn_delta_reliable() { + let mut client = spawn_inert_client().await; + // Turn 1. + client.goose_usage.begin_turn("s2"); + client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1000, 200, None)); + let _ = client.take_turn_usage(); + // Turn 2. + client.goose_usage.begin_turn("s2"); + client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1800, 450, None)); + let usage = client.take_turn_usage().expect("turn 2 usage"); + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + } + + #[tokio::test] + async fn goose_usage_malformed_notification_does_not_panic() { + let mut client = spawn_inert_client().await; + // Missing params entirely. + let bad = serde_json::json!({"jsonrpc":"2.0","method":"_goose/unstable/session/update"}); + client.handle_goose_usage_update(&bad); + assert!(client.take_turn_usage().is_none()); + + // params present but wrong shape. 
+ let bad2 = serde_json::json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { "oops": true } + }); + client.handle_goose_usage_update(&bad2); + assert!(client.take_turn_usage().is_none()); + } } diff --git a/crates/buzz-acp/src/config.rs b/crates/buzz-acp/src/config.rs index 8100ea71a..d139dc6cd 100644 --- a/crates/buzz-acp/src/config.rs +++ b/crates/buzz-acp/src/config.rs @@ -541,7 +541,7 @@ fn validate_multiple_event_handling( Ok(()) } -fn normalize_agent_command_identity(command: &str) -> String { +pub(crate) fn normalize_agent_command_identity(command: &str) -> String { let normalized = command.trim().replace('\\', "/"); let trimmed = normalized.trim_end_matches('/'); let basename = trimmed diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index be873bdd0..4a7fdc851 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -8,6 +8,9 @@ mod observer; mod pool; mod queue; mod relay; +mod usage; + +pub use usage::TurnUsage; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -1402,6 +1405,7 @@ async fn tokio_main() -> Result<()> { .as_deref() .and_then(|hex| nostr::PublicKey::from_hex(hex).ok()), memory_enabled: config.memory_enabled, + harness_name: crate::config::normalize_agent_command_identity(&config.agent_command), }); if !config.memory_enabled { diff --git a/crates/buzz-acp/src/pool.rs b/crates/buzz-acp/src/pool.rs index 7cb7e36e7..0d3c2fd49 100644 --- a/crates/buzz-acp/src/pool.rs +++ b/crates/buzz-acp/src/pool.rs @@ -391,6 +391,9 @@ pub struct PromptContext { /// `[Agent Memory — core]` section. On by default; disabled via /// `--no-memory` / `BUZZ_ACP_NO_MEMORY`. pub memory_enabled: bool, + /// Harness identity string for NIP-AM `harness` field. Derived from the + /// configured `agent_command` at startup (e.g. `"goose"`, `"buzz-agent"`). 
+ pub harness_name: String, } impl AgentPool { @@ -1539,6 +1542,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1553,6 +1566,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1568,6 +1591,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1582,6 +1615,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1626,6 +1669,16 @@ pub async fn run_prompt_task( &source, &control_signal, ); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1676,6 +1729,18 @@ pub async fn run_prompt_task( agent.state.invalidate(&source); } + let core_stop = 
acp_stop_to_core(&stop_reason); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(core_stop), + ) + .await; + send_prompt_result( &result_tx, agent, @@ -1687,6 +1752,16 @@ pub async fn run_prompt_task( Err(AcpError::AgentExited) => { tracing::error!(target: "pool::prompt", "agent {} exited during prompt", agent.index); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1708,6 +1783,16 @@ pub async fn run_prompt_task( { Ok(stop_reason) => { log_stop_reason(&source, &stop_reason); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; // Timeout triggers respawn in handle_prompt_result — // session state will be discarded with the old agent. 
send_prompt_result( @@ -1725,6 +1810,16 @@ pub async fn run_prompt_task( agent.index ); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1739,6 +1834,16 @@ pub async fn run_prompt_task( "cancel_with_cleanup error: {e} — invalidating session" ); agent.state.invalidate(&source); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1756,6 +1861,16 @@ pub async fn run_prompt_task( ctx.max_turn_duration.as_secs() ); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1772,6 +1887,16 @@ pub async fn run_prompt_task( if !matches!(e, AcpError::AgentError(_)) { agent.state.invalidate(&source); } + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -2557,6 +2682,135 @@ impl Drop for TurnCompletionGuard { } } +/// Map an ACP `StopReason` to the NIP-AM `StopReason` used in kind 44200 payloads. 
+fn acp_stop_to_core(r: &StopReason) -> buzz_core::agent_turn_metric::StopReason { + use buzz_core::agent_turn_metric::StopReason as CoreStop; + match r { + StopReason::EndTurn => CoreStop::EndTurn, + StopReason::Cancelled => CoreStop::Cancelled, + StopReason::MaxTokens => CoreStop::MaxTokens, + StopReason::MaxTurnRequests => CoreStop::Unknown, + StopReason::Refusal => CoreStop::Unknown, + } +} + +/// Best-effort: build and publish a `kind:44200` NIP-AM agent turn metric event. +/// +/// Does nothing when `usage` is `None` (goose emitted no usage notification +/// for this turn) or when `owner_pubkey` is unconfigured (no NIP-AO identity). +/// Errors are logged at WARN and never surface to the caller — metric +/// publishing must never fail a turn. +async fn publish_agent_turn_metric( + ctx: &PromptContext, + usage: Option, + channel_id: Option, + session_id: &str, + turn_id: &str, + stop_reason: Option, +) { + use buzz_core::agent_turn_metric::{AgentTurnMetricPayload, TokenCounts}; + use nostr::{EventBuilder, Kind, Tag}; + + let (usage, owner_pk) = match (usage, ctx.agent_owner_pubkey.as_ref()) { + (Some(u), Some(pk)) => (u, pk), + _ => return, + }; + + let turn_counts = if usage.delta_reliable { + Some(TokenCounts { + input_tokens: usage.turn_input_tokens, + output_tokens: usage.turn_output_tokens, + total_tokens: None, + cost_usd: usage.turn_cost_usd, + cache_read_tokens: None, + cache_write_tokens: None, + }) + } else { + // Defense-in-depth: UsageTracker already sets all turn_* fields to None + // when delta_reliable is false, so the None arm here is technically + // redundant. The explicit guard prevents a future refactor from + // accidentally publishing unreliable per-turn counts. 
+ None + }; + let cumulative_counts = Some(TokenCounts { + input_tokens: Some(usage.cumulative_input_tokens), + output_tokens: Some(usage.cumulative_output_tokens), + total_tokens: None, + cost_usd: usage.cumulative_cost_usd, + cache_read_tokens: None, + cache_write_tokens: None, + }); + let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + let payload = AgentTurnMetricPayload { + harness: ctx.harness_name.clone(), + model: None, + channel_id: channel_id.map(|id| id.to_string()), + session_id: Some(usage.session_id.clone()), + turn_id: Some(turn_id.to_string()), + turn_seq: Some(usage.turn_seq), + timestamp, + turn: turn_counts, + cumulative: cumulative_counts, + delta_reliable: usage.delta_reliable, + stop_reason, + }; + let ciphertext = match buzz_core::agent_turn_metric::encrypt_agent_turn_metric( + &ctx.agent_keys, + owner_pk, + &payload, + ) { + Ok(c) => c, + Err(e) => { + tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: encrypt failed: {e}" + ); + return; + } + }; + let agent_hex = ctx.agent_keys.public_key().to_hex(); + let owner_hex = owner_pk.to_hex(); + let event = match EventBuilder::new( + Kind::Custom(buzz_core::kind::KIND_AGENT_TURN_METRIC as u16), + ciphertext, + ) + .tags([ + Tag::parse(["p", &owner_hex]).expect("p tag"), + Tag::parse(["agent", &agent_hex]).expect("agent tag"), + ]) + .sign_with_keys(&ctx.agent_keys) + { + Ok(e) => e, + Err(e) => { + tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: sign failed: {e}" + ); + return; + } + }; + const METRIC_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); + match tokio::time::timeout(METRIC_TIMEOUT, ctx.rest_client.submit_event(&event)).await { + Ok(Ok(_)) => {} + Ok(Err(e)) => tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: publish failed: {e}" + ), + Err(_) => tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: publish timed out" + 
), + } +} + const REACTION_SEEN: &str = "👀"; const REACTION_WORKING: &str = "💬"; @@ -3657,4 +3911,210 @@ mod tests { result.agent.acp.install_steer_rx(steer_rx); // Reaching here without a panic is the test. } + + // ── NIP-AM emit-hook unit tests ──────────────────────────────────────── + + /// `acp_stop_to_core` maps all ACP stop reasons to the correct NIP-AM + /// variants without panicking on any input. + #[test] + fn test_acp_stop_to_core_maps_all_variants() { + use buzz_core::agent_turn_metric::StopReason as CoreStop; + assert_eq!(acp_stop_to_core(&StopReason::EndTurn), CoreStop::EndTurn); + assert_eq!( + acp_stop_to_core(&StopReason::Cancelled), + CoreStop::Cancelled + ); + assert_eq!( + acp_stop_to_core(&StopReason::MaxTokens), + CoreStop::MaxTokens + ); + assert_eq!( + acp_stop_to_core(&StopReason::MaxTurnRequests), + CoreStop::Unknown + ); + assert_eq!(acp_stop_to_core(&StopReason::Refusal), CoreStop::Unknown); + } + + /// `publish_agent_turn_metric` is a no-op when `usage` is `None`. + #[tokio::test] + async fn test_publish_agent_turn_metric_noop_on_no_usage() { + let ctx = make_prompt_context_no_owner(); + // usage = None → early return, no panic. + publish_agent_turn_metric( + &ctx, + None, + None, + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// `publish_agent_turn_metric` is a no-op when `owner_pubkey` is absent. + #[tokio::test] + async fn test_publish_agent_turn_metric_noop_on_no_owner() { + let ctx = make_prompt_context_no_owner(); + let usage = crate::usage::TurnUsage { + session_id: "sess-1".to_string(), + turn_seq: 1, + delta_reliable: true, + turn_input_tokens: Some(100), + turn_output_tokens: Some(50), + turn_cost_usd: None, + cumulative_input_tokens: 100, + cumulative_output_tokens: 50, + cumulative_cost_usd: None, + }; + // owner_pubkey = None → early return, no panic. 
+ publish_agent_turn_metric( + &ctx, + Some(usage), + None, + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// `publish_agent_turn_metric` encrypts the payload when owner is present + /// (the HTTP submit will fail in tests, but we verify no panic and the + /// encrypt/sign path executes). + #[tokio::test] + async fn test_publish_agent_turn_metric_encrypts_with_owner() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + let usage = crate::usage::TurnUsage { + session_id: "sess-1".to_string(), + turn_seq: 1, + delta_reliable: true, + turn_input_tokens: Some(200), + turn_output_tokens: Some(80), + turn_cost_usd: Some(0.001), + cumulative_input_tokens: 200, + cumulative_output_tokens: 80, + cumulative_cost_usd: Some(0.001), + }; + // Will try to publish and fail (no real relay) but must not panic. + publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// Regression for the control-cancel drain: `publish_agent_turn_metric` + /// with a `Cancelled` stop reason and pending usage executes without panic + /// (encrypt+sign path). This mirrors the control-signal arm that previously + /// returned early without draining usage. 
+ #[tokio::test] + async fn test_publish_agent_turn_metric_cancelled_stop_reason() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + let usage = crate::usage::TurnUsage { + session_id: "sess-cancel".to_string(), + turn_seq: 2, + delta_reliable: true, + turn_input_tokens: Some(50), + turn_output_tokens: Some(20), + turn_cost_usd: None, + cumulative_input_tokens: 150, + cumulative_output_tokens: 70, + cumulative_cost_usd: None, + }; + // Must not panic; HTTP submit will fail (no real relay) — that's fine. + publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-cancel", + "turn-cancel", + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; + } + + /// `publish_agent_turn_metric` uses `ctx.harness_name` in the payload. + /// A buzz-agent-commanded context must not panic — verifies the harness + /// field flows through encrypt/sign without error. + #[tokio::test] + async fn test_publish_agent_turn_metric_buzz_agent_harness_name() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let mut ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + ctx.harness_name = "buzz-agent".to_string(); + let usage = crate::usage::TurnUsage { + session_id: "sess-ba".to_string(), + turn_seq: 1, + delta_reliable: false, // first turn from buzz-agent + turn_input_tokens: None, + turn_output_tokens: None, + turn_cost_usd: None, + cumulative_input_tokens: 400, + cumulative_output_tokens: 100, + cumulative_cost_usd: None, + }; + // Will try to publish (encrypt succeeds) and fail HTTP (no relay) — must not panic. 
+ publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-ba", + "turn-ba", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + fn make_prompt_context_no_owner() -> PromptContext { + let agent_keys = nostr::Keys::generate(); + make_prompt_context_impl(&agent_keys, None) + } + + fn make_prompt_context_with_owner( + agent_keys: &nostr::Keys, + owner_pubkey: nostr::PublicKey, + ) -> PromptContext { + make_prompt_context_impl(agent_keys, Some(owner_pubkey)) + } + + fn make_prompt_context_impl( + agent_keys: &nostr::Keys, + owner_pubkey: Option, + ) -> PromptContext { + use crate::relay::RestClient; + PromptContext { + mcp_servers: vec![], + initial_message: None, + idle_timeout: Duration::from_secs(60), + max_turn_duration: Duration::from_secs(120), + turn_liveness_interval: Duration::ZERO, + dedup_mode: DedupMode::Drop, + system_prompt: None, + heartbeat_prompt: None, + base_prompt: None, + cwd: ".".to_string(), + rest_client: RestClient { + http: reqwest::Client::new(), + base_url: "http://127.0.0.1:0".to_string(), + keys: agent_keys.clone(), + auth_tag_json: None, + }, + channel_info: std::collections::HashMap::new(), + context_message_limit: 0, + max_turns_per_session: 0, + permission_mode: PermissionMode::Default, + agent_keys: agent_keys.clone(), + agent_owner_pubkey: owner_pubkey, + memory_enabled: false, + harness_name: "goose".to_string(), + } + } } diff --git a/crates/buzz-acp/src/usage.rs b/crates/buzz-acp/src/usage.rs new file mode 100644 index 000000000..f581d9120 --- /dev/null +++ b/crates/buzz-acp/src/usage.rs @@ -0,0 +1,815 @@ +//! Usage tracking for NIP-AM agent turn metrics. +//! +//! Agents that support usage reporting emit a `_goose/unstable/session/update` +//! notification (with `sessionUpdate: "usage_update"`) at the end of every +//! turn. Both goose and buzz-agent use this same wire format. The payload +//! carries session-cumulative token counts from which we derive per-turn +//! 
deltas. +//! +//! # Delta computation +//! +//! Because goose only reports cumulative counters, the per-turn counts are +//! computed as `current − previous`. Three cases require special handling per +//! NIP-AM: +//! +//! 1. **First turn (no prior baseline):** delta unknown → `null` counts, +//! `delta_reliable: false`. +//! 2. **Counter decrease** (harness restart, overflow): delta would be +//! negative → `null` counts, `delta_reliable: false`. +//! 3. **Session restart** (caller supplies a new `session_id` not seen +//! before): treated as case 1 — fresh baseline, no delta for this turn. +//! +//! Goose may emit **multiple** `usage_update` notifications per turn. The +//! tracker handles this correctly: the committed baseline (and `turn_seq`) +//! advance only when `take()` is called (i.e. at publish time), never on +//! individual notifications. Within a turn all notifications measure their +//! delta from the same frozen baseline — the end of the previous published +//! turn — so the final `pending` record always reflects the full +//! previous-published→current-final delta regardless of how many +//! intermediate notifications arrived. +//! +//! The `TurnUsage` produced after each turn is consumed by the +//! `TurnCompletionGuard` in `pool.rs` to publish a kind 44200 relay event. + +use std::collections::HashMap; + +/// Wire-format deserialization for `_goose/unstable/session/update` params. +/// +/// Method: `_goose/unstable/session/update` +/// Shape (camelCase on the wire): +/// ```json +/// { +/// "sessionId": "...", +/// "update": { +/// "sessionUpdate": "usage_update", +/// "used": 12345, +/// "contextLimit": 200000, +/// "accumulatedInputTokens": 10000, +/// "accumulatedOutputTokens": 2345, +/// "accumulatedCost": 0.0234 +/// } +/// } +/// ``` +/// +/// `used` and `contextLimit` are optional because buzz-agent does not track a +/// context window limit; the fields are present when goose emits them. 
+#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct GooseSessionUpdateNotification { + pub session_id: String, + pub update: GooseSessionUpdateVariant, +} + +/// Discriminated union matching goose's `GooseSessionUpdate` enum on the wire. +/// We only care about `usage_update`; other variants are ignored. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(tag = "sessionUpdate", rename_all = "snake_case")] +pub(crate) enum GooseSessionUpdateVariant { + UsageUpdate(UsageUpdatePayload), + #[serde(other)] + Other, +} + +/// The `usage_update` payload. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct UsageUpdatePayload { + /// Total tokens used (context-usage proxy). Optional — buzz-agent omits + /// this field or sends 0 because it does not track a context window limit. + #[serde(default)] + #[allow(dead_code)] + pub used: u64, + /// Context window size. Optional — buzz-agent omits this field. + #[serde(default)] + #[allow(dead_code)] + pub context_limit: u64, + pub accumulated_input_tokens: u64, + pub accumulated_output_tokens: u64, + pub accumulated_cost: Option, +} + +/// Per-session normalization state: the last cumulative snapshot we saw. +#[derive(Debug, Clone)] +struct SessionState { + /// Per-session turn counter for the LAST PUBLISHED metric (1-based). + /// Advanced only when `take()` drains a pending record — not on every + /// `record()` call. This ensures `turnSeq` counts published metrics, not + /// usage-update notifications. + published_seq: u64, + /// Cumulative input tokens at the end of the LAST PUBLISHED turn. + /// Advanced only on publish (i.e. in `take()`), not on every notification. + last_input: u64, + /// Cumulative output tokens at the end of the LAST PUBLISHED turn. + last_output: u64, + /// Cumulative cost at the end of the LAST PUBLISHED turn. 
+ last_cost: Option, +} + +/// Per-turn usage record exposed to `TurnCompletionGuard` for NIP-AM publishing. +/// +/// `turn_*` fields are `None` when delta is unreliable (first turn or counter +/// decrease). `cumulative_*` fields are always present when the agent reports them. +#[derive(Debug, Clone)] +pub struct TurnUsage { + /// Goose session id (maps to NIP-AM `sessionId`). + pub session_id: String, + /// Per-session monotonic sequence number for this turn (maps to NIP-AM `turnSeq`). + pub turn_seq: u64, + /// Whether the `turn_*` delta fields are reliable. + pub delta_reliable: bool, + /// Per-turn input token delta; `None` when unreliable. + pub turn_input_tokens: Option, + /// Per-turn output token delta; `None` when unreliable. + pub turn_output_tokens: Option, + /// Per-turn cost delta (`current − previous`); `None` when unreliable or + /// either snapshot is missing. + pub turn_cost_usd: Option, + /// Session-cumulative input tokens as reported by goose at end of turn. + pub cumulative_input_tokens: u64, + /// Session-cumulative output tokens as reported by goose at end of turn. + pub cumulative_output_tokens: u64, + /// Session-cumulative estimated cost in USD; `None` if goose did not report it. + pub cumulative_cost_usd: Option, +} + +/// Tracks per-session cumulative usage state across turns. +/// +/// Cheap to construct. Usage lifecycle per turn: +/// +/// 1. **`begin_turn(session_id)`** — call this immediately before sending +/// `session/prompt`. Marks the tracker as in-flight for the given session +/// and clears any leftover pending record from a previous turn. Setup +/// notifications that arrive *before* the first `begin_turn` (e.g. during +/// `session/new` setup) will still update the cumulative baseline but will +/// NOT produce a publishable record. +/// 2. **`record(session_id, payload)`** — called for each +/// `_goose/unstable/session/update` notification. 
When in-flight, updates +/// `pending` with the latest cumulative values and a delta measured from +/// the committed baseline (end of the previous published turn). Multiple +/// notifications per turn are fine — the last one wins and `turn_seq` stays +/// constant within the turn. When not in-flight, advances the committed +/// baseline so the next turn can compute a correct delta. +/// 3. **`take()`** — called at turn completion by `TurnCompletionGuard`. +/// Drains and returns the pending record (or `None` if no usage was emitted +/// for this turn), clears the in-flight marker, and advances the committed +/// baseline so the next `record()` call measures from here. +#[derive(Debug, Default)] +pub(crate) struct UsageTracker { + /// One entry per goose `sessionId` ever seen in this process. + sessions: HashMap, + /// The session that currently has an in-flight `session/prompt`. + /// `None` means no prompt is in flight; `record()` will still update + /// the baseline but will not set `pending`. + in_flight_session: Option, + /// The most recently computed turn usage, ready for `take()`. + pending: Option, +} + +impl UsageTracker { + /// Mark the start of a new prompt turn for `session_id`. + /// + /// Clears any leftover `pending` record and records which session is + /// in-flight. Must be called before the corresponding `session/prompt` + /// request is sent so that setup notifications received before this call + /// do not become publishable for this turn. + pub(crate) fn begin_turn(&mut self, session_id: &str) { + self.in_flight_session = Some(session_id.to_string()); + self.pending = None; + } + + /// Process a `usage_update` notification payload. + /// + /// Behavior depends on which session (if any) is currently in-flight; see + /// the three explicit cases below. Only a notification for the in-flight + /// session produces a publishable `pending` record. A notification that + /// arrives outside any turn (e.g. 
during `session/new` setup) advances the + /// committed baseline so the next in-flight turn computes a correct delta. + /// A notification for a *different* in-flight session is ignored entirely. + /// + /// When multiple notifications arrive during the same turn, the **last one + /// wins** on the cumulative totals, and the delta is always measured from + /// the baseline at the end of the **previous published turn** — not from an + /// intermediate notification within the current turn. `turn_seq` stays + /// constant across all notifications within one turn and only increments + /// when a record is actually published (i.e. when `take()` is called). + /// + /// Three cases: + /// 1. **In-flight-match** (`in_flight_session == Some(session_id)`): updates + /// `pending`. Baseline NOT advanced (that happens on `take()`). + /// 2. **Not in-flight at all** (`in_flight_session == None`): advances the + /// committed baseline (setup notification path). + /// 3. **In-flight for another session** (`in_flight_session == Some(other)`): + /// ignored entirely — touching this session's baseline while another is + /// in-flight would undercount this session's next published delta. + pub(crate) fn record(&mut self, session_id: &str, payload: &UsageUpdatePayload) { + let current_input = payload.accumulated_input_tokens; + let current_output = payload.accumulated_output_tokens; + let current_cost = payload.accumulated_cost; + + // Determine whether this session is currently in-flight so we know + // whether to set `pending`. We compute the delta regardless so that + // setup notifications (no in-flight turn) still advance the baseline. + let is_in_flight = self.in_flight_session.as_deref() == Some(session_id); + + let (delta_reliable, turn_input, turn_output, turn_cost, turn_seq) = + match self.sessions.get(session_id) { + None => { + // First notification for this session — no baseline yet. 
+ (false, None, None, None, 1u64) + } + Some(prev) => { + // turn_seq for this pending record is one above the last + // *published* seq — constant for all notifications in this + // turn, advanced only on publish. + let seq = prev.published_seq + 1; + // Token counter decrease → unreliable delta. + if current_input < prev.last_input || current_output < prev.last_output { + (false, None, None, None, seq) + } else { + let di = current_input - prev.last_input; + let dout = current_output - prev.last_output; + // Cost delta: only when both snapshots have cost. + // A cost *decrease* is also unreliable (NIP-AM: negative + // delta ⇒ delta_reliable false, null all turn fields). + let (dc, cost_reliable) = match (current_cost, prev.last_cost) { + (Some(c), Some(p)) if c >= p => (Some(c - p), true), + (Some(_), Some(_)) => { + // Both present but current < prev — counter decreased. + (None, false) + } + _ => (None, true), // absent on either side: null cost, reliable tokens + }; + if cost_reliable { + (true, Some(di), Some(dout), dc, seq) + } else { + // Cost decrease overrides the whole record to unreliable. + (false, None, None, None, seq) + } + } + } + }; + + if is_in_flight { + // In-flight-match: update pending with the latest cumulative values. + // Baseline is NOT advanced here — it advances only on take(). + self.pending = Some(TurnUsage { + session_id: session_id.to_string(), + turn_seq, + delta_reliable, + turn_input_tokens: turn_input, + turn_output_tokens: turn_output, + turn_cost_usd: turn_cost, + cumulative_input_tokens: current_input, + cumulative_output_tokens: current_output, + cumulative_cost_usd: current_cost, + }); + } else if self.in_flight_session.is_none() { + // Not in-flight at all: advance the committed baseline so the next + // in-flight turn computes its delta from this notification. + // This handles setup notifications that fire during `session/new` + // before the first `begin_turn`. 
+ self.sessions.insert( + session_id.to_string(), + SessionState { + published_seq: match self.sessions.get(session_id) { + Some(s) => s.published_seq, + None => 0, + }, + last_input: current_input, + last_output: current_output, + last_cost: current_cost, + }, + ); + } + // else: in-flight-for-another-session — ignore. A late notification + // for session X while session Y is in-flight must NOT advance X's + // committed baseline; doing so would undercount X's next published delta. + } + + /// Consume and return the most recently computed turn usage record, then + /// clear the in-flight marker and advance the committed baseline. + /// + /// Returns `None` if no `usage_update` arrived during the current in-flight + /// turn (the agent did not emit usage, or no `begin_turn` was called). The + /// caller (`TurnCompletionGuard`) must handle `None`. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn take(&mut self) -> Option { + self.in_flight_session = None; + let record = self.pending.take()?; + // Advance the committed baseline to this published record so the + // *next* turn measures its delta from here. 
+ self.sessions.insert( + record.session_id.clone(), + SessionState { + published_seq: record.turn_seq, + last_input: record.cumulative_input_tokens, + last_output: record.cumulative_output_tokens, + last_cost: record.cumulative_cost_usd, + }, + ); + Some(record) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn payload(input: u64, output: u64, cost: Option) -> UsageUpdatePayload { + UsageUpdatePayload { + used: input + output, + context_limit: 200_000, + accumulated_input_tokens: input, + accumulated_output_tokens: output, + accumulated_cost: cost, + } + } + + fn payload_no_context(input: u64, output: u64, cost: Option) -> UsageUpdatePayload { + UsageUpdatePayload { + used: 0, + context_limit: 0, + accumulated_input_tokens: input, + accumulated_output_tokens: output, + accumulated_cost: cost, + } + } + + // ── Turn scoping: setup notifications must not pollute the first real turn ─ + + #[test] + fn setup_notification_before_begin_turn_returns_none() { + // Regression: setup notifications fire during session/new (before any + // prompt). They must update the baseline but must NOT produce a + // publishable record for the next turn. + let mut tracker = UsageTracker::default(); + + // Simulate a setup notification (no begin_turn called yet). + tracker.record("sess-setup", &payload(500, 100, Some(0.005))); + // No turn is in-flight — pending must stay None. + assert!( + tracker.pending.is_none(), + "setup notification must not set pending before begin_turn" + ); + + // The zero-update turn: begin_turn, no notification during prompt, take. + tracker.begin_turn("sess-setup"); + let result = tracker.take(); + assert!( + result.is_none(), + "zero-update turn after setup must return None" + ); + + // Baseline was still updated: the next real turn gets a correct delta. 
+ tracker.begin_turn("sess-setup"); + tracker.record("sess-setup", &payload(1200, 300, Some(0.012))); + let usage = tracker.take().expect("second turn must have usage"); + + assert!( + usage.delta_reliable, + "baseline fed by setup: delta reliable" + ); + assert_eq!(usage.turn_input_tokens, Some(700)); // 1200 - 500 + assert_eq!(usage.turn_output_tokens, Some(200)); // 300 - 100 + let dc = usage.turn_cost_usd.expect("cost delta present"); + assert!((dc - 0.007).abs() < 1e-9, "cost delta: {dc}"); + } + + #[test] + fn record_outside_in_flight_does_not_clobber_pending() { + // A notification for a different session_id while another is in-flight + // must not overwrite the pending record. + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(1000, 200, None)); + + // Notification for a different session — should not touch pending. + tracker.record("sess-b", &payload(9000, 3000, None)); + + let usage = tracker.take().expect("sess-a pending must survive"); + assert_eq!(usage.session_id, "sess-a"); + } + + #[test] + fn cross_session_notification_does_not_corrupt_other_sessions_delta() { + // Regression: A publishes at 1000/100 (turn 1). A late A notification at + // 1500/150 arrives while session B is in-flight. Under the old `else` + // branch this would advance A's committed baseline to 1500/150 without + // publishing a metric, so A's next turn (2000/250) would see a delta of + // only 500/100 instead of the correct 1000/150. + // + // With the fixed three-way branch, the cross-session notification is + // ignored entirely and A's baseline stays at its last published state. 
+ let mut tracker = UsageTracker::default(); + + // ── Turn A1 — establish A's committed baseline at 1000/100, seq=1 ── + tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(1000, 100, None)); + let a1 = tracker.take().expect("A turn 1"); + assert_eq!(a1.turn_seq, 1); + assert!(!a1.delta_reliable, "first turn is unreliable"); + assert_eq!(a1.cumulative_input_tokens, 1000); + + // ── B is now in-flight; A late notification arrives ── + tracker.begin_turn("sess-b"); + // Late A notification while B is in-flight — must NOT advance A's baseline. + tracker.record("sess-a", &payload(1500, 150, None)); + // B gets its own notification and completes. + tracker.record("sess-b", &payload(200, 50, None)); + let b1 = tracker.take().expect("B turn 1"); + assert_eq!(b1.session_id, "sess-b"); + + // ── Turn A2 — delta must be measured from A's last PUBLISHED baseline ── + // If the cross-session fix is correct: committed A baseline = 1000/100 + // (from take() after A turn 1), so delta = 2000-1000 = 1000 / 250-100 = 150. + // If broken (old code): committed A baseline = 1500/150 (wrongly advanced), + // so delta = 500/100 — the undercount Eva+Wren and Thufir both flagged. 
+ tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(2000, 250, None)); + let a2 = tracker.take().expect("A turn 2"); + + assert_eq!(a2.session_id, "sess-a"); + assert_eq!( + a2.turn_seq, 2, + "seq must increment per publish, not per notification" + ); + assert!(a2.delta_reliable, "A turn 2 must have a reliable delta"); + assert_eq!( + a2.turn_input_tokens, + Some(1000), + "A turn 2 delta must be from A's last published baseline (1000), not the \ + late cross-session advance (500)" + ); + assert_eq!(a2.turn_output_tokens, Some(150)); + assert_eq!(a2.cumulative_input_tokens, 2000); + assert_eq!(a2.cumulative_output_tokens, 250); + } + + // ── Delta computation: non-happy paths ───────────────────────────────── + + #[test] + fn first_turn_no_prior_delta_unreliable() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-1"); + tracker.record("sess-1", &payload(1000, 200, Some(0.01))); + let usage = tracker.take().expect("should have pending usage"); + + assert_eq!(usage.session_id, "sess-1"); + assert_eq!(usage.turn_seq, 1); + assert!( + !usage.delta_reliable, + "first turn: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none()); + assert!(usage.turn_output_tokens.is_none()); + assert!(usage.turn_cost_usd.is_none()); + // Cumulative is still populated. + assert_eq!(usage.cumulative_input_tokens, 1000); + assert_eq!(usage.cumulative_output_tokens, 200); + assert_eq!(usage.cumulative_cost_usd, Some(0.01)); + } + + #[test] + fn counter_decrease_delta_unreliable_no_negatives() { + let mut tracker = UsageTracker::default(); + // Turn 1 — establish baseline. + tracker.begin_turn("sess-2"); + tracker.record("sess-2", &payload(5000, 1000, Some(0.05))); + let _ = tracker.take(); + + // Turn 2 — counter decreased (harness restart simulation). 
+ tracker.begin_turn("sess-2"); + tracker.record("sess-2", &payload(100, 50, Some(0.001))); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.turn_seq, 2); + assert!( + !usage.delta_reliable, + "counter decrease: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none(), "no negative delta"); + assert!(usage.turn_output_tokens.is_none(), "no negative delta"); + assert!(usage.turn_cost_usd.is_none()); + } + + #[test] + fn cost_decrease_sets_delta_unreliable_and_nulls_all_turn_fields() { + // Regression for Thufir fix 2: cost counter decrease must set + // delta_reliable = false and null all turn fields (not just cost). + // turn_seq still increments (NIP-AM: seq advances even on unreliable). + let mut tracker = UsageTracker::default(); + // Turn 1 — establish baseline with cost. + tracker.begin_turn("sess-cost"); + tracker.record("sess-cost", &payload(1000, 200, Some(0.10))); + let t1 = tracker.take().expect("t1"); + assert_eq!(t1.turn_seq, 1); + + // Turn 2 — tokens monotone, but cost decreased. + tracker.begin_turn("sess-cost"); + tracker.record("sess-cost", &payload(1500, 350, Some(0.05))); + let usage = tracker.take().expect("t2"); + + assert_eq!(usage.turn_seq, 2, "turn_seq must still increment"); + assert!( + !usage.delta_reliable, + "cost decrease: delta must be unreliable" + ); + assert!( + usage.turn_input_tokens.is_none(), + "all turn fields null on unreliable" + ); + assert!(usage.turn_output_tokens.is_none()); + assert!(usage.turn_cost_usd.is_none()); + // Cumulative values are unaffected. + assert_eq!(usage.cumulative_input_tokens, 1500); + assert_eq!(usage.cumulative_output_tokens, 350); + assert_eq!(usage.cumulative_cost_usd, Some(0.05)); + } + + #[test] + fn cost_absent_on_one_side_leaves_tokens_reliable() { + // Cost merely absent on either side: null cost, reliable tokens. 
+ let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-nocost"); + tracker.record("sess-nocost", &payload(1000, 200, Some(0.01))); + let _ = tracker.take(); + + // Turn 2 — no cost reported this time. + tracker.begin_turn("sess-nocost"); + tracker.record("sess-nocost", &payload(1800, 450, None)); + let usage = tracker.take().expect("pending"); + + assert!( + usage.delta_reliable, + "absent cost must not make delta unreliable" + ); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + assert!( + usage.turn_cost_usd.is_none(), + "cost null when absent on either side" + ); + } + + #[test] + fn session_restart_new_session_id_treated_as_first_turn() { + let mut tracker = UsageTracker::default(); + // Original session. + tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(8000, 2000, None)); + let _ = tracker.take(); + + // New session_id — restart. Must behave like a first turn. + tracker.begin_turn("sess-b"); + tracker.record("sess-b", &payload(500, 100, None)); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.session_id, "sess-b"); + assert_eq!(usage.turn_seq, 1); + assert!( + !usage.delta_reliable, + "new session: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none()); + } + + // ── Happy path ───────────────────────────────────────────────────────── + + #[test] + fn second_turn_delta_computed_correctly() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-3"); + tracker.record("sess-3", &payload(1000, 200, Some(0.01))); + let _ = tracker.take(); + + tracker.begin_turn("sess-3"); + tracker.record("sess-3", &payload(1800, 450, Some(0.018))); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.turn_seq, 2); + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + // cost delta: 0.018 - 0.01 = 0.008 (floating-point; use approx check) + 
let dc = usage.turn_cost_usd.expect("cost delta present"); + assert!((dc - 0.008).abs() < 1e-9, "cost delta: {dc}"); + assert_eq!(usage.cumulative_input_tokens, 1800); + assert_eq!(usage.cumulative_output_tokens, 450); + } + + #[test] + fn take_returns_none_after_drain() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-4"); + tracker.record("sess-4", &payload(100, 20, None)); + let _ = tracker.take(); + assert!(tracker.take().is_none(), "take after drain must be None"); + } + + #[test] + fn last_update_wins_multiple_updates_same_turn() { + // Goose emits multiple usage_update notifications per turn. The tracker + // must: + // (a) use the LAST notification's cumulative values, + // (b) measure the delta from the baseline at the END OF THE PREVIOUS + // PUBLISHED TURN (not from intermediate notifications), and + // (c) keep turn_seq constant across all notifications within the turn + // (incrementing only on publish, not on each notification). + let mut tracker = UsageTracker::default(); + // Turn 1 — establish baseline. After take(), committed baseline = 1000/100. + tracker.begin_turn("sess-5"); + tracker.record("sess-5", &payload(1000, 100, None)); + let t1 = tracker.take().expect("turn 1"); + assert_eq!(t1.turn_seq, 1); + + // Turn 2 — two notifications arrive before take(). The second overwrites + // the first in pending; delta is measured from the committed baseline + // (1000/100), not from the intermediate snapshot (1500/150). + tracker.begin_turn("sess-5"); + tracker.record("sess-5", &payload(1500, 150, None)); + tracker.record("sess-5", &payload(2000, 250, None)); + let usage = tracker.take().expect("turn 2"); + + // Cumulative from the last notification. + assert_eq!(usage.cumulative_input_tokens, 2000); + assert_eq!(usage.cumulative_output_tokens, 250); + // Delta is from committed baseline (1000, 100) → (2000, 250) = 1000/150. 
+ assert_eq!(usage.turn_input_tokens, Some(1000)); + assert_eq!(usage.turn_output_tokens, Some(150)); + // seq increments once per publish, not once per notification. + assert_eq!(usage.turn_seq, 2); + + // Turn 3 — prove seq continues to increment per publish, not per notification. + tracker.begin_turn("sess-5"); + tracker.record("sess-5", &payload(2300, 290, None)); + let t3 = tracker.take().expect("turn 3"); + assert_eq!(t3.turn_seq, 3); + // Delta from turn-2 committed baseline (2000, 250). + assert_eq!(t3.turn_input_tokens, Some(300)); + assert_eq!(t3.turn_output_tokens, Some(40)); + } + + // ── Wire deserialization ──────────────────────────────────────────────── + + #[test] + fn notification_deserializes_from_wire_json() { + let raw = serde_json::json!({ + "sessionId": "abc-123", + "update": { + "sessionUpdate": "usage_update", + "used": 50000, + "contextLimit": 200000, + "accumulatedInputTokens": 40000, + "accumulatedOutputTokens": 10000, + "accumulatedCost": 0.42 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + assert_eq!(notif.session_id, "abc-123"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert_eq!(p.accumulated_input_tokens, 40000); + assert_eq!(p.accumulated_output_tokens, 10000); + assert_eq!(p.accumulated_cost, Some(0.42)); + } + GooseSessionUpdateVariant::Other => panic!("expected UsageUpdate"), + } + } + + #[test] + fn notification_deserializes_without_used_and_context_limit() { + // buzz-agent emits usage_update without used/contextLimit. 
+ let raw = serde_json::json!({ + "sessionId": "buzz-sess", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 500, + "accumulatedOutputTokens": 100 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert_eq!(p.accumulated_input_tokens, 500); + assert_eq!(p.accumulated_output_tokens, 100); + assert_eq!(p.used, 0); + assert_eq!(p.context_limit, 0); + assert!(p.accumulated_cost.is_none()); + } + GooseSessionUpdateVariant::Other => panic!("expected UsageUpdate"), + } + } + + #[test] + fn other_variant_deserializes_without_error() { + let raw = serde_json::json!({ + "sessionId": "xyz", + "update": { + "sessionUpdate": "status_message", + "status": { "type": "notice", "message": "hi" } + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + assert!(matches!(notif.update, GooseSessionUpdateVariant::Other)); + } + + #[test] + fn missing_accumulated_cost_is_none() { + let raw = serde_json::json!({ + "sessionId": "s", + "update": { + "sessionUpdate": "usage_update", + "used": 100, + "contextLimit": 200000, + "accumulatedInputTokens": 80, + "accumulatedOutputTokens": 20 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert!(p.accumulated_cost.is_none()); + } + _ => panic!("expected UsageUpdate"), + } + } + + #[test] + fn buzz_agent_notification_flows_through_tracker() { + // End-to-end: a buzz-agent-shaped usage_update (no used/contextLimit) + // deserializes and flows through UsageTracker to produce correct TurnUsage. 
+ let raw1 = serde_json::json!({ + "sessionId": "buzz-s1", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 300, + "accumulatedOutputTokens": 80 + } + }); + let raw2 = serde_json::json!({ + "sessionId": "buzz-s1", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 700, + "accumulatedOutputTokens": 150 + } + }); + + let mut tracker = UsageTracker::default(); + + // Turn 1 — first turn, delta unreliable. + tracker.begin_turn("buzz-s1"); + let notif1: GooseSessionUpdateNotification = serde_json::from_value(raw1).expect("deser"); + if let GooseSessionUpdateVariant::UsageUpdate(p) = notif1.update { + tracker.record("buzz-s1", &p); + } + let t1 = tracker.take().expect("turn 1"); + assert!(!t1.delta_reliable, "first turn: unreliable"); + assert_eq!(t1.cumulative_input_tokens, 300); + + // Turn 2 — delta reliable. + tracker.begin_turn("buzz-s1"); + let notif2: GooseSessionUpdateNotification = serde_json::from_value(raw2).expect("deser"); + if let GooseSessionUpdateVariant::UsageUpdate(p) = notif2.update { + tracker.record("buzz-s1", &p); + } + let t2 = tracker.take().expect("turn 2"); + assert!(t2.delta_reliable, "second turn: reliable"); + assert_eq!(t2.turn_input_tokens, Some(400)); // 700 - 300 + assert_eq!(t2.turn_output_tokens, Some(70)); // 150 - 80 + } + + #[test] + fn buzz_agent_payload_no_context_fields_processes_correctly() { + // UsageTracker handles payloads with used=0 / context_limit=0 correctly. 
+ let mut tracker = UsageTracker::default(); + tracker.begin_turn("s"); + tracker.record("s", &payload_no_context(1000, 200, None)); + let _ = tracker.take(); + + tracker.begin_turn("s"); + tracker.record("s", &payload_no_context(1500, 300, None)); + let usage = tracker.take().expect("pending"); + + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(500)); + assert_eq!(usage.turn_output_tokens, Some(100)); + } + + #[test] + fn begin_turn_then_take_without_record_returns_none() { + // A turn cancelled before the provider emits any tokens: begin_turn is + // called but no record() arrives before take(). take() must return None. + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-precancel"); + let result = tracker.take(); + assert!( + result.is_none(), + "take() without any record() must return None (pre-response cancel path)" + ); + } +} diff --git a/crates/buzz-agent/Cargo.toml b/crates/buzz-agent/Cargo.toml index 7889ad34a..720f0785a 100644 --- a/crates/buzz-agent/Cargo.toml +++ b/crates/buzz-agent/Cargo.toml @@ -43,6 +43,9 @@ hex = { workspace = true } sha2 = { workspace = true } urlencoding = "2" webbrowser = "1" +nostr = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { version = "0.31", default-features = false, features = ["signal", "process"] } diff --git a/crates/buzz-agent/src/agent.rs b/crates/buzz-agent/src/agent.rs index 57e7b87b3..730e87b2e 100644 --- a/crates/buzz-agent/src/agent.rs +++ b/crates/buzz-agent/src/agent.rs @@ -54,6 +54,12 @@ pub struct RunCtx<'a> { /// which the exact-but-stale token count would otherwise miss. Cleared and /// preserved in lockstep with `last_request_input_tokens`. pub last_request_history_bytes: &'a mut Option, + /// Accumulated input tokens across all LLM rounds in this turn, for + /// NIP-AM metric publishing. Reset to `None` at turn start in `run()`. 
+ pub turn_input_tokens: &'a mut Option, + /// Accumulated output tokens across all LLM rounds in this turn, for + /// NIP-AM metric publishing. Reset to `None` at turn start in `run()`. + pub turn_output_tokens: &'a mut Option, } impl RunCtx<'_> { @@ -69,6 +75,10 @@ impl RunCtx<'_> { } self.history.push(HistoryItem::User(user_text)); + // Reset per-turn token accumulators for this prompt. + *self.turn_input_tokens = None; + *self.turn_output_tokens = None; + let mut round = 0u32; // Per-prompt `_Stop` objection count. Bounded per prompt (not per // session) so a stubborn exchange can't permanently disable the stop @@ -156,6 +166,14 @@ impl RunCtx<'_> { .map(HistoryItem::context_pressure_bytes) .sum(), ); + // Accumulate per-turn input tokens for NIP-AM metric publishing. + *self.turn_input_tokens = + Some(self.turn_input_tokens.unwrap_or(0).saturating_add(tokens)); + } + // Accumulate per-turn output tokens for NIP-AM metric publishing. + if let Some(out) = response.output_tokens { + *self.turn_output_tokens = + Some(self.turn_output_tokens.unwrap_or(0).saturating_add(out)); } if !response.reasoning.is_empty() { diff --git a/crates/buzz-agent/src/lib.rs b/crates/buzz-agent/src/lib.rs index 8de6a16e1..58d6bdfcc 100644 --- a/crates/buzz-agent/src/lib.rs +++ b/crates/buzz-agent/src/lib.rs @@ -30,9 +30,9 @@ use crate::llm::Llm; use crate::mcp::McpRegistry; use crate::types::{ContentBlock, HistoryItem}; use crate::wire::{ - classify, Inbound, InitializeParams, SessionCancelParams, SessionNewParams, - SessionPromptParams, SessionSetModelParams, SessionSteerParams, WireMsg, WireSender, - INVALID_PARAMS, METHOD_NOT_FOUND, PARSE_ERROR, + classify, goose_session_update, Inbound, InitializeParams, SessionCancelParams, + SessionNewParams, SessionPromptParams, SessionSetModelParams, SessionSteerParams, WireMsg, + WireSender, INVALID_PARAMS, METHOD_NOT_FOUND, PARSE_ERROR, }; struct App { @@ -76,6 +76,12 @@ struct Session { /// with it so the gate can account for history 
appended since. last_request_history_bytes: Option, effective_system_prompt: Arc, + /// Session-cumulative input tokens across all turns. Sent in the + /// `_goose/unstable/session/update` usage notification so buzz-acp's + /// `UsageTracker` can compute per-turn deltas symmetrically with goose. + accumulated_input_tokens: u64, + /// Session-cumulative output tokens across all turns. + accumulated_output_tokens: u64, /// Per-session model override set by `session/set_model`. When `Some`, /// overrides `App::cfg.model` for all LLM calls on this session. Persists /// across `session/prompt` calls until changed. @@ -403,6 +409,8 @@ async fn session_new(app: &Arc, id: Value, params: Value, wire_tx: &WireSen last_request_input_tokens: None, last_request_history_bytes: None, effective_system_prompt, + accumulated_input_tokens: 0, + accumulated_output_tokens: 0, effective_model: None, }, ); @@ -644,6 +652,8 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender ), ) .await; + let mut turn_input_tokens: Option = None; + let mut turn_output_tokens: Option = None; // Resolve effective model: session override wins over config default. 
let effective_model_str = effective_model_override .as_deref() @@ -664,6 +674,8 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender handoff_count: &mut handoff_count, last_request_input_tokens: &mut last_request_input_tokens, last_request_history_bytes: &mut last_request_history_bytes, + turn_input_tokens: &mut turn_input_tokens, + turn_output_tokens: &mut turn_output_tokens, }; let result = ctx.run(p.prompt).await; if let Some(s) = app.sessions.lock().await.get_mut(&sid) { @@ -677,6 +689,51 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender s.last_request_input_tokens = last_request_input_tokens; s.last_request_history_bytes = last_request_history_bytes; } + // Update session-cumulative token counters and emit the usage notification + // BEFORE sending the session/prompt response. buzz-acp's UsageTracker + // processes the notification while the turn is still in-flight (i.e. before + // the response triggers take_turn_usage()), which is required for the + // begin_turn gate to recognise it as publishable. + // + // Only emit when at least one token count was observed — a turn with no + // provider response (validation failure, pre-response cancellation) carries + // no information and must not produce a kind 44200 record per NIP-AM. + if turn_input_tokens.is_some() || turn_output_tokens.is_some() { + let accumulated = { + let mut sessions = app.sessions.lock().await; + if let Some(s) = sessions.get_mut(&sid) { + s.accumulated_input_tokens = s + .accumulated_input_tokens + .saturating_add(turn_input_tokens.unwrap_or(0)); + s.accumulated_output_tokens = s + .accumulated_output_tokens + .saturating_add(turn_output_tokens.unwrap_or(0)); + Some((s.accumulated_input_tokens, s.accumulated_output_tokens)) + } else { + // Session is gone — the accumulated baseline no longer exists, so + // there is nothing correct to emit. Skip the usage notification. 
+ None + } + }; + if let Some((accumulated_in, accumulated_out)) = accumulated { + wire::send( + &wire_tx, + goose_session_update( + &sid, + json!({ + "sessionUpdate": "usage_update", + // used: total tokens as a context-usage proxy; + // contextLimit: 0 (buzz-agent has no context limit tracking). + "used": accumulated_in.saturating_add(accumulated_out), + "contextLimit": 0u64, + "accumulatedInputTokens": accumulated_in, + "accumulatedOutputTokens": accumulated_out, + }), + ), + ) + .await; + } + } match result { Ok(stop) => { wire::send( diff --git a/crates/buzz-agent/src/llm.rs b/crates/buzz-agent/src/llm.rs index 90919619f..76b39e8fe 100644 --- a/crates/buzz-agent/src/llm.rs +++ b/crates/buzz-agent/src/llm.rs @@ -778,11 +778,13 @@ fn parse_responses(v: Value) -> Result { _ => ProviderStop::Other, }; let input_tokens = sum_usage(&v, &["input_tokens"]); + let output_tokens = sum_usage(&v, &["output_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -881,11 +883,13 @@ fn parse_anthropic(v: Value) -> Result { } } let input_tokens = anthropic_input_tokens(&v); + let output_tokens = sum_usage(&v, &["output_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -930,11 +934,13 @@ fn parse_openai(v: Value) -> Result { } } let input_tokens = openai_chat_input_tokens(&v); + let output_tokens = sum_usage(&v, &["completion_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -2466,4 +2472,75 @@ mod tests { let src = StaticTokenSource::new("static-key"); assert_eq!(src.refresh_now("rejected").await.unwrap(), "static-key"); } + + // ── Output-token parsing tests ────────────────────────────────────────── + + /// `parse_anthropic` extracts `output_tokens` from the usage object. 
+ #[test] + fn parse_anthropic_output_tokens() { + let v = serde_json::json!({ + "stop_reason": "end_turn", + "content": [{"type": "text", "text": "hi"}], + "usage": {"input_tokens": 42, "output_tokens": 7} + }); + assert_eq!(parse_anthropic(v).unwrap().output_tokens, Some(7)); + } + + /// `parse_anthropic` returns `None` for `output_tokens` when usage is absent. + #[test] + fn parse_anthropic_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "stop_reason": "end_turn", + "content": [{"type": "text", "text": "hi"}] + }); + assert_eq!(parse_anthropic(v).unwrap().output_tokens, None); + } + + /// `parse_openai` maps `completion_tokens` to `output_tokens`. + #[test] + fn parse_openai_output_tokens_from_completion_tokens() { + let v = serde_json::json!({ + "choices": [{"finish_reason": "stop", "message": {"content": "hi"}}], + "usage": {"prompt_tokens": 123, "completion_tokens": 4, "total_tokens": 127} + }); + assert_eq!(parse_openai(v).unwrap().output_tokens, Some(4)); + } + + /// `parse_openai` returns `None` for `output_tokens` when usage is absent. + #[test] + fn parse_openai_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "choices": [{"finish_reason": "stop", "message": {"content": "hi"}}] + }); + assert_eq!(parse_openai(v).unwrap().output_tokens, None); + } + + /// `parse_responses` extracts `output_tokens` from the usage object. + #[test] + fn parse_responses_output_tokens() { + let v = serde_json::json!({ + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hi"}] + }], + "usage": {"input_tokens": 321, "output_tokens": 9, "total_tokens": 330} + }); + assert_eq!(parse_responses(v).unwrap().output_tokens, Some(9)); + } + + /// `parse_responses` returns `None` for `output_tokens` when usage is absent. 
+ #[test] + fn parse_responses_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hi"}] + }] + }); + assert_eq!(parse_responses(v).unwrap().output_tokens, None); + } } diff --git a/crates/buzz-agent/src/types.rs b/crates/buzz-agent/src/types.rs index a8acb52a6..ef006b70a 100644 --- a/crates/buzz-agent/src/types.rs +++ b/crates/buzz-agent/src/types.rs @@ -139,6 +139,10 @@ pub struct LlmResponse { /// tokens, so reading it alone would undercount). Used to gate handoff on /// the real token budget rather than a byte estimate. pub input_tokens: Option, + /// Output tokens the provider reported for this request, or `None` if the + /// response carried no usage. Used to accumulate per-turn output counts + /// for NIP-AM metric publishing. + pub output_tokens: Option, /// Reasoning/thinking content emitted by the model before its answer, if /// any. Non-empty when the provider returns extended-thinking tokens: /// diff --git a/crates/buzz-agent/src/wire.rs b/crates/buzz-agent/src/wire.rs index 78130ccb6..7b50e7982 100644 --- a/crates/buzz-agent/src/wire.rs +++ b/crates/buzz-agent/src/wire.rs @@ -136,6 +136,18 @@ pub fn session_update(sid: &str, update: Value) -> Value { }) } +/// A `_goose/unstable/session/update` notification — the separate top-level +/// method goose uses for custom usage and status events. Used by buzz-agent +/// to emit the `usage_update` payload so buzz-acp's `UsageTracker` can treat +/// buzz-agent and goose symmetrically. +pub fn goose_session_update(sid: &str, update: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { "sessionId": sid, "update": update }, + }) +} + /// A `session/update` notification carrying a `update._meta.goose.` field. 
/// Used to advertise `activeRunId` (so steer-capable clients can target the /// in-flight run) and `queuedSteer` (so they can correlate an accepted steer diff --git a/crates/buzz-agent/tests/fake_llm.rs b/crates/buzz-agent/tests/fake_llm.rs index a1791e7d6..f2a86ac4b 100644 --- a/crates/buzz-agent/tests/fake_llm.rs +++ b/crates/buzz-agent/tests/fake_llm.rs @@ -750,6 +750,344 @@ async fn steer_rejected_on_run_id_mismatch() { h.shutdown().await; } +// ─── Usage notification (_goose/unstable/session/update usage_update) ─────── + +/// An OpenAI chat completion response with a `usage` block (prompt_tokens + +/// completion_tokens). buzz-agent maps these to `accumulatedInputTokens` / +/// `accumulatedOutputTokens` in the `_goose/unstable/session/update` notification. +fn openai_text_with_usage(content: &str, input_tokens: u64, output_tokens: u64) -> Value { + json!({ + "id": "cc-u", "object": "chat.completion", "model": "fake-model", + "choices": [{ + "index": 0, + "message": { "role": "assistant", "content": content }, + "finish_reason": "stop", + }], + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + }) +} + +/// Returns true when `v` is a `_goose/unstable/session/update` usage_update +/// notification. +fn is_usage_update(v: &Value) -> bool { + v.get("method") == Some(&json!("_goose/unstable/session/update")) + && v["params"]["update"]["sessionUpdate"] == "usage_update" +} + +/// Collect every frame that arrives BEFORE the message matching `until_pred`, +/// then return (frames_before, matching_frame). 
+async fn recv_until_with_drain(h: &mut Harness, mut until_pred: F) -> (Vec, Value) +where + F: FnMut(&Value) -> bool, +{ + let mut before = Vec::new(); + loop { + let v = h.recv().await; + if until_pred(&v) { + return (before, v); + } + before.push(v); + } +} + +/// buzz-agent must emit `_goose/unstable/session/update` with `sessionUpdate: +/// "usage_update"` **before** the `session/prompt` response on each turn, and +/// must accumulate counters across turns (turn 2 reports turn1+turn2 sums). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn usage_notification_emitted_before_prompt_response() { + let url = spawn_fake_llm(vec![ + openai_text_with_usage("turn one reply", 10, 5), + openai_text_with_usage("turn two reply", 20, 8), + ]) + .await; + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + // ── Turn 1 ────────────────────────────────────────────────────────────── + let p1 = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"turn 1"}]}), + ) + .await; + + let (frames_before_t1, response_t1) = recv_until_with_drain(&mut h, |v| v["id"] == p1).await; + assert_eq!( + response_t1["result"]["stopReason"], "end_turn", + "turn 1 must complete with end_turn" + ); + + // A usage_update notification must appear in the frames before the response. 
+ let usage_t1 = frames_before_t1 + .iter() + .find(|v| is_usage_update(v)) + .unwrap_or_else(|| { + panic!( + "expected _goose/unstable/session/update usage_update before turn-1 response; frames: {frames_before_t1:#?}" + ) + }); + assert_eq!( + usage_t1["params"]["update"]["sessionUpdate"], "usage_update", + "sessionUpdate field must be 'usage_update'" + ); + assert_eq!( + usage_t1["params"]["update"]["accumulatedInputTokens"], + json!(10u64), + "turn 1 accumulated input tokens" + ); + assert_eq!( + usage_t1["params"]["update"]["accumulatedOutputTokens"], + json!(5u64), + "turn 1 accumulated output tokens" + ); + + // ── Turn 2 ────────────────────────────────────────────────────────────── + let p2 = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"turn 2"}]}), + ) + .await; + + let (frames_before_t2, response_t2) = recv_until_with_drain(&mut h, |v| v["id"] == p2).await; + assert_eq!( + response_t2["result"]["stopReason"], "end_turn", + "turn 2 must complete with end_turn" + ); + + // Notification arrives before the response, with cumulative sums (10+20, 5+8). + let usage_t2 = frames_before_t2 + .iter() + .find(|v| is_usage_update(v)) + .unwrap_or_else(|| { + panic!( + "expected _goose/unstable/session/update usage_update before turn-2 response; frames: {frames_before_t2:#?}" + ) + }); + assert_eq!( + usage_t2["params"]["update"]["accumulatedInputTokens"], + json!(30u64), + "turn 2 accumulated input tokens must be 10+20=30" + ); + assert_eq!( + usage_t2["params"]["update"]["accumulatedOutputTokens"], + json!(13u64), + "turn 2 accumulated output tokens must be 5+8=13" + ); + + h.shutdown().await; +} + +/// When the provider returns a response with no `usage` block, buzz-agent must +/// NOT emit a `_goose/unstable/session/update` notification for that turn. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn no_usage_turn_emits_no_usage_notification() { + let url = spawn_fake_llm(vec![openai_text("no usage here")]).await; + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + let p_id = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"go"}]}), + ) + .await; + + let (frames_before, response) = recv_until_with_drain(&mut h, |v| v["id"] == p_id).await; + assert_eq!( + response["result"]["stopReason"], "end_turn", + "turn must complete with end_turn" + ); + + // No usage notification must appear in the frames before the response. + let found = frames_before.iter().any(is_usage_update); + assert!( + !found, + "expected NO usage_update notification when provider reports no usage; frames: {frames_before:#?}" + ); + + h.shutdown().await; +} + +/// When a turn is cancelled AFTER the provider has already returned a response +/// (so token counts are observed), buzz-agent must still emit the usage +/// notification before the cancelled `session/prompt` response. +/// +/// Setup: round 1 is a tool call WITH usage (tokens are captured). After the +/// tool_call_update notification (proving round 1 is fully processed), we gate +/// the round-2 LLM response behind a `oneshot` barrier that only releases after +/// cancel is sent. This guarantees the turn exits with `stopReason: "cancelled"` +/// deterministically, even on a slow CI worker. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn cancelled_turn_with_usage_emits_notification_before_response() { + use tokio::sync::oneshot; + + // Gate: the second LLM request (round 2) is held until we explicitly release it. + let (gate_tx, gate_rx) = oneshot::channel::<()>(); + let gate_rx = Arc::new(tokio::sync::Mutex::new(Some(gate_rx))); + + // Round 1: tool call with usage — sets turn_input/output_tokens. 
+ // Round 2: gated — blocked until cancel fires, then released so the + // in-flight TCP request can resolve. The queue is empty for round 2, so the + // agent receives the fallback "no canned response" body which it treats as + // an LLM error; the cancel check at the round boundary fires first because + // the gate is only released after cancel is enqueued. + let responses = vec![openai_tool_call_with_usage( + "call_cancel_test", + "fake__noop", + json!({}), + 15, + 6, + )]; + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let url = format!("http://{}", listener.local_addr().unwrap()); + let queue = Arc::new(Mutex::new(VecDeque::from(responses))); + let gate_rx_clone = gate_rx.clone(); + tokio::spawn(async move { + let mut request_num = 0usize; + loop { + let (mut sock, _) = match listener.accept().await { + Ok(p) => p, + Err(_) => return, + }; + let queue = queue.clone(); + let gate = gate_rx_clone.clone(); + request_num += 1; + let req_num = request_num; + tokio::spawn(async move { + let mut buf = Vec::new(); + let mut tmp = [0u8; 4096]; + while !buf.windows(4).any(|w| w == b"\r\n\r\n") { + match sock.read(&mut tmp).await { + Ok(0) | Err(_) => return, + Ok(n) => buf.extend_from_slice(&tmp[..n]), + } + if buf.len() > 1_000_000 { + return; + } + } + // For request 2+ (round 2), wait for the gate to open before + // responding. This ensures cancel is sent before round 2 resolves, + // making stopReason: cancelled deterministic. 
+ if req_num >= 2 { + let rx = gate.lock().await.take(); + if let Some(rx) = rx { + let _ = rx.await; + } + } + let body = queue + .lock() + .await + .pop_front() + .unwrap_or_else(|| json!({ "error": "no canned response" })); + let body_s = serde_json::to_string(&body).unwrap(); + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body_s.len(), body_s, + ); + let _ = sock.write_all(resp.as_bytes()).await; + let _ = sock.shutdown().await; + }); + } + }); + + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + let p_id = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"start work"}]}), + ) + .await; + + // Wait for the activeRunId advert (agent is live). + let _run_id = recv_active_run_id(&mut h).await; + // Wait for tool_call_update — proves round 1 LLM response is fully processed + // and tokens are captured before we send cancel. + h.recv_until(|v| { + v.get("method") == Some(&json!("session/update")) + && v["params"]["update"]["sessionUpdate"] == "tool_call_update" + }) + .await; + + // Now send cancel and release the round-2 gate. Cancel is enqueued before + // round 2 can respond, so the turn exits with stopReason: cancelled. + let c_id = h.send("session/cancel", json!({"sessionId": sid})).await; + let _ = gate_tx.send(()); // unblock round 2 + + let mut saw_usage_before_prompt_response = false; + let mut saw_usage = false; + let mut saw_cancel_ok = false; + let mut saw_prompt_response = false; + for _ in 0..40 { + let v = h.recv().await; + if v["id"] == json!(c_id) { + saw_cancel_ok = true; + } else if is_usage_update(&v) { + saw_usage = true; + if !saw_prompt_response { + saw_usage_before_prompt_response = true; + } + } else if v["id"] == json!(p_id) { + saw_prompt_response = true; + // The gate guarantees stopReason: cancelled — not a race-driven error. 
+ assert_eq!( + v["result"]["stopReason"], "cancelled", + "turn must end with stopReason: cancelled" + ); + } + if saw_usage && saw_prompt_response && saw_cancel_ok { + break; + } + } + assert!(saw_cancel_ok, "session/cancel was not acknowledged"); + assert!( + saw_usage, + "expected usage_update notification for cancelled turn with observed tokens" + ); + assert!( + saw_usage_before_prompt_response, + "usage_update must arrive before the session/prompt response" + ); + + h.shutdown().await; +} + +/// A tool-call OpenAI response with a `usage` block. Used to capture tokens in +/// round 1 before a cancel fires at the round boundary. +fn openai_tool_call_with_usage( + id: &str, + name: &str, + args: Value, + input_tokens: u64, + output_tokens: u64, +) -> Value { + json!({ + "id": "cc-u2", "object": "chat.completion", "model": "fake-model", + "choices": [{ + "index": 0, + "message": { + "role": "assistant", "content": null, + "tool_calls": [{ + "id": id, "type": "function", + "function": { "name": name, "arguments": args.to_string() }, + }], + }, + "finish_reason": "tool_calls", + }], + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + }) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn steer_rejected_on_empty_prompt() { let (url, _captures) = spawn_capturing_fake_llm(vec![ diff --git a/crates/buzz-core/src/agent_turn_metric.rs b/crates/buzz-core/src/agent_turn_metric.rs new file mode 100644 index 000000000..037f54f5a --- /dev/null +++ b/crates/buzz-core/src/agent_turn_metric.rs @@ -0,0 +1,508 @@ +//! NIP-AM: Agent Turn Metric — payload type and encrypt/decrypt helpers. +//! +//! One `kind:44200` event is published per completed agent turn. Its content +//! is a NIP-44 v2 ciphertext (agent key → owner pubkey) that decodes to an +//! [`AgentTurnMetricPayload`] JSON object. +//! +//! See `docs/nips/NIP-AM.md` for the full specification. 
+ +use nostr::{Event, Keys, PublicKey}; +use serde::{Deserialize, Serialize}; + +use crate::observer::{decrypt_observer_payload, encrypt_observer_payload, ObserverPayloadError}; + +// Re-export for callers that only need the error type. +pub use crate::observer::ObserverPayloadError as AgentTurnMetricError; + +/// Token-usage counters for a single measurement window (one turn or cumulative). +/// +/// All token fields are nullable — `None` means the harness did not report them, +/// NOT that the count was zero. See NIP-AM §Numeric validity and token semantics. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TokenCounts { + /// Input tokens (inclusive of cache reads/writes where applicable). + pub input_tokens: Option, + + /// Output tokens. + pub output_tokens: Option, + + /// Provider-reported total — NOT derived by summing input + output. + /// `None` when the provider did not report a total. + pub total_tokens: Option, + + /// Estimated cost in USD. Must be finite and non-negative when present. + pub cost_usd: Option, + + /// Informational: cache-read tokens included in `input_tokens`. + #[serde(skip_serializing_if = "Option::is_none")] + pub cache_read_tokens: Option, + + /// Informational: cache-write tokens included in `input_tokens`. + #[serde(skip_serializing_if = "Option::is_none")] + pub cache_write_tokens: Option, +} + +/// Why a turn ended. +/// +/// NIP-AM: consumers MUST treat unrecognized `stopReason` values as `Unknown` +/// and keep the token counts valid. Custom deserialization maps any unrecognized +/// string to `Unknown` instead of failing the whole payload. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum StopReason { + /// Model reached a natural end-of-turn. + EndTurn, + /// Model hit the max-tokens limit. + MaxTokens, + /// Turn was cancelled by the owner or harness. + Cancelled, + /// Turn ended with an error. 
+ Error, + /// Stop reason is unknown or unrecognized. + Unknown, +} + +impl<'de> Deserialize<'de> for StopReason { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + Ok(match s.as_str() { + "end_turn" => StopReason::EndTurn, + "max_tokens" => StopReason::MaxTokens, + "cancelled" => StopReason::Cancelled, + "error" => StopReason::Error, + "unknown" => StopReason::Unknown, + _ => StopReason::Unknown, + }) + } +} + +/// Decrypted payload of a `kind:44200` Agent Turn Metric event. +/// +/// `harness` and `timestamp` are REQUIRED. All other fields are optional or +/// nullable unless constrained by the NIP (e.g. `session_id` + `turn_seq` +/// are required whenever `cumulative` is present). +/// +/// Consumers MUST ignore unknown fields (forward compatibility). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentTurnMetricPayload { + /// Harness identifier (e.g. `"goose"`, `"buzz-agent"`). REQUIRED. + pub harness: String, + + /// Model identifier as reported by the harness, or `None` if unknown. + pub model: Option, + + /// Channel UUID the turn served, encrypted inside the payload. + pub channel_id: Option, + + /// Session identifier. REQUIRED when `cumulative` is present. + pub session_id: Option, + + /// Turn identifier (harness-internal). + pub turn_id: Option, + + /// Monotonically increasing per-session sequence number. + /// REQUIRED when `cumulative` is present; strictly increasing within one + /// `session_id`. A publisher restart that loses the counter MUST start a + /// new `session_id`. + pub turn_seq: Option, + + /// RFC 3339 timestamp (end-of-turn). REQUIRED. + pub timestamp: String, + + /// Usage for this turn (computed delta). Null fields mean not reported. + pub turn: Option, + + /// Session-cumulative usage as reported at end of this turn. 
+ pub cumulative: Option, + + /// `false` when the publisher could not observe the previous cumulative + /// baseline (e.g. harness restart mid-session), making `turn` unreliable. + /// Defaults to `true` on the wire when not explicitly set. + #[serde(default = "default_delta_reliable")] + pub delta_reliable: bool, + + /// Why the turn ended. Unrecognized values MUST be treated as `Unknown`. + pub stop_reason: Option, +} + +fn default_delta_reliable() -> bool { + true +} + +impl AgentTurnMetricPayload { + /// Validate numeric constraints from NIP-AM §Numeric validity. + /// + /// Returns `Err` when any `cost_usd` field (in `turn` or `cumulative`) is + /// present but negative or non-finite (NaN or infinity). Token counts are + /// typed as `Option` and therefore cannot be negative by construction. + pub fn validate(&self) -> Result<(), ObserverPayloadError> { + fn check_cost(cost: Option, field: &str) -> Result<(), ObserverPayloadError> { + if let Some(c) = cost { + if !c.is_finite() || c < 0.0 { + return Err(ObserverPayloadError::InvalidPayload(format!( + "{field} must be finite and non-negative (got {c})" + ))); + } + } + Ok(()) + } + if let Some(t) = &self.turn { + check_cost(t.cost_usd, "turn.costUsd")?; + } + if let Some(c) = &self.cumulative { + check_cost(c.cost_usd, "cumulative.costUsd")?; + } + Ok(()) + } +} + +/// Encrypt an [`AgentTurnMetricPayload`] into a NIP-44 v2 ciphertext string +/// using the agent's key pair and the owner's public key. +/// +/// Returns `Err(ObserverPayloadError::InvalidPayload)` if any `cost_usd` field +/// is negative or non-finite (NaN/inf), in accordance with NIP-AM §Numeric +/// validity. +/// +/// This is the content field of a `kind:44200` event. 
+pub fn encrypt_agent_turn_metric( + agent_keys: &Keys, + owner_pubkey: &PublicKey, + payload: &AgentTurnMetricPayload, +) -> Result { + payload.validate()?; + encrypt_observer_payload(agent_keys, owner_pubkey, payload) +} + +/// Decrypt and deserialize an [`AgentTurnMetricPayload`] from a `kind:44200` event. +/// +/// `recipient_keys` is the owner's key pair. +/// +/// Returns `Err(ObserverPayloadError::InvalidPayload)` if the decrypted payload +/// fails numeric validation (e.g. negative or non-finite `costUsd`), mirroring +/// the fail-closed contract of [`encrypt_agent_turn_metric`]. +pub fn decrypt_agent_turn_metric( + recipient_keys: &Keys, + event: &Event, +) -> Result { + let payload: AgentTurnMetricPayload = decrypt_observer_payload(recipient_keys, event)?; + payload.validate()?; + Ok(payload) +} + +#[cfg(test)] +mod tests { + use super::*; + use nostr::{EventBuilder, Kind, Tag}; + + fn sample_payload() -> AgentTurnMetricPayload { + AgentTurnMetricPayload { + harness: "goose".to_string(), + model: Some("claude-sonnet-4-5".to_string()), + channel_id: Some("12345678-1234-1234-1234-123456789abc".to_string()), + session_id: Some("sess-abc".to_string()), + turn_id: Some("turn-1".to_string()), + turn_seq: Some(1), + timestamp: "2026-07-01T20:11:03.213Z".to_string(), + turn: Some(TokenCounts { + input_tokens: Some(1234), + output_tokens: Some(567), + total_tokens: Some(1801), + cost_usd: Some(0.0123), + cache_read_tokens: None, + cache_write_tokens: None, + }), + cumulative: Some(TokenCounts { + input_tokens: Some(45210), + output_tokens: Some(9876), + total_tokens: Some(55086), + cost_usd: Some(0.41), + cache_read_tokens: None, + cache_write_tokens: None, + }), + delta_reliable: true, + stop_reason: Some(StopReason::EndTurn), + } + } + + #[test] + fn round_trip_encrypt_decrypt() { + let agent_keys = Keys::generate(); + let owner_keys = Keys::generate(); + + let payload = sample_payload(); + let ciphertext = encrypt_agent_turn_metric(&agent_keys, 
&owner_keys.public_key(), &payload) + .expect("encrypt"); + + // Build a minimal event envelope so decrypt_observer_payload can use event.pubkey. + let event = EventBuilder::new(Kind::Custom(44200), ciphertext) + .tags([ + Tag::parse(["p", &owner_keys.public_key().to_hex()]).unwrap(), + Tag::parse(["agent", &agent_keys.public_key().to_hex()]).unwrap(), + ]) + .sign_with_keys(&agent_keys) + .expect("sign"); + + let decoded = decrypt_agent_turn_metric(&owner_keys, &event).expect("decrypt"); + + assert_eq!(decoded, payload); + } + + #[test] + fn wrong_key_decrypt_fails() { + let agent_keys = Keys::generate(); + let owner_keys = Keys::generate(); + let wrong_keys = Keys::generate(); + + let payload = sample_payload(); + let ciphertext = encrypt_agent_turn_metric(&agent_keys, &owner_keys.public_key(), &payload) + .expect("encrypt"); + + let event = EventBuilder::new(Kind::Custom(44200), ciphertext) + .tags([ + Tag::parse(["p", &owner_keys.public_key().to_hex()]).unwrap(), + Tag::parse(["agent", &agent_keys.public_key().to_hex()]).unwrap(), + ]) + .sign_with_keys(&agent_keys) + .expect("sign"); + + let result = decrypt_agent_turn_metric(&wrong_keys, &event); + assert!(result.is_err(), "expected decrypt error with wrong key"); + } + + #[test] + fn delta_reliable_defaults_to_true_when_absent() { + let json = r#"{"harness":"goose","timestamp":"2026-07-01T20:11:03Z"}"#; + let payload: AgentTurnMetricPayload = serde_json::from_str(json).expect("parse"); + assert!( + payload.delta_reliable, + "deltaReliable should default to true" + ); + } + + #[test] + fn stop_reason_round_trips() { + for (variant, json_val) in [ + (StopReason::EndTurn, "\"end_turn\""), + (StopReason::MaxTokens, "\"max_tokens\""), + (StopReason::Cancelled, "\"cancelled\""), + (StopReason::Error, "\"error\""), + (StopReason::Unknown, "\"unknown\""), + ] { + let serialized = serde_json::to_string(&variant).unwrap(); + assert_eq!(serialized, json_val); + let deserialized: StopReason = 
serde_json::from_str(json_val).unwrap(); + assert_eq!(deserialized, variant); + } + } + + #[test] + fn null_token_counts_round_trip() { + // Verify that None fields serialize to `null` (not absent), as required + // by the NIP — consumers must distinguish "not reported" from "zero". + let counts = TokenCounts { + input_tokens: None, + output_tokens: None, + total_tokens: None, + cost_usd: None, + cache_read_tokens: None, + cache_write_tokens: None, + }; + let json = serde_json::to_string(&counts).unwrap(); + // cache_* are skip_serializing_if = None, others serialize as null + assert!(json.contains("\"inputTokens\":null")); + assert!(json.contains("\"outputTokens\":null")); + let back: TokenCounts = serde_json::from_str(&json).unwrap(); + assert_eq!(back, counts); + } + + #[test] + fn unknown_stop_reason_maps_to_unknown_not_error() { + // NIP-AM: consumers MUST treat unrecognized stopReason values as Unknown; + // the token counts remain valid and the whole payload must not be rejected. + let json = r#"{ + "harness": "goose", + "timestamp": "2026-07-01T20:11:03Z", + "stopReason": "tool_limit", + "turn": { + "inputTokens": 1234, + "outputTokens": 567, + "totalTokens": 1801, + "costUsd": null + } + }"#; + let payload: AgentTurnMetricPayload = + serde_json::from_str(json).expect("payload with future stopReason must parse"); + assert_eq!( + payload.stop_reason, + Some(StopReason::Unknown), + "unrecognized stopReason must map to Unknown" + ); + // Token counts must be preserved. 
+ let turn = payload.turn.expect("turn must be present"); + assert_eq!(turn.input_tokens, Some(1234)); + assert_eq!(turn.output_tokens, Some(567)); + assert_eq!(turn.total_tokens, Some(1801)); + } + + // ── validate() — negative / non-finite costUsd ───────────────────────── + + fn make_payload_with_turn_cost(cost: Option) -> AgentTurnMetricPayload { + AgentTurnMetricPayload { + harness: "test".to_string(), + model: None, + channel_id: None, + session_id: None, + turn_id: None, + turn_seq: None, + timestamp: "2026-07-01T00:00:00Z".to_string(), + turn: Some(TokenCounts { + input_tokens: Some(100), + output_tokens: Some(50), + total_tokens: None, + cost_usd: cost, + cache_read_tokens: None, + cache_write_tokens: None, + }), + cumulative: None, + delta_reliable: true, + stop_reason: None, + } + } + + fn make_payload_with_cumulative_cost(cost: Option) -> AgentTurnMetricPayload { + AgentTurnMetricPayload { + harness: "test".to_string(), + model: None, + channel_id: None, + session_id: None, + turn_id: None, + turn_seq: None, + timestamp: "2026-07-01T00:00:00Z".to_string(), + turn: None, + cumulative: Some(TokenCounts { + input_tokens: Some(500), + output_tokens: Some(200), + total_tokens: None, + cost_usd: cost, + cache_read_tokens: None, + cache_write_tokens: None, + }), + delta_reliable: true, + stop_reason: None, + } + } + + #[test] + fn validate_rejects_negative_turn_cost() { + let payload = make_payload_with_turn_cost(Some(-0.001)); + assert!( + matches!( + payload.validate(), + Err(ObserverPayloadError::InvalidPayload(_)) + ), + "negative turn.costUsd must be rejected" + ); + } + + #[test] + fn validate_rejects_nan_turn_cost() { + let payload = make_payload_with_turn_cost(Some(f64::NAN)); + assert!( + matches!( + payload.validate(), + Err(ObserverPayloadError::InvalidPayload(_)) + ), + "NaN turn.costUsd must be rejected" + ); + } + + #[test] + fn validate_rejects_infinite_turn_cost() { + let payload = make_payload_with_turn_cost(Some(f64::INFINITY)); + assert!( + 
matches!( + payload.validate(), + Err(ObserverPayloadError::InvalidPayload(_)) + ), + "infinite turn.costUsd must be rejected" + ); + } + + #[test] + fn validate_rejects_negative_cumulative_cost() { + let payload = make_payload_with_cumulative_cost(Some(-1.0)); + assert!( + matches!( + payload.validate(), + Err(ObserverPayloadError::InvalidPayload(_)) + ), + "negative cumulative.costUsd must be rejected" + ); + } + + #[test] + fn validate_accepts_finite_non_negative_cost() { + // Zero, small, and larger values are all valid. + for cost in [0.0_f64, 0.001, 1.0, 999.99] { + let payload = make_payload_with_turn_cost(Some(cost)); + assert!(payload.validate().is_ok(), "cost {cost} should be accepted"); + } + } + + #[test] + fn validate_accepts_absent_cost() { + let payload = make_payload_with_turn_cost(None); + assert!( + payload.validate().is_ok(), + "absent costUsd must be accepted" + ); + } + + #[test] + fn encrypt_agent_turn_metric_rejects_negative_cost() { + let agent_keys = Keys::generate(); + let owner_keys = Keys::generate(); + let payload = make_payload_with_turn_cost(Some(-0.5)); + let result = encrypt_agent_turn_metric(&agent_keys, &owner_keys.public_key(), &payload); + assert!( + matches!(result, Err(ObserverPayloadError::InvalidPayload(_))), + "encrypt must reject payload with negative costUsd" + ); + } + + #[test] + fn decrypt_agent_turn_metric_rejects_negative_cost_bypassing_encrypt() { + // Regression: a raw/misbehaving agent can persist a syntactically valid + // NIP-44 payload with costUsd: -1 by calling encrypt_observer_payload + // directly (bypassing the validating encrypt_agent_turn_metric helper). + // decrypt_agent_turn_metric must reject it symmetrically. + use crate::observer::encrypt_observer_payload; + + let agent_keys = Keys::generate(); + let owner_keys = Keys::generate(); + + // Build a payload with negative costUsd and encrypt via the lower-level + // path, bypassing encrypt_agent_turn_metric's validate() call. 
+ let bad_payload = make_payload_with_turn_cost(Some(-1.0)); + let ciphertext = + encrypt_observer_payload(&agent_keys, &owner_keys.public_key(), &bad_payload) + .expect("lower-level encrypt should succeed without validation"); + + let event = EventBuilder::new(Kind::Custom(44200), ciphertext) + .tags([ + Tag::parse(["p", &owner_keys.public_key().to_hex()]).unwrap(), + Tag::parse(["agent", &agent_keys.public_key().to_hex()]).unwrap(), + ]) + .sign_with_keys(&agent_keys) + .expect("sign"); + + let result = decrypt_agent_turn_metric(&owner_keys, &event); + assert!( + matches!(result, Err(ObserverPayloadError::InvalidPayload(_))), + "decrypt must reject a payload with negative costUsd even when \ + encrypted via the lower-level path" + ); + } +} diff --git a/crates/buzz-core/src/filter.rs b/crates/buzz-core/src/filter.rs index a3c0bef59..1671f7622 100644 --- a/crates/buzz-core/src/filter.rs +++ b/crates/buzz-core/src/filter.rs @@ -12,14 +12,17 @@ pub fn filters_match(filters: &[Filter], event: &StoredEvent) -> bool { } /// Result-level read authorization for relay-signed events whose content is -/// private to a single viewer. Currently only `KIND_DM_VISIBILITY`: the reader -/// MUST equal the snapshot's `#p` (owner). Returns `true` for every other kind. +/// private to a single viewer. Currently gates `KIND_DM_VISIBILITY` and +/// `KIND_AGENT_TURN_METRIC`: the reader MUST equal the event's `#p` tag +/// (owner). Returns `true` for every other kind. /// -/// This guards the delivery surfaces directly, so a query that bypasses the -/// filter-level `#p` gate (e.g. a kindless `ids:[…]` lookup of a known snapshot -/// id) still cannot read another viewer's hidden-DM set. +/// This guards every delivery surface — WS historical pull (`req.rs`), HTTP +/// bridge (`bridge.rs`), and live fan-out (`event.rs`) — so a query that +/// bypasses the filter-level `#p` gate (e.g. a kindless `ids:[…]` lookup of +/// a known event id) still cannot read another user's private event. 
pub fn reader_authorized_for_event(event: &nostr::Event, reader_pubkey_hex: &str) -> bool { - if crate::kind::event_kind_u32(event) != crate::kind::KIND_DM_VISIBILITY { + let kind = crate::kind::event_kind_u32(event); + if kind != crate::kind::KIND_DM_VISIBILITY && kind != crate::kind::KIND_AGENT_TURN_METRIC { return true; } let p = nostr::SingleLetterTag::lowercase(nostr::Alphabet::P); @@ -261,4 +264,37 @@ mod tests { .expect("sign"); assert!(reader_authorized_for_event(&note, other)); } + + #[test] + fn reader_authorized_for_event_gates_agent_turn_metric_by_p() { + let agent_keys = Keys::generate(); + let owner = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + let attacker = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"; + + // Agent turn metric event: pubkey=agent, p tag=owner (NIP-AM envelope shape). + let metric = EventBuilder::new( + Kind::Custom(crate::kind::KIND_AGENT_TURN_METRIC as u16), + "encrypted-payload", + ) + .tags([ + Tag::parse(["p", owner]).unwrap(), + Tag::parse(["agent", &agent_keys.public_key().to_hex()]).unwrap(), + ]) + .sign_with_keys(&agent_keys) + .expect("sign"); + + assert!( + reader_authorized_for_event(&metric, owner), + "owner must be authorized to read their own agent turn metric" + ); + assert!( + !reader_authorized_for_event(&metric, attacker), + "non-owner must NOT be authorized to read an agent turn metric via kindless ids" + ); + // The authoring agent also does not get read-back (NIP-AM: owner-only read).
+ assert!( + !reader_authorized_for_event(&metric, &agent_keys.public_key().to_hex()), + "the authoring agent must NOT be authorized to read its own metric event (owner-only)" + ); + } } diff --git a/crates/buzz-core/src/kind.rs b/crates/buzz-core/src/kind.rs index 6b86b1c1f..ebaf01a30 100644 --- a/crates/buzz-core/src/kind.rs +++ b/crates/buzz-core/src/kind.rs @@ -110,6 +110,15 @@ pub const KIND_EVENT_REMINDER: u32 = 30300; /// a compile-time bitset or sorted array with binary search for hot-path use. pub const AUTHOR_ONLY_KINDS: &[u32] = &[KIND_EVENT_REMINDER]; +/// Kinds that require a result-level read gate beyond the filter-layer +/// `#p` check: even a reader who knows an event id MUST match the event's +/// `#p` tag to receive the event. This closes the kindless `{ids:[…]}` read +/// path for events whose existence must not be leaked. +/// +/// Used by `filter_can_match_result_gated_kinds` to force the per-event +/// fallback path in COUNT rather than the fast SQL `count_events()`. +pub const RESULT_GATED_KINDS: &[u32] = &[KIND_DM_VISIBILITY, KIND_AGENT_TURN_METRIC]; + /// Kinds whose stored events have `#p`-bound read access — readable only by /// subscribers whose pubkey appears in the event's `#p` tag. /// @@ -118,8 +127,9 @@ pub const AUTHOR_ONLY_KINDS: &[u32] = &[KIND_EVENT_REMINDER]; /// `#p` values exactly equal the authenticated reader's pubkey. For stored /// (non-ephemeral) kinds in this set, the storage layer additionally writes a /// NULL `search_tsv` so the event is unsearchable through NIP-50 FTS -/// (`schema/schema.sql` and `migrations/0001_initial_schema.sql` — drift -/// caught by `p_gated_persistent_kinds_have_storage_null_tsvector` in +/// (`schema/schema.sql` plus a new additive migration — see +/// `0004_agent_turn_metric_fts.sql` — drift caught by +/// `p_gated_persistent_kinds_have_storage_null_tsvector` in /// `crates/buzz-search/tests/fts_integration.rs`). /// /// Ephemeral kinds (20000–29999, e.g. 
[`KIND_AGENT_OBSERVER_FRAME`]) are @@ -131,6 +141,10 @@ pub const P_GATED_KINDS: &[u32] = &[ KIND_MEMBER_REMOVED_NOTIFICATION, KIND_GIFT_WRAP, KIND_DM_VISIBILITY, + // NIP-AM: agent turn metrics are encrypted to the owner and must not be + // readable by any unauthenticated or non-owner party, including via `ids` + // filters — see NIP-AM §Relay Behavior. + KIND_AGENT_TURN_METRIC, ]; /// NIP-AP: Agent Persona (parameterized replaceable, owner-authored). @@ -343,6 +357,15 @@ pub const KIND_MEMBER_ADDED_NOTIFICATION: u32 = 44100; /// Stored globally (channel_id = None) with p-tag = target, h-tag = channel UUID. pub const KIND_MEMBER_REMOVED_NOTIFICATION: u32 = 44101; +/// NIP-AM: Agent Turn Metric — durable per-turn token-usage record (agent-authored). +/// +/// Regular stored event (append-only, never replaced). The agent publishes one +/// event per completed turn, NIP-44 encrypted to its owner. Tags: exactly one `p` +/// (owner pubkey) and one `agent` (agent pubkey == event pubkey); no `h` tag. +/// Stored globally (channel_id = NULL); owner-scoped reads only (p-gated, NIP-42). +/// See `docs/nips/NIP-AM.md`. +pub const KIND_AGENT_TURN_METRIC: u32 = 44200; + // Forum / social (45000–45999) // V1 used addressable range (30001–30003) — wrong. /// A forum post (thread root). @@ -507,6 +530,7 @@ pub const ALL_KINDS: &[u32] = &[ KIND_JOB_ERROR, KIND_MEMBER_ADDED_NOTIFICATION, KIND_MEMBER_REMOVED_NOTIFICATION, + KIND_AGENT_TURN_METRIC, KIND_WORKFLOW_DEF, KIND_LONG_FORM, KIND_USER_STATUS, @@ -655,6 +679,11 @@ const _: () = assert!(KIND_AUTH <= u16::MAX as u32); const _: () = assert!(KIND_CANVAS <= u16::MAX as u32); const _: () = assert!(KIND_HUDDLE_GUIDELINES <= u16::MAX as u32); const _: () = assert!(EPHEMERAL_KIND_MIN < EPHEMERAL_KIND_MAX); +// Compile-time: KIND_AGENT_TURN_METRIC is a regular stored kind (not ephemeral, not replaceable). 
+const _: () = assert!(!is_ephemeral(KIND_AGENT_TURN_METRIC)); +const _: () = assert!(!is_replaceable(KIND_AGENT_TURN_METRIC)); +const _: () = assert!(!is_parameterized_replaceable(KIND_AGENT_TURN_METRIC)); +const _: () = assert!(KIND_AGENT_TURN_METRIC <= u16::MAX as u32); #[cfg(test)] mod tests { diff --git a/crates/buzz-core/src/lib.rs b/crates/buzz-core/src/lib.rs index dee40e988..6139ee3ae 100644 --- a/crates/buzz-core/src/lib.rs +++ b/crates/buzz-core/src/lib.rs @@ -5,6 +5,8 @@ //! Provides [`StoredEvent`], filter matching, kind constants, and event //! verification. All other Buzz crates depend on this one. +/// NIP-AM: Agent Turn Metric — payload type and encrypt/decrypt helpers. +pub mod agent_turn_metric; /// Channel and membership enums shared across crates. pub mod channel; /// NIP-AE Agent Engrams — slug grammar, conversation key, d-tag derivation, diff --git a/crates/buzz-core/src/observer.rs b/crates/buzz-core/src/observer.rs index f2b981188..8347bda05 100644 --- a/crates/buzz-core/src/observer.rs +++ b/crates/buzz-core/src/observer.rs @@ -44,6 +44,9 @@ pub enum ObserverPayloadError { /// Actual plaintext byte count. got: usize, }, + /// A payload field violated a NIP-AM numeric constraint. + #[error("invalid payload field: {0}")] + InvalidPayload(String), } /// Returns true when `content` fits the NIP-44 v2 ciphertext length envelope. 
diff --git a/crates/buzz-db/src/migration.rs b/crates/buzz-db/src/migration.rs index 7caf822c2..f3defbb66 100644 --- a/crates/buzz-db/src/migration.rs +++ b/crates/buzz-db/src/migration.rs @@ -471,7 +471,7 @@ mod tests { let mut migrations: Vec<_> = MIGRATOR.iter().collect(); migrations.sort_by_key(|migration| migration.version); - assert_eq!(migrations.len(), 3); + assert_eq!(migrations.len(), 4); assert_eq!(migrations[0].version, 1); assert_eq!(&*migrations[0].description, "initial schema"); assert!(migrations[0] @@ -515,6 +515,15 @@ mod tests { .as_str() .contains("ALTER TABLE communities ADD COLUMN icon")); assert!(!migrations[0].sql.as_str().contains("icon")); + + // NIP-AM (kind 44200) FTS exclusion: additive migration, never folded + // into 0001 — folding would change 0001's checksum and break brownfield + // startup. Migration 4 drops and re-adds the generated `search_tsv` + // column with the extended kind-44200 exclusion. 0001 must NOT carry 44200. + assert_eq!(migrations[3].version, 4); + assert!(migrations[3].sql.as_str().contains("search_tsv")); + assert!(migrations[3].sql.as_str().contains("44200")); + assert!(!migrations[0].sql.as_str().contains("44200")); } #[test] diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 59093a060..18e146393 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -569,6 +569,12 @@ pub async fn query_events( if !event_in_accessible_channel(&se, &accessible_channels) { continue; } + // Defense-in-depth: never deliver a result-gated event (e.g. kind:44200 + // or kind:30622) to a non-owner via the feed path, even though feed SQL + // kind allowlists already exclude these kinds. 
+ if !buzz_core::filter::reader_authorized_for_event(&se.event, &authed_pubkey_hex) { + continue; + } if let Ok(v) = serde_json::to_value(&se.event) { events.push(v); feed_count += 1; @@ -629,6 +635,12 @@ pub async fn query_events( if !event_in_accessible_channel(&se, &accessible_channels) { continue; } + // Defense-in-depth: never deliver a result-gated event (e.g. kind:44200 + // or kind:30622) to a non-owner via the thread path, even though + // requires_h_channel_scope already excludes these kinds from thread metadata. + if !buzz_core::filter::reader_authorized_for_event(&se.event, &authed_pubkey_hex) { + continue; + } if let Ok(v) = serde_json::to_value(&se.event) { events.push(v); } @@ -813,6 +825,15 @@ pub async fn count_events( for filter in &filters { let needs_author_only_filtering = crate::handlers::req::filter_can_match_author_only_kinds(filter); + // Same result-gated guard as the WS COUNT handler: force the per-event + // fallback for filters that can match 44200 or 30622 unless #p=[self] + // is safely pushed down (existence leak otherwise). + let needs_result_gated_filtering = + crate::handlers::req::filter_can_match_result_gated_kinds(filter) + && !crate::handlers::req::result_gated_count_safe_for_pushdown( + filter, + &authed_pubkey_hex, + ); // If filter targets a specific channel, verify access. 
if let Some(ch_id) = extract_channel_from_filter(filter) { @@ -835,6 +856,7 @@ pub async fn count_events( }); if crate::handlers::req::filter_fully_pushable(filter) && (!needs_author_only_filtering || author_is_self) + && !needs_result_gated_filtering { match state.db.count_events(&query).await { Ok(n) => total += n as u64, @@ -864,6 +886,12 @@ pub async fn count_events( { continue; } + if !buzz_core::filter::reader_authorized_for_event( + &se.event, + &authed_pubkey_hex, + ) { + continue; + } total += 1; } } @@ -892,6 +920,7 @@ pub async fn count_events( }); if crate::handlers::req::filter_fully_pushable(filter) && (!needs_author_only_filtering || author_is_self) + && !needs_result_gated_filtering { query.limit = None; match state.db.count_events(&query).await { @@ -921,6 +950,12 @@ pub async fn count_events( { continue; } + if !buzz_core::filter::reader_authorized_for_event( + &se.event, + &authed_pubkey_hex, + ) { + continue; + } total += 1; } } diff --git a/crates/buzz-relay/src/handlers/count.rs b/crates/buzz-relay/src/handlers/count.rs index 7cb488218..4689826f2 100644 --- a/crates/buzz-relay/src/handlers/count.rs +++ b/crates/buzz-relay/src/handlers/count.rs @@ -6,7 +6,9 @@ use nostr::Filter; use tracing::warn; use crate::connection::{AuthState, ConnectionState}; -use crate::handlers::req::is_author_only_event; +use crate::handlers::req::{ + filter_can_match_result_gated_kinds, is_author_only_event, result_gated_count_safe_for_pushdown, +}; use crate::protocol::RelayMessage; use crate::state::AppState; @@ -100,6 +102,14 @@ pub async fn handle_count( // fast-path count_events() cannot be used because it doesn't do // per-event author filtering. let needs_author_only_filtering = super::req::filter_can_match_author_only_kinds(filter); + // Determine if this filter can match result-gated kinds (44200, 30622) + // that require a per-event owner check. 
When the fast SQL path would + // count matching rows without calling reader_authorized_for_event, a + // non-owner learns the existence of events they are not allowed to see. + // The only safe pushdown is when #p is pinned to the authenticated + // reader's own pubkey. + let needs_result_gated_filtering = filter_can_match_result_gated_kinds(filter) + && !result_gated_count_safe_for_pushdown(filter, &authed_pubkey_hex); if let Some(ch_id) = extract_channel_from_filter(filter) { // Filter targets a specific channel — verify access. Mirrors the WS @@ -149,6 +159,7 @@ pub async fn handle_count( }); if super::req::filter_fully_pushable(filter) && (!needs_author_only_filtering || author_is_self) + && !needs_result_gated_filtering { match state.db.count_events(&query).await { Ok(n) => total += n as u64, @@ -179,6 +190,12 @@ pub async fn handle_count( if is_author_only_event(&se.event, &pubkey_bytes) { continue; } + if !buzz_core::filter::reader_authorized_for_event( + &se.event, + &authed_pubkey_hex, + ) { + continue; + } total += 1; } } @@ -212,6 +229,7 @@ pub async fn handle_count( }); if super::req::filter_fully_pushable(filter) && (!needs_author_only_filtering || author_is_self) + && !needs_result_gated_filtering { query.limit = None; // COUNT doesn't need a row limit match state.db.count_events(&query).await { @@ -242,6 +260,12 @@ pub async fn handle_count( if is_author_only_event(&se.event, &pubkey_bytes) { continue; } + if !buzz_core::filter::reader_authorized_for_event( + &se.event, + &authed_pubkey_hex, + ) { + continue; + } total += 1; } } diff --git a/crates/buzz-relay/src/handlers/event.rs b/crates/buzz-relay/src/handlers/event.rs index 9704f4633..8b1fb85cf 100644 --- a/crates/buzz-relay/src/handlers/event.rs +++ b/crates/buzz-relay/src/handlers/event.rs @@ -430,10 +430,13 @@ async fn dispatch_persistent_event_inner( return 0; } }; - // For viewer-private snapshots (kind:30622), live fan-out must reach only the - // owner — a kindless `ids:[…]` subscription 
can otherwise match it. Pull paths - // (HTTP /query, WS historical) are gated separately by reader_authorized_for_event. - let dm_visibility_owner: Option = (kind_u32 == buzz_core::kind::KIND_DM_VISIBILITY) + // For viewer-private events (kind:30622 DM visibility, kind:44200 agent turn + // metrics), live fan-out must reach only the owner — a kindless `ids:[…]` + // subscription can otherwise match it. Pull paths (HTTP /query, WS historical) + // are gated separately by reader_authorized_for_event. + let owner_only_kind = kind_u32 == buzz_core::kind::KIND_DM_VISIBILITY + || kind_u32 == buzz_core::kind::KIND_AGENT_TURN_METRIC; + let private_event_owner: Option = owner_only_kind .then(|| { let p = nostr::SingleLetterTag::lowercase(nostr::Alphabet::P); stored_event @@ -445,12 +448,12 @@ async fn dispatch_persistent_event_inner( .flatten(); // Author-only delivery gating (NIP-ER reminders) is enforced centrally in // filter_fanout_by_access, applied to `matches` above before this loop. The - // DM visibility owner gate is an additional delivery fence, so build shared + // private-event owner gate is an additional delivery fence, so build shared // frames only after applying it to the already access-filtered recipient set. 
let recipients: Vec<_> = matches .iter() .filter_map(|(target_conn_id, sub_id)| { - if let Some(ref owner_hex) = dm_visibility_owner { + if let Some(ref owner_hex) = private_event_owner { let is_owner = state .conn_manager .pubkey_for(*target_conn_id) diff --git a/crates/buzz-relay/src/handlers/ingest.rs b/crates/buzz-relay/src/handlers/ingest.rs index 32b7244b4..87088bfd4 100644 --- a/crates/buzz-relay/src/handlers/ingest.rs +++ b/crates/buzz-relay/src/handlers/ingest.rs @@ -12,9 +12,9 @@ use uuid::Uuid; use buzz_auth::Scope; use buzz_core::kind::{ event_kind_u32, is_identity_archive_request_kind, is_parameterized_replaceable, - is_relay_admin_kind, KIND_AGENT_ENGRAM, KIND_AGENT_PROFILE, KIND_APPROVAL_DENY, - KIND_APPROVAL_GRANT, KIND_AUTH, KIND_BOOKMARK_LIST, KIND_BOOKMARK_SET, KIND_CANVAS, - KIND_CONTACT_LIST, KIND_DELETION, KIND_DM_ADD_MEMBER, KIND_DM_HIDE, KIND_DM_OPEN, + is_relay_admin_kind, KIND_AGENT_ENGRAM, KIND_AGENT_PROFILE, KIND_AGENT_TURN_METRIC, + KIND_APPROVAL_DENY, KIND_APPROVAL_GRANT, KIND_AUTH, KIND_BOOKMARK_LIST, KIND_BOOKMARK_SET, + KIND_CANVAS, KIND_CONTACT_LIST, KIND_DELETION, KIND_DM_ADD_MEMBER, KIND_DM_HIDE, KIND_DM_OPEN, KIND_EMOJI_LIST, KIND_EMOJI_SET, KIND_EVENT_REMINDER, KIND_FOLLOW_SET, KIND_FORUM_COMMENT, KIND_FORUM_POST, KIND_FORUM_VOTE, KIND_GIFT_WRAP, KIND_GIT_ISSUE, KIND_GIT_PATCH, KIND_GIT_PR_UPDATE, KIND_GIT_PULL_REQUEST, KIND_GIT_REPO_ANNOUNCEMENT, KIND_GIT_REPO_STATE, @@ -156,6 +156,8 @@ fn required_scope_for_kind(kind: u32, event: &Event) -> Result { Ok(Scope::UsersWrite) } + // NIP-AM: agent turn metrics are agent-authored global events (encrypted to owner). + KIND_AGENT_TURN_METRIC => Ok(Scope::MessagesWrite), // NIP-51 standard lists and NIP-65 relay list — user-owned global state, // same ownership shape as kind:3 (contacts) and kind:0 (profile). KIND_MUTE_LIST @@ -379,6 +381,9 @@ pub(crate) fn is_global_only_kind(kind: u32) -> bool { // Mesh-LLM relay status is relay-signed and global. 
Clients may // subscribe to it, but must not channel-scope or submit it. | KIND_MESH_LLM_RELAY_STATUS + // NIP-AM: agent turn metrics are owner-scoped global events. + // Channel identity is encrypted inside the payload — no `h` tag. + | KIND_AGENT_TURN_METRIC ) } @@ -1066,6 +1071,82 @@ fn validate_engram_nip44_content(content: &str) -> Result<(), String> { Ok(()) } +/// Validate the public envelope of a NIP-AM `kind:44200` event. +/// +/// Enforces (without touching the encrypted payload): +/// - Exactly one `p` tag: 64 lowercase hex chars (the owner pubkey). +/// - Exactly one `agent` tag: 64 lowercase hex chars equal to `event.pubkey`. +/// - No `h` tag (channel identity belongs inside the encrypted payload). +/// - Content syntactically resembles NIP-44 v2 ciphertext (delegated to +/// `validate_engram_nip44_content`, which does the same length/base64/version check). +/// +/// Ownership (`is_agent_owner`) is an async DB check performed separately in +/// `ingest_event_inner` after this synchronous envelope check. 
+fn validate_agent_turn_metric_envelope(event: &nostr::Event) -> Result<(), String> { + let event_pubkey_hex = event.pubkey.to_hex(); + let mut p_tags: Vec<&str> = Vec::new(); + let mut agent_tags: Vec<&str> = Vec::new(); + let mut has_h_tag = false; + + for tag in event.tags.iter() { + let parts = tag.as_slice(); + if parts.len() < 2 { + continue; + } + match parts[0].as_str() { + "p" => p_tags.push(&parts[1]), + "agent" => agent_tags.push(&parts[1]), + "h" => has_h_tag = true, + _ => {} + } + } + + if has_h_tag { + return Err( + "agent-turn-metric event must not have an `h` tag (channel identity belongs inside the encrypted payload)".to_string(), + ); + } + + if p_tags.len() != 1 { + return Err(format!( + "agent-turn-metric event must have exactly one `p` tag (got {})", + p_tags.len() + )); + } + let p = p_tags[0]; + if p.len() != 64 + || !p + .bytes() + .all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase()) + { + return Err("agent-turn-metric `p` tag must be 64 lowercase hex chars".to_string()); + } + + if agent_tags.len() != 1 { + return Err(format!( + "agent-turn-metric event must have exactly one `agent` tag (got {})", + agent_tags.len() + )); + } + let agent = agent_tags[0]; + if agent.len() != 64 + || !agent + .bytes() + .all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase()) + { + return Err("agent-turn-metric `agent` tag must be 64 lowercase hex chars".to_string()); + } + if agent != event_pubkey_hex { + return Err("agent-turn-metric `agent` tag must equal event pubkey".to_string()); + } + + // Content must look like a NIP-44 v2 ciphertext (length, base64, version prefix). + validate_engram_nip44_content(&event.content) + .map_err(|e| e.replace("agent-engram", "agent-turn-metric"))?; + + Ok(()) +} + /// Parse a NIP-ER `not_before` tag value into a Unix timestamp. 
/// /// The value MUST be a decimal integer string containing only ASCII digits, with @@ -1671,6 +1752,43 @@ async fn ingest_event_inner( .map_err(|e| IngestError::Rejected(format!("invalid: {e}")))?; } + if kind_u32 == KIND_AGENT_TURN_METRIC { + validate_agent_turn_metric_envelope(&event) + .map_err(|e| IngestError::Rejected(format!("invalid: {e}")))?; + + // Ownership check: `p` tag must be the registered owner of `event.pubkey`. + // Tag shape is already verified above; these extractions are infallible. + let owner_hex = event + .tags + .iter() + .find_map(|t| { + let parts = t.as_slice(); + if parts.len() >= 2 && parts[0].as_str() == "p" { + Some(parts[1].as_str()) + } else { + None + } + }) + .expect("p tag present (validated above)"); + let agent_bytes = event.pubkey.to_bytes().to_vec(); + let owner_bytes = hex::decode(owner_hex).expect("hex validated above"); + let is_owner = state + .db + .is_agent_owner(tenant.community(), &agent_bytes, &owner_bytes) + .await + .map_err(|e| { + IngestError::Internal(format!( + "error: db error checking agent-turn-metric ownership: {e}" + )) + })?; + if !is_owner { + return Err(IngestError::AuthFailed( + "restricted: agent-turn-metric `p` tag must be the registered owner of this agent" + .into(), + )); + } + } + if kind_u32 == KIND_EVENT_REMINDER { validate_event_reminder(&event) .map_err(|e| IngestError::Rejected(format!("invalid: {e}")))?; @@ -2274,6 +2392,7 @@ mod tests { KIND_PERSONA, KIND_TEAM, KIND_MANAGED_AGENT, + KIND_AGENT_TURN_METRIC, ]; for kind in migrated { assert!( @@ -2311,6 +2430,24 @@ mod tests { assert!(!requires_h_channel_scope(KIND_MESH_LLM_RELAY_STATUS)); } + #[test] + fn agent_turn_metric_is_global_only_and_in_scope_allowlist() { + let dummy = make_dummy_event(); + assert!( + is_global_only_kind(KIND_AGENT_TURN_METRIC), + "kind:44200 must be global-only (no h tag)" + ); + assert!( + !requires_h_channel_scope(KIND_AGENT_TURN_METRIC), + "kind:44200 must not require an h-tag" + ); + assert_eq!( + 
required_scope_for_kind(KIND_AGENT_TURN_METRIC, &dummy).unwrap(), + Scope::MessagesWrite, + "kind:44200 requires MessagesWrite scope" + ); + } + #[test] fn nip51_and_nip65_lists_are_global_only() { for kind in [ @@ -2944,4 +3081,104 @@ mod tests { let err = validate_persona_envelope(&ev).unwrap_err(); assert!(err.contains("`d` tag"), "got: {err}"); } + + // ─── agent_turn_metric envelope tests ──────────────────────────────────── + + /// Build an event for kind:44200 with the given tags and content. + /// The signing key IS the agent key, so `event.pubkey` matches the agent. + fn make_agent_turn_metric( + agent_keys: &nostr::Keys, + tags: &[&[&str]], + content: &str, + ) -> nostr::Event { + let nostr_tags: Vec<nostr::Tag> = tags + .iter() + .map(|t| nostr::Tag::parse(t.iter().copied()).unwrap()) + .collect(); + nostr::EventBuilder::new( + nostr::Kind::Custom(buzz_core::kind::KIND_AGENT_TURN_METRIC as u16), + content, + ) + .tags(nostr_tags) + .sign_with_keys(agent_keys) + .unwrap() + } + + #[test] + fn agent_turn_metric_envelope_accepts_canonical() { + let agent = nostr::Keys::generate(); + let owner_hex = "b".repeat(64); + let agent_hex = agent.public_key().to_hex(); + let ev = make_agent_turn_metric( + &agent, + &[&["p", &owner_hex], &["agent", &agent_hex]], + &fake_nip44_v2(), + ); + assert!(validate_agent_turn_metric_envelope(&ev).is_ok()); + } + + #[test] + fn agent_turn_metric_envelope_rejects_h_tag() { + let agent = nostr::Keys::generate(); + let owner_hex = "b".repeat(64); + let agent_hex = agent.public_key().to_hex(); + let ev = make_agent_turn_metric( + &agent, + &[ + &["p", &owner_hex], + &["agent", &agent_hex], + &["h", "some-channel-uuid"], + ], + &fake_nip44_v2(), + ); + let err = validate_agent_turn_metric_envelope(&ev).unwrap_err(); + assert!(err.contains("`h` tag"), "got: {err}"); + } + + #[test] + fn agent_turn_metric_envelope_rejects_missing_p() { + let agent = nostr::Keys::generate(); + let agent_hex = agent.public_key().to_hex(); + let ev =
make_agent_turn_metric(&agent, &[&["agent", &agent_hex]], &fake_nip44_v2()); + let err = validate_agent_turn_metric_envelope(&ev).unwrap_err(); + assert!(err.contains("`p` tag"), "got: {err}"); + } + + #[test] + fn agent_turn_metric_envelope_rejects_missing_agent() { + let agent = nostr::Keys::generate(); + let owner_hex = "b".repeat(64); + let ev = make_agent_turn_metric(&agent, &[&["p", &owner_hex]], &fake_nip44_v2()); + let err = validate_agent_turn_metric_envelope(&ev).unwrap_err(); + assert!(err.contains("`agent` tag"), "got: {err}"); + } + + #[test] + fn agent_turn_metric_envelope_rejects_agent_mismatch() { + let agent = nostr::Keys::generate(); + let owner_hex = "b".repeat(64); + let wrong_agent_hex = "c".repeat(64); // not event.pubkey + let ev = make_agent_turn_metric( + &agent, + &[&["p", &owner_hex], &["agent", &wrong_agent_hex]], + &fake_nip44_v2(), + ); + let err = validate_agent_turn_metric_envelope(&ev).unwrap_err(); + assert!(err.contains("equal event pubkey"), "got: {err}"); + } + + #[test] + fn agent_turn_metric_envelope_rejects_bad_content() { + let agent = nostr::Keys::generate(); + let owner_hex = "b".repeat(64); + let agent_hex = agent.public_key().to_hex(); + let ev = make_agent_turn_metric( + &agent, + &[&["p", &owner_hex], &["agent", &agent_hex]], + "not-a-ciphertext", + ); + let err = validate_agent_turn_metric_envelope(&ev).unwrap_err(); + // error comes from validate_engram_nip44_content with label replaced + assert!(err.contains("agent-turn-metric"), "got: {err}"); + } } diff --git a/crates/buzz-relay/src/handlers/req.rs b/crates/buzz-relay/src/handlers/req.rs index 795322245..12ac43bc2 100644 --- a/crates/buzz-relay/src/handlers/req.rs +++ b/crates/buzz-relay/src/handlers/req.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use tracing::{debug, warn}; use buzz_core::filter::filters_match; -use buzz_core::kind::{AUTHOR_ONLY_KINDS, KIND_AGENT_ENGRAM, KIND_DM_VISIBILITY, P_GATED_KINDS}; +use buzz_core::kind::{ + AUTHOR_ONLY_KINDS, 
KIND_AGENT_ENGRAM, KIND_AGENT_TURN_METRIC, KIND_DM_VISIBILITY, + P_GATED_KINDS, RESULT_GATED_KINDS, +}; use buzz_core::tenant::TenantContext; use buzz_db::EventQuery; use buzz_pubsub::EventTopic; @@ -1015,13 +1018,18 @@ pub(crate) fn p_gated_filters_authorized(filters: &[Filter], authed_pubkey_hex: // safe for kinds whose id is author-bound or whose content is encrypted. // KIND_DM_VISIBILITY is relay-signed (id not author-bound) and exposes // plaintext private hide choices, so its `#p` owner check MUST hold even - // when `ids` is present. Only filters that explicitly name the kind lose - // the exemption — a kindless `ids` lookup is unaffected. - let explicitly_dm_visibility = filter.kinds.as_ref().is_some_and(|ks| { - ks.iter() - .any(|kind| kind.as_u16() as u32 == KIND_DM_VISIBILITY) + // when `ids` is present. KIND_AGENT_TURN_METRIC events are long-lived + // and their cleartext envelope (pubkey, agent tag, created_at) leaks + // turn-activity metadata — knowing an event id is NOT authorization + // (NIP-AM §Relay Behavior). Only filters that explicitly name the kind + // lose the exemption — a kindless `ids` lookup is unaffected. + let explicitly_no_ids_exemption = filter.kinds.as_ref().is_some_and(|ks| { + ks.iter().any(|kind| { + let k = kind.as_u16() as u32; + k == KIND_DM_VISIBILITY || k == KIND_AGENT_TURN_METRIC + }) }); - if !explicitly_dm_visibility && filter.ids.as_ref().is_some_and(|ids| !ids.is_empty()) { + if !explicitly_no_ids_exemption && filter.ids.as_ref().is_some_and(|ids| !ids.is_empty()) { return true; } @@ -1097,6 +1105,45 @@ pub(crate) fn filter_can_match_author_only_kinds(filter: &Filter) -> bool { }) } +/// Returns `true` if the filter CAN match result-gated kinds — meaning it +/// either has no `kinds` constraint (wildcard) or includes at least one kind +/// that carries a per-event result-level read gate (currently +/// `KIND_DM_VISIBILITY` and `KIND_AGENT_TURN_METRIC`). 
+/// +/// Used by the COUNT handler to force the per-event fallback path instead of +/// the fast SQL `count_events()`, which cannot enforce the owner-only result +/// gate. An existence count leaks private event activity even though no content +/// is returned, violating the NIP-AM / NIP-DM requirement that knowing an id +/// MUST NOT grant access. +pub(crate) fn filter_can_match_result_gated_kinds(filter: &Filter) -> bool { + filter.kinds.as_ref().is_none_or(|ks| { + ks.iter() + .any(|k| RESULT_GATED_KINDS.contains(&(k.as_u16() as u32))) + }) +} + +/// Returns `true` if a result-gated-kind COUNT filter can safely use the fast +/// SQL pushdown path — specifically, when the filter's `#p` tag is non-empty +/// and every entry equals the authenticated reader's pubkey. +/// +/// In that case the SQL `WHERE #p = self` pushdown scopes the query to the +/// reader's own events, so the fast path cannot leak another owner's event +/// existence. This mirrors the owner's own subscription pattern from the NIP: +/// `{kinds:[44200], #p:[self]}`. +/// +/// When this returns `false`, the COUNT handler MUST use the per-event fallback +/// and apply `reader_authorized_for_event` on each row. +pub(crate) fn result_gated_count_safe_for_pushdown( + filter: &Filter, + authed_pubkey_hex: &str, +) -> bool { + let p_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::P); + filter + .generic_tags + .get(&p_tag) + .is_some_and(|values| !values.is_empty() && values.iter().all(|v| v == authed_pubkey_hex)) +} + /// Returns `true` if the event is an author-only kind and the requester is NOT /// the author. Used as a per-event filter during historical delivery and fan-out /// to silently omit unauthorized events from mixed-kind result sets. @@ -1359,6 +1406,66 @@ mod tests { assert!(p_gated_filters_authorized(&[member_notif_ids], authed)); } + /// NIP-AM: kind 44200 must deny `{kinds:[44200], ids:[...]}` by non-owner. 
+ /// Thufir's implementation note: the helper treats explicit-kind+ids and + /// kindless ids differently. Explicit `{kinds:[44200], ids:[...]}` is denied; + #[test] + fn agent_turn_metric_requires_p_tag_even_with_ids() { + let p_tag = SingleLetterTag::lowercase(Alphabet::P); + let authed = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + let other = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + let event_id = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"; + let metric_kind = nostr::Kind::Custom(buzz_core::kind::KIND_AGENT_TURN_METRIC as u16); + + // Case 1: {kinds:[44200], ids:[...]} — explicit kind, should require #p owner. + let explicit_kind_ids_only = Filter::new() + .kind(metric_kind) + .id(nostr::EventId::from_hex(event_id).unwrap()); + assert!( + !p_gated_filters_authorized(&[explicit_kind_ids_only], authed), + "kind:44200 + ids without matching #p must be denied" + ); + + let explicit_kind_wrong_p = Filter::new() + .kind(metric_kind) + .id(nostr::EventId::from_hex(event_id).unwrap()) + .custom_tags(p_tag, [other]); + assert!( + !p_gated_filters_authorized(&[explicit_kind_wrong_p], authed), + "kind:44200 + ids + wrong #p must be denied" + ); + + // Case 2: kindless {ids:[...]} — the existing ids exemption applies + // at this filter-authorization gate (consistent with other p-gated kinds). + // The kindless path is closed at the result level by + // `reader_authorized_for_event` (buzz-core/src/filter.rs), which gates + // kind:44200 delivery to the #p owner across all pull paths (WS historical, + // HTTP bridge) and live fan-out. Pass-through here is correct; the + // result-level gate is the enforcement point for this path. 
+ let kindless_ids = Filter::new().id(nostr::EventId::from_hex(event_id).unwrap()); + assert!( + p_gated_filters_authorized(&[kindless_ids], authed), + "kindless ids filter passes this filter gate — result-level gate closes the path" + ); + + // Case 3: owner querying by #p is allowed. + let owner_by_p = Filter::new().kind(metric_kind).custom_tags(p_tag, [authed]); + assert!( + p_gated_filters_authorized(&[owner_by_p], authed), + "kind:44200 with matching #p must be allowed" + ); + + // Case 4: owner querying by #p + ids is allowed. + let owner_p_and_ids = Filter::new() + .kind(metric_kind) + .id(nostr::EventId::from_hex(event_id).unwrap()) + .custom_tags(p_tag, [authed]); + assert!( + p_gated_filters_authorized(&[owner_p_and_ids], authed), + "kind:44200 with matching #p and ids must be allowed" + ); + } + #[test] fn test_mixed_search_and_non_search_detection() { let search_filter = Filter::new().search("hello"); @@ -1642,4 +1749,71 @@ mod tests { .search("x"); assert!(!p_gated_filters_authorized(&[f], &agent)); } + + // ── filter_can_match_result_gated_kinds + result_gated_count_safe_for_pushdown ── + + #[test] + fn result_gated_wildcard_filter_can_match() { + // No kinds constraint — could match anything, including 44200 / 30622. 
+ let f = Filter::new(); + assert!(filter_can_match_result_gated_kinds(&f)); + } + + #[test] + fn result_gated_explicit_44200_can_match() { + let f = Filter::new().kind(nostr::Kind::Custom( + buzz_core::kind::KIND_AGENT_TURN_METRIC as u16, + )); + assert!(filter_can_match_result_gated_kinds(&f)); + } + + #[test] + fn result_gated_explicit_30622_can_match() { + let f = Filter::new().kind(nostr::Kind::Custom( + buzz_core::kind::KIND_DM_VISIBILITY as u16, + )); + assert!(filter_can_match_result_gated_kinds(&f)); + } + + #[test] + fn result_gated_kind_9_only_cannot_match() { + let f = Filter::new().kind(nostr::Kind::TextNote); + assert!(!filter_can_match_result_gated_kinds(&f)); + } + + #[test] + fn result_gated_safe_pushdown_requires_p_self() { + let (owner, _agent, _other) = three_pubkeys(); + let p_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::P); + let f = nostr::Filter::new() + .kind(nostr::Kind::Custom( + buzz_core::kind::KIND_AGENT_TURN_METRIC as u16, + )) + .custom_tags(p_tag, [owner.clone()]); + // Owner querying their own metrics — safe to push down. + assert!(result_gated_count_safe_for_pushdown(&f, &owner)); + } + + #[test] + fn result_gated_safe_pushdown_rejects_when_p_is_other() { + let (owner, _agent, other) = three_pubkeys(); + let p_tag = nostr::SingleLetterTag::lowercase(nostr::Alphabet::P); + let f = nostr::Filter::new() + .kind(nostr::Kind::Custom( + buzz_core::kind::KIND_AGENT_TURN_METRIC as u16, + )) + .custom_tags(p_tag, [other.clone()]); + // Authenticated as owner but #p is someone else — NOT safe. + assert!(!result_gated_count_safe_for_pushdown(&f, &owner)); + } + + #[test] + fn result_gated_safe_pushdown_rejects_when_no_p_tag() { + let (owner, _agent, _other) = three_pubkeys(); + let f = nostr::Filter::new().kind(nostr::Kind::Custom( + buzz_core::kind::KIND_AGENT_TURN_METRIC as u16, + )); + // No #p tag — fallback required. 
+ assert!(!result_gated_count_safe_for_pushdown(&f, &owner)); + } } diff --git a/crates/buzz-search/tests/fts_integration.rs b/crates/buzz-search/tests/fts_integration.rs index 8f16d6c7d..5403514b3 100644 --- a/crates/buzz-search/tests/fts_integration.rs +++ b/crates/buzz-search/tests/fts_integration.rs @@ -2,14 +2,14 @@ //! //! Run with a local PG: `BUZZ_TEST_DATABASE_URL=postgres://buzz:buzz_dev@localhost:5432/buzz cargo test -p buzz-search --tests -- --include-ignored` //! -//! Each test creates a uniquely-named schema, applies the consolidated `0001` -//! migration into it, exercises a scenario, and drops it. Tests are -//! parallel-safe. +//! Each test creates a uniquely-named schema, applies all four migrations in +//! order (0001 → 0002 → 0003 → 0004) into it, exercises a scenario, and drops +//! it. Tests are parallel-safe. use buzz_core::{ kind::{ - AUTHOR_ONLY_KINDS, KIND_MEMBER_ADDED_NOTIFICATION, KIND_MEMBER_REMOVED_NOTIFICATION, - P_GATED_KINDS, + AUTHOR_ONLY_KINDS, KIND_AGENT_TURN_METRIC, KIND_MEMBER_ADDED_NOTIFICATION, + KIND_MEMBER_REMOVED_NOTIFICATION, P_GATED_KINDS, }, CommunityId, }; @@ -18,7 +18,10 @@ use sqlx::{postgres::PgPoolOptions, Executor, PgPool}; use uuid::Uuid; const TEST_DB_URL: &str = "postgres://buzz:buzz_dev@localhost:5432/buzz"; -const MIGRATION_SQL: &str = include_str!("../../../migrations/0001_initial_schema.sql"); +const MIGRATION_0001_SQL: &str = include_str!("../../../migrations/0001_initial_schema.sql"); +const MIGRATION_0002_SQL: &str = include_str!("../../../migrations/0002_git_repo_names.sql"); +const MIGRATION_0003_SQL: &str = include_str!("../../../migrations/0003_community_icon.sql"); +const MIGRATION_0004_SQL: &str = include_str!("../../../migrations/0004_agent_turn_metric_fts.sql"); async fn setup() -> (PgPool, String) { let url = std::env::var("BUZZ_TEST_DATABASE_URL").unwrap_or_else(|_| TEST_DB_URL.to_string()); @@ -43,9 +46,20 @@ async fn setup() -> (PgPool, String) { .connect(&url_with_search_path) .await 
.expect("connect with search_path"); - pool.execute(MIGRATION_SQL) + // Apply the full migration chain in order so the test schema exactly matches + // production. Future FTS-affecting migrations must be added here. + pool.execute(MIGRATION_0001_SQL) .await .expect("apply 0001 migration"); + pool.execute(MIGRATION_0002_SQL) + .await + .expect("apply 0002 migration"); + pool.execute(MIGRATION_0003_SQL) + .await + .expect("apply 0003 migration"); + pool.execute(MIGRATION_0004_SQL) + .await + .expect("apply 0004 migration"); (pool, schema) } @@ -1058,8 +1072,9 @@ async fn very_long_query_is_bounded_before_pg_parse() { /// - 30622 = `KIND_DM_VISIBILITY` (per-viewer private hide state) /// - 44100 = `KIND_MEMBER_ADDED_NOTIFICATION` (p-gated membership notice) /// - 44101 = `KIND_MEMBER_REMOVED_NOTIFICATION` (p-gated membership notice) +/// - 44200 = `KIND_AGENT_TURN_METRIC` (NIP-AM: p-gated encrypted turn metrics) /// -/// All six events are inserted with the same unique token in their content +/// All seven events are inserted with the same unique token in their content /// so a single search query exercises every kind in one round-trip. Only /// the kind:9 control must surface — the excluded kinds must not. /// @@ -1152,6 +1167,19 @@ async fn excluded_kinds_are_storage_level_unsearchable() { ) .await; + // kind:44200 agent turn metric — p-gated NIP-44 ciphertext and MUST NOT be searchable. 
+ insert_event( + &pool, + c, + rand_bytes32(), + rand_bytes32(), + KIND_AGENT_TURN_METRIC as i32, + &format!("agent turn metric — {token}"), + None, + 1_700_000_006, + ) + .await; + let svc = SearchService::new(pool.clone()); let result = svc .search(&SearchQuery { @@ -1184,6 +1212,7 @@ async fn excluded_kinds_are_storage_level_unsearchable() { 30622, KIND_MEMBER_ADDED_NOTIFICATION as i32, KIND_MEMBER_REMOVED_NOTIFICATION as i32, + KIND_AGENT_TURN_METRIC as i32, ] { assert!( !kinds.contains(&forbidden), @@ -1299,7 +1328,7 @@ async fn author_only_kinds_are_storage_level_unsearchable() { /// L1 NULL tsvector is the unbreakable backstop: `@@` mathematically cannot /// match NULL. This test catches the drift where someone adds a persistent /// kind to `P_GATED_KINDS` without the matching `schema/schema.sql` + -/// `migrations/0001_initial_schema.sql` skip-set update. +/// a new additive migration (see `0004_agent_turn_metric_fts.sql`) skip-set update. /// /// Ephemeral kinds (20000–29999) are skipped: they are never stored, so the /// storage-layer defense does not apply to them regardless of the schema diff --git a/docs/nips/NIP-AM.md b/docs/nips/NIP-AM.md new file mode 100644 index 000000000..ff636fb80 --- /dev/null +++ b/docs/nips/NIP-AM.md @@ -0,0 +1,236 @@ +NIP-AM +====== + +Agent Turn Metrics +------------------ + +`draft` `optional` `relay` + +This NIP defines a durable, encrypted event kind for recording per-turn token +usage and estimated cost of AI agent sessions. An agent publishes one +`kind:44200` event per completed turn, NIP-44 encrypted to its owner, so the +owner can account for token usage across agents and harnesses without the +relay — or any third party — learning what the agent did or what it cost. + +## Motivation + +AI agent harnesses consume model tokens on every turn. Owners running fleets +of agents need durable, harness-independent usage accounting — the equivalent +of a metered bill — for cost attribution, budgeting, and capacity planning. 
+ +[NIP-AO](NIP-AO.md) (kind 24200) already streams encrypted session telemetry +between agent and owner, but it is deliberately ephemeral: relays MUST NOT +persist it, so it cannot answer "how many tokens did my agents use last +week?". Transcript-grade durable telemetry is explicitly out of scope — the +persistence-averse reasoning behind NIP-AO's ephemerality contract applies to +conversation content, not to a small usage record. Kind 44200 stores only the +metric: token counts, an estimated cost, and correlation identifiers, all +encrypted to the owner. + +## Definitions + +- **Agent**: an AI process with its own Nostr keypair, executing sessions on + behalf of an owner. +- **Owner**: the human (or system) whose pubkey the agent was provisioned under. +- **Turn**: one prompt→response cycle of an agent session, as bounded by the + harness (e.g. one ACP `session/prompt` round trip). +- **Turn metric**: a single kind 44200 event recording the usage of one turn. + +## Event + +`kind:44200` is a regular event by Buzz convention (alongside 44100/44101): +stored, +append-only, never replaced. Each completed turn produces exactly one event. + +```json +{ + "kind": 44200, + "pubkey": "<agent-pubkey-hex>", + "created_at": <unix-timestamp-seconds>, + "content": "<NIP-44 v2 ciphertext>", + "tags": [ + ["p", "<owner-pubkey-hex>"], + ["agent", "<agent-pubkey-hex>"] + ], + "sig": "..." +} +``` + +Events MUST have exactly one `p` tag (the owner) and exactly one `agent` tag +(equal to `pubkey`). The tag layout deliberately mirrors NIP-AO telemetry +frames so existing owner-scoped tooling applies unchanged. + +No channel (`h`) tag is used. The channel a turn served is private usage +metadata and lives inside the encrypted payload; keeping it out of the tags +avoids leaking per-channel activity rates to the relay operator and keeps the +event community-global (owner-scoped) rather than channel-scoped. + +## Encryption + +`content` MUST be encrypted with NIP-44 v2 using `(agent_privkey, +owner_pubkey)` — identical to NIP-AO telemetry.
Plaintext SHOULD be zeroized +after encrypt/decrypt. Decrypted payload MUST NOT exceed 65,535 bytes +(payloads are typically well under 1 KB). + +## Decrypted Payload + +The `content` field decrypts to a UTF-8 JSON object: + +```jsonc +{ + "harness": "goose", // REQUIRED: harness identifier + "model": "claude-sonnet-4-5", // model id, or null if unknown + "channelId": "<channel-id>" | null, + "sessionId": "<session-id>" | null, // REQUIRED when "cumulative" is present + "turnId": "<turn-id>" | null, + "turnSeq": 17 | null, // REQUIRED when "cumulative" is present + "timestamp": "2026-07-01T20:11:03.213Z", // REQUIRED: RFC 3339, end of turn + + // Usage for THIS turn (computed delta). Fields are null when the harness + // does not report them — a null MUST NOT be recorded or summed as zero. + "turn": { + "inputTokens": 1234 | null, + "outputTokens": 567 | null, + "totalTokens": 1801 | null, + "costUsd": 0.0123 | null // estimated + }, + + // Session-cumulative usage as reported at the end of this turn. + "cumulative": { + "inputTokens": 45210 | null, + "outputTokens": 9876 | null, + "totalTokens": 55086 | null, + "costUsd": 0.41 | null // estimated + }, + + // false when the publisher could not observe the previous turn's + // cumulative baseline (e.g. harness restart mid-session), making the + // "turn" object unreliable for this event. + "deltaReliable": true, + + "stopReason": "end_turn" // optional +} +``` + +`harness` and `timestamp` are REQUIRED. All other fields are OPTIONAL or +nullable, except as constrained below. Consumers MUST ignore unknown fields +(forward compatibility). + +### Ordering and delta recomputation + +When a `cumulative` object is present, `sessionId` and `turnSeq` are +REQUIRED. `turnSeq` is a per-session monotonically increasing integer +starting at any value, incremented by the publisher on every published turn +metric for that session; a publisher restart that loses the counter MUST +start a new `sessionId` rather than reuse the old one with a reset `turnSeq`.
+Cumulative values form a series only *within* one `sessionId`, ordered by +`turnSeq` — consumers MUST NOT diff cumulative values across different +`sessionId`s, and MUST NOT rely on `created_at` (seconds precision, ambiguous +for same-second turns) for ordering within a session. + +If a consumer recomputing deltas observes a cumulative counter that decreases +between consecutive `turnSeq` values (counter reset, harness bug), it MUST +treat the affected turn's usage as unknown (null), not as negative usage. +Publishers likewise MUST NOT emit negative values in `turn`; when the +computed delta would be negative or the previous baseline is unknown, the +publisher sets the affected `turn` counters to null and `deltaReliable: +false`. + +Where the harness reports only cumulative counters, the publisher computes +`turn` as the difference between consecutive cumulative snapshots within one +session. Consumers doing exact accounting SHOULD prefer recomputing deltas +from consecutive `cumulative` values and treat `turn` as a convenience. + +### Numeric validity and token semantics + +All token counts MUST be non-negative integers. `costUsd` MUST be a finite, +non-negative number. `totalTokens` is the harness- or provider-reported +total when available; publishers MUST NOT derive it by summing `inputTokens` +and `outputTokens` (providers may count categories a simple sum misses) — +when no total is reported, `totalTokens` is null. `inputTokens` is the +inclusive input-side total: where the provider reports cache reads/writes +separately (e.g. Anthropic `cache_read_input_tokens` / +`cache_creation_input_tokens`), the publisher folds them into `inputTokens`. +Publishers MAY additionally report the cache components in optional +`cacheReadTokens` / `cacheWriteTokens` fields inside `turn` and `cumulative`; +when present these are informational subsets of `inputTokens`, not additions +to it. 
+ +`costUsd` values are estimates (provider list prices at publish time, or a +harness-reported estimate). They are advisory, not billing records. + +`stopReason`, when present, MUST be one of `end_turn`, `max_tokens`, +`cancelled`, `error`, `unknown`. Consumers MUST treat unrecognized +`stopReason` values as `unknown`; the token counts remain valid. + +## Publisher Behavior + +- Publish exactly one event per completed turn, at turn completion, including + turns that end in cancellation or error when usage was observed. +- Do NOT publish an event for a turn with no observed usage (all counters + unknown); an all-null metric carries no information. +- `created_at` SHOULD equal the payload `timestamp` truncated to seconds. + +## Relay Behavior + +On receiving a kind 44200 event, a relay MUST: + +1. Validate the event signature per NIP-01. +2. Verify `event.pubkey` equals the `agent` tag and that + `is_agent_owner(agent, owner)` holds for the `p` tag via authenticated + ownership lookup. Tag matching alone is insufficient. +3. Store the event durably, scoped to the owner (community-global; no channel + scope). +4. NOT index the event in any full-text search (the ciphertext is not + searchable and must not enter search indexes). + +Reads MUST be gated: only an authenticated ([NIP-42](42.md)) reader whose +pubkey equals the `#p` tag value may receive the event. This gate applies to +**every** read path, including explicit `ids` filters — knowing an event id +MUST NOT grant access. (Some p-gated kinds exempt id-addressed lookups on the +theory that knowing the id implies authorization; kind 44200 events are +long-lived and their cleartext envelope leaks turn activity, so no such +exemption is permitted.) Unauthenticated publish or subscribe attempts MUST be +rejected with `AUTH required`; authenticated attempts from a pubkey that is not +the event owner MUST be rejected with `restricted:`. 
+ +Relays SHOULD rate-limit kind 44200 to a rate consistent with real turn +frequency (RECOMMENDED: 60 events/minute per agent pubkey). + +## Client Behavior + +Owners recover usage history with: + +```json +{"kinds": [44200], "#p": ["<owner-pubkey-hex>"], "since": <unix-timestamp>} +``` + +On receiving an event, a client MUST verify the signature, decrypt with its +own secret key and `event.pubkey`, and ignore events that fail to decrypt or +parse. Clients SHOULD deduplicate by event id. For within-session ordering, +clients MUST use `(sessionId, turnSeq)` from the decrypted payload as +described above; `created_at` is suitable only for coarse time-window +queries. + +## Relationship to Other NIPs + +- [NIP-AO](NIP-AO.md): same agent↔owner encryption and tag scoping, but + ephemeral and transcript-grade. NIP-AM events MUST NOT carry conversation + content, tool calls, or protocol frames — usage numbers and identifiers only. +- [NIP-09](09.md): the authoring agent (or its owner via relay policy) may + request deletion; relays apply standard deletion semantics. +- [NIP-40](40.md): publishers MAY set `expiration` to bound retention. + +## Security Considerations + +**Metadata leakage.** `p`, `agent`, and `created_at` are cleartext: a relay +operator learns that agent X completed turns for owner Y at some rate. Turn +rate is already observable from the agent's channel messages; the token +counts, cost, model, and channel remain encrypted. + +**No forward secrecy.** NIP-44 does not provide forward secrecy; compromise +of the agent's private key allows decryption of captured ciphertexts. + +**Integrity of accounting.** Metrics are self-reported by the agent process. +A compromised agent can under- or over-report. Owners requiring stronger +guarantees must reconcile against provider-side billing.
diff --git a/migrations/0004_agent_turn_metric_fts.sql b/migrations/0004_agent_turn_metric_fts.sql new file mode 100644 index 000000000..3ad64a9c8 --- /dev/null +++ b/migrations/0004_agent_turn_metric_fts.sql @@ -0,0 +1,33 @@ +-- ── Exclude kind 44200 (NIP-AM Agent Turn Metrics) from full-text search ────── +-- NIP-AM events carry NIP-44 ciphertext in `content`. Indexing that ciphertext +-- would waste storage and violate the spec's "NOT index the event in any +-- full-text search" requirement. +-- +-- Additive migration: previously applied files must not change checksum. +-- We must DROP the generated column and re-ADD it with the extended exclusion +-- list; ALTER COLUMN cannot change a GENERATED expression in Postgres. +-- +-- Final kind exclusion list after this migration: +-- 1059 = KIND_GIFT_WRAP (NIP-17 ciphertext) +-- 30300 = KIND_EVENT_REMINDER (AUTHOR_ONLY_KINDS — defense in depth) +-- 30622 = KIND_DM_VISIBILITY (per-viewer private hide state) +-- 44100 = KIND_MEMBER_ADDED_NOTIFICATION (p-gated membership notice) +-- 44101 = KIND_MEMBER_REMOVED_NOTIFICATION (p-gated membership notice) +-- 44200 = KIND_AGENT_TURN_METRIC (NIP-AM: p-gated encrypted turn metrics) +-- Constants kept in `buzz_core::kind`; inlined here because a sqlx migration +-- is frozen SQL and cannot import the Rust constant. If a new privacy-sensitive +-- kind is added there, add a new additive migration following this pattern and +-- add a regression test in `buzz-search/tests/fts_integration.rs`. +-- +-- NULL tsvector never matches `@@`, so excluded rows are storage-level +-- unsearchable. + +ALTER TABLE events DROP COLUMN search_tsv; +ALTER TABLE events ADD COLUMN search_tsv TSVECTOR GENERATED ALWAYS AS ( + CASE WHEN kind IN (1059, 30300, 30622, 44100, 44101, 44200) THEN NULL::tsvector + ELSE to_tsvector('simple', content) + END +) STORED; + +-- Recreate the GIN index dropped with the column. 
+CREATE INDEX idx_events_search_tsv ON events USING GIN (search_tsv); diff --git a/schema/schema.sql b/schema/schema.sql index 1c6fcfb98..3445d58f5 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -206,9 +206,9 @@ CREATE TABLE events ( -- Privacy: encrypted/private routing wrappers and p-gated membership notices -- must never be discoverable through NIP-50 full-text search. NULL tsvector -- never matches `@@`. - -- Keep in sync with migrations/0001_initial_schema.sql. + -- Keep in sync with migrations (final state: 0001 + 0004_agent_turn_metric_fts). search_tsv TSVECTOR GENERATED ALWAYS AS ( - CASE WHEN kind IN (1059, 30300, 30622, 44100, 44101) THEN NULL::tsvector + CASE WHEN kind IN (1059, 30300, 30622, 44100, 44101, 44200) THEN NULL::tsvector ELSE to_tsvector('simple', content) END ) STORED,