diff --git a/Cargo.lock b/Cargo.lock index 5aa78909a..448797730 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,9 +780,11 @@ dependencies = [ "async-trait", "axum", "base64", + "chrono", "getrandom 0.4.2", "hex", "nix 0.31.3", + "nostr", "reqwest 0.13.3", "rmcp", "serde", @@ -794,6 +796,7 @@ dependencies = [ "tracing", "tracing-subscriber", "urlencoding", + "uuid", "webbrowser", ] diff --git a/crates/buzz-acp/src/acp.rs b/crates/buzz-acp/src/acp.rs index 0ba5a8c6a..ea11fce12 100644 --- a/crates/buzz-acp/src/acp.rs +++ b/crates/buzz-acp/src/acp.rs @@ -14,6 +14,7 @@ use tokio::process::{Child, ChildStdin, ChildStdout}; use tokio_util::codec::{FramedRead, LinesCodec, LinesCodecError}; use crate::observer::{ObserverContext, ObserverHandle}; +use crate::usage::{TurnUsage, UsageTracker}; /// Maximum allowed size of a single NDJSON line from the agent's stdout. /// Lines exceeding this limit are rejected to prevent OOM from rogue agents. @@ -167,6 +168,11 @@ pub struct AcpClient { /// outside of a goose-native turn — the read loop's steer arm is /// disabled in that case. steer_rx: Option>, + /// Usage tracker — accumulates cumulative token counts from + /// `_goose/unstable/session/update` notifications and computes per-turn + /// deltas. Both goose and buzz-agent emit this notification; goose gates + /// on client capability advertisement, buzz-agent emits unconditionally. + goose_usage: UsageTracker, } impl AcpClient { @@ -258,6 +264,7 @@ impl AcpClient { observer_context: ObserverContext::default(), active_run_id: None, steer_rx: None, + goose_usage: UsageTracker::default(), }) } @@ -303,7 +310,16 @@ impl AcpClient { // on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges. let params = serde_json::json!({ "protocolVersion": 2, - "clientCapabilities": {}, + "clientCapabilities": { + // Signal to goose that we handle `_goose/unstable/session/update` + // notifications. Without this the custom notification is suppressed + // on goose's side and usage data is never emitted. + "_meta": { + "goose": { + "customNotifications": true + } + } + }, "clientInfo": { "name": "buzz-acp", "version": env!("CARGO_PKG_VERSION") @@ -427,6 +443,11 @@ impl AcpClient { let hard_deadline = tokio::time::Instant::now() + max_duration; self.current_hard_deadline = Some(hard_deadline); + // Mark the usage tracker as in-flight for this turn BEFORE sending the + // prompt so that any setup notifications recorded earlier are not + // misattributed to this turn. + self.goose_usage.begin_turn(session_id); + self.last_prompt_id = Some(self.next_id); let id = self.next_id; self.next_id += 1; @@ -502,6 +523,20 @@ impl AcpClient { self.active_run_id.as_deref() } + /// Consume and return the per-turn usage record computed from the most + /// recent `_goose/unstable/session/update` notification. + /// + /// Returns `None` if no usage update arrived since the last call (i.e. + /// the harness did not emit one for this turn, or this is not a goose + /// agent). Must be called at most once per turn; subsequent calls return + /// `None` until the next `usage_update` notification is recorded. + /// + /// Intended for consumption by `publish_agent_turn_metric` in `pool.rs` to + /// publish a kind 44200 NIP-AM event. + pub fn take_turn_usage(&mut self) -> Option { + self.goose_usage.take() + } + /// Install a per-turn steer request channel for goose-native /// non-cancelling mid-turn delivery. /// @@ -840,6 +875,9 @@ impl AcpClient { "session/update" => { let _ = self.handle_session_update(&msg); } + "_goose/unstable/session/update" => { + self.handle_goose_usage_update(&msg); + } "session/request_permission" => { self.handle_permission_request(&msg).await?; } @@ -1170,6 +1208,9 @@ impl AcpClient { idle_deadline = Instant::now() + idle_timeout; } } + "_goose/unstable/session/update" => { + self.handle_goose_usage_update(&msg); + } "session/request_permission" => { self.handle_permission_request(&msg).await?; } @@ -1311,6 +1352,46 @@ impl AcpClient { } } + /// Parse a `_goose/unstable/session/update` notification and record the + /// usage snapshot in the per-session tracker. + /// + /// Silently ignores malformed or non-`usage_update` variants — the + /// notification is best-effort observability data, not a protocol + /// requirement. Failures are logged at debug level. + fn handle_goose_usage_update(&mut self, msg: &serde_json::Value) { + use crate::usage::{GooseSessionUpdateNotification, GooseSessionUpdateVariant}; + let params = match msg.get("params") { + Some(p) => p, + None => { + tracing::debug!( + target: "acp::usage", + "_goose/unstable/session/update: missing params" + ); + return; + } + }; + match serde_json::from_value::(params.clone()) { + Ok(notif) => { + if let GooseSessionUpdateVariant::UsageUpdate(payload) = ¬if.update { + tracing::debug!( + target: "acp::usage", + session_id = %notif.session_id, + input = payload.accumulated_input_tokens, + output = payload.accumulated_output_tokens, + "goose usage update" + ); + self.goose_usage.record(¬if.session_id, payload); + } + } + Err(e) => { + tracing::debug!( + target: "acp::usage", + "_goose/unstable/session/update: deserialization error: {e}" + ); + } + } + } + /// Auto-approve a `session/request_permission` request from the agent. /// /// Finds the option with `kind == "allow_once"` and responds with its `optionId`. @@ -1782,7 +1863,13 @@ mod tests { "method": "initialize", "params": { "protocolVersion": 2, - "clientCapabilities": {}, + "clientCapabilities": { + "_meta": { + "goose": { + "customNotifications": true + } + } + }, "clientInfo": { "name": "buzz-acp", "version": "0.1.0" @@ -1795,6 +1882,11 @@ mod tests { Some("buzz-acp") ); assert!(msg["params"]["clientCapabilities"].is_object()); + assert_eq!( + msg["params"]["clientCapabilities"]["_meta"]["goose"]["customNotifications"].as_bool(), + Some(true), + "goose customNotifications capability must be advertised" + ); } #[test] @@ -2825,4 +2917,94 @@ mod tests { other => panic!("expected SteerAck::Success, got {other:?}"), } } + + // ── Goose usage notification integration ────────────────────────────── + + /// Build a `_goose/unstable/session/update` JSON-RPC notification. + fn goose_usage_update_msg( + session_id: &str, + input: u64, + output: u64, + cost: Option, + ) -> serde_json::Value { + let mut update = serde_json::json!({ + "sessionUpdate": "usage_update", + "used": input + output, + "contextLimit": 200000u64, + "accumulatedInputTokens": input, + "accumulatedOutputTokens": output, + }); + if let Some(c) = cost { + update["accumulatedCost"] = serde_json::json!(c); + } + serde_json::json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { + "sessionId": session_id, + "update": update + } + }) + } + + #[tokio::test] + async fn goose_usage_notification_recorded_and_take_returns_usage() { + let mut client = spawn_inert_client().await; + assert!(client.take_turn_usage().is_none(), "starts empty"); + + // begin_turn before sending the prompt — mirrors the real call flow. + client.goose_usage.begin_turn("s1"); + let msg = goose_usage_update_msg("s1", 1000, 200, Some(0.01)); + client.handle_goose_usage_update(&msg); + + let usage = client + .take_turn_usage() + .expect("usage should be present after notification"); + assert_eq!(usage.session_id, "s1"); + assert_eq!(usage.turn_seq, 1); + assert!(!usage.delta_reliable, "first turn must be unreliable"); + assert_eq!(usage.cumulative_input_tokens, 1000); + assert_eq!(usage.cumulative_output_tokens, 200); + assert_eq!(usage.cumulative_cost_usd, Some(0.01)); + + // Second take must be None. + assert!( + client.take_turn_usage().is_none(), + "take after drain is None" + ); + } + + #[tokio::test] + async fn goose_usage_second_turn_delta_reliable() { + let mut client = spawn_inert_client().await; + // Turn 1. + client.goose_usage.begin_turn("s2"); + client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1000, 200, None)); + let _ = client.take_turn_usage(); + // Turn 2. + client.goose_usage.begin_turn("s2"); + client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1800, 450, None)); + let usage = client.take_turn_usage().expect("turn 2 usage"); + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + } + + #[tokio::test] + async fn goose_usage_malformed_notification_does_not_panic() { + let mut client = spawn_inert_client().await; + // Missing params entirely. + let bad = serde_json::json!({"jsonrpc":"2.0","method":"_goose/unstable/session/update"}); + client.handle_goose_usage_update(&bad); + assert!(client.take_turn_usage().is_none()); + + // params present but wrong shape. + let bad2 = serde_json::json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { "oops": true } + }); + client.handle_goose_usage_update(&bad2); + assert!(client.take_turn_usage().is_none()); + } } diff --git a/crates/buzz-acp/src/config.rs b/crates/buzz-acp/src/config.rs index 8100ea71a..d139dc6cd 100644 --- a/crates/buzz-acp/src/config.rs +++ b/crates/buzz-acp/src/config.rs @@ -541,7 +541,7 @@ fn validate_multiple_event_handling( Ok(()) } -fn normalize_agent_command_identity(command: &str) -> String { +pub(crate) fn normalize_agent_command_identity(command: &str) -> String { let normalized = command.trim().replace('\\', "/"); let trimmed = normalized.trim_end_matches('/'); let basename = trimmed diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 21fa41cac..f4b1ffd00 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -8,6 +8,9 @@ mod observer; mod pool; mod queue; mod relay; +mod usage; + +pub use usage::TurnUsage; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -1402,6 +1405,7 @@ async fn tokio_main() -> Result<()> { .as_deref() .and_then(|hex| nostr::PublicKey::from_hex(hex).ok()), memory_enabled: config.memory_enabled, + harness_name: crate::config::normalize_agent_command_identity(&config.agent_command), }); if !config.memory_enabled { diff --git a/crates/buzz-acp/src/pool.rs b/crates/buzz-acp/src/pool.rs index b71c60839..83ab51afa 100644 --- a/crates/buzz-acp/src/pool.rs +++ b/crates/buzz-acp/src/pool.rs @@ -391,6 +391,9 @@ pub struct PromptContext { /// `[Agent Memory — core]` section. On by default; disabled via /// `--no-memory` / `BUZZ_ACP_NO_MEMORY`. pub memory_enabled: bool, + /// Harness identity string for NIP-AM `harness` field. Derived from the + /// configured `agent_command` at startup (e.g. `"goose"`, `"buzz-agent"`). + pub harness_name: String, } impl AgentPool { @@ -1539,6 +1542,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1553,6 +1566,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1568,6 +1591,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1582,6 +1615,16 @@ pub async fn run_prompt_task( let retry_batch = requeue_cancelled_batch(&ctx, control_signal, batch); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1626,6 +1669,16 @@ pub async fn run_prompt_task( &source, &control_signal, ); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1676,6 +1729,18 @@ pub async fn run_prompt_task( agent.state.invalidate(&source); } + let core_stop = acp_stop_to_core(&stop_reason); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(core_stop), + ) + .await; + send_prompt_result( &result_tx, agent, @@ -1687,6 +1752,16 @@ pub async fn run_prompt_task( Err(AcpError::AgentExited) => { tracing::error!(target: "pool::prompt", "agent {} exited during prompt", agent.index); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1708,6 +1783,16 @@ pub async fn run_prompt_task( { Ok(stop_reason) => { log_stop_reason(&source, &stop_reason); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; // Timeout triggers respawn in handle_prompt_result — // session state will be discarded with the old agent. send_prompt_result( @@ -1725,6 +1810,16 @@ pub async fn run_prompt_task( agent.index ); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1739,6 +1834,16 @@ pub async fn run_prompt_task( "cancel_with_cleanup error: {e} — invalidating session" ); agent.state.invalidate(&source); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1756,6 +1861,16 @@ pub async fn run_prompt_task( ctx.max_turn_duration.as_secs() ); agent.state.invalidate_all(); + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -1772,6 +1887,16 @@ pub async fn run_prompt_task( if !matches!(e, AcpError::AgentError(_)) { agent.state.invalidate(&source); } + let usage = agent.acp.take_turn_usage(); + publish_agent_turn_metric( + &ctx, + usage, + observer_channel_id, + &session_id, + &turn_id, + Some(buzz_core::agent_turn_metric::StopReason::Error), + ) + .await; send_prompt_result( &result_tx, agent, @@ -2557,6 +2682,131 @@ impl Drop for TurnCompletionGuard { } } +/// Map an ACP `StopReason` to the NIP-AM `StopReason` used in kind 44200 payloads. +fn acp_stop_to_core(r: &StopReason) -> buzz_core::agent_turn_metric::StopReason { + use buzz_core::agent_turn_metric::StopReason as CoreStop; + match r { + StopReason::EndTurn => CoreStop::EndTurn, + StopReason::Cancelled => CoreStop::Cancelled, + StopReason::MaxTokens => CoreStop::MaxTokens, + StopReason::MaxTurnRequests => CoreStop::Unknown, + StopReason::Refusal => CoreStop::Unknown, + } +} + +/// Best-effort: build and publish a `kind:44200` NIP-AM agent turn metric event. +/// +/// Does nothing when `usage` is `None` (goose emitted no usage notification +/// for this turn) or when `owner_pubkey` is unconfigured (no NIP-AO identity). +/// Errors are logged at WARN and never surface to the caller — metric +/// publishing must never fail a turn. +async fn publish_agent_turn_metric( + ctx: &PromptContext, + usage: Option, + channel_id: Option, + session_id: &str, + turn_id: &str, + stop_reason: Option, +) { + use buzz_core::agent_turn_metric::{AgentTurnMetricPayload, TokenCounts}; + use nostr::{EventBuilder, Kind, Tag}; + + let (usage, owner_pk) = match (usage, ctx.agent_owner_pubkey.as_ref()) { + (Some(u), Some(pk)) => (u, pk), + _ => return, + }; + + let turn_counts = if usage.delta_reliable { + Some(TokenCounts { + input_tokens: usage.turn_input_tokens, + output_tokens: usage.turn_output_tokens, + total_tokens: None, + cost_usd: usage.turn_cost_usd, + cache_read_tokens: None, + cache_write_tokens: None, + }) + } else { + None + }; + let cumulative_counts = Some(TokenCounts { + input_tokens: Some(usage.cumulative_input_tokens), + output_tokens: Some(usage.cumulative_output_tokens), + total_tokens: None, + cost_usd: usage.cumulative_cost_usd, + cache_read_tokens: None, + cache_write_tokens: None, + }); + let timestamp = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + let payload = AgentTurnMetricPayload { + harness: ctx.harness_name.clone(), + model: None, + channel_id: channel_id.map(|id| id.to_string()), + session_id: Some(usage.session_id.clone()), + turn_id: Some(turn_id.to_string()), + turn_seq: Some(usage.turn_seq), + timestamp, + turn: turn_counts, + cumulative: cumulative_counts, + delta_reliable: usage.delta_reliable, + stop_reason, + }; + let ciphertext = match buzz_core::agent_turn_metric::encrypt_agent_turn_metric( + &ctx.agent_keys, + owner_pk, + &payload, + ) { + Ok(c) => c, + Err(e) => { + tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: encrypt failed: {e}" + ); + return; + } + }; + let agent_hex = ctx.agent_keys.public_key().to_hex(); + let owner_hex = owner_pk.to_hex(); + let event = match EventBuilder::new( + Kind::Custom(buzz_core::kind::KIND_AGENT_TURN_METRIC as u16), + ciphertext, + ) + .tags([ + Tag::parse(["p", &owner_hex]).expect("p tag"), + Tag::parse(["agent", &agent_hex]).expect("agent tag"), + ]) + .sign_with_keys(&ctx.agent_keys) + { + Ok(e) => e, + Err(e) => { + tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: sign failed: {e}" + ); + return; + } + }; + const METRIC_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); + match tokio::time::timeout(METRIC_TIMEOUT, ctx.rest_client.submit_event(&event)).await { + Ok(Ok(_)) => {} + Ok(Err(e)) => tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: publish failed: {e}" + ), + Err(_) => tracing::warn!( + target: "pool::metrics", + session_id, + turn_id, + "NIP-AM: publish timed out" + ), + } +} + const REACTION_SEEN: &str = "👀"; const REACTION_WORKING: &str = "💬"; @@ -3613,4 +3863,210 @@ mod tests { result.agent.acp.install_steer_rx(steer_rx); // Reaching here without a panic is the test. } + + // ── NIP-AM emit-hook unit tests ──────────────────────────────────────── + + /// `acp_stop_to_core` maps all ACP stop reasons to the correct NIP-AM + /// variants without panicking on any input. + #[test] + fn test_acp_stop_to_core_maps_all_variants() { + use buzz_core::agent_turn_metric::StopReason as CoreStop; + assert_eq!(acp_stop_to_core(&StopReason::EndTurn), CoreStop::EndTurn); + assert_eq!( + acp_stop_to_core(&StopReason::Cancelled), + CoreStop::Cancelled + ); + assert_eq!( + acp_stop_to_core(&StopReason::MaxTokens), + CoreStop::MaxTokens + ); + assert_eq!( + acp_stop_to_core(&StopReason::MaxTurnRequests), + CoreStop::Unknown + ); + assert_eq!(acp_stop_to_core(&StopReason::Refusal), CoreStop::Unknown); + } + + /// `publish_agent_turn_metric` is a no-op when `usage` is `None`. + #[tokio::test] + async fn test_publish_agent_turn_metric_noop_on_no_usage() { + let ctx = make_prompt_context_no_owner(); + // usage = None → early return, no panic. + publish_agent_turn_metric( + &ctx, + None, + None, + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// `publish_agent_turn_metric` is a no-op when `owner_pubkey` is absent. + #[tokio::test] + async fn test_publish_agent_turn_metric_noop_on_no_owner() { + let ctx = make_prompt_context_no_owner(); + let usage = crate::usage::TurnUsage { + session_id: "sess-1".to_string(), + turn_seq: 1, + delta_reliable: true, + turn_input_tokens: Some(100), + turn_output_tokens: Some(50), + turn_cost_usd: None, + cumulative_input_tokens: 100, + cumulative_output_tokens: 50, + cumulative_cost_usd: None, + }; + // owner_pubkey = None → early return, no panic. + publish_agent_turn_metric( + &ctx, + Some(usage), + None, + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// `publish_agent_turn_metric` encrypts the payload when owner is present + /// (the HTTP submit will fail in tests, but we verify no panic and the + /// encrypt/sign path executes). + #[tokio::test] + async fn test_publish_agent_turn_metric_encrypts_with_owner() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + let usage = crate::usage::TurnUsage { + session_id: "sess-1".to_string(), + turn_seq: 1, + delta_reliable: true, + turn_input_tokens: Some(200), + turn_output_tokens: Some(80), + turn_cost_usd: Some(0.001), + cumulative_input_tokens: 200, + cumulative_output_tokens: 80, + cumulative_cost_usd: Some(0.001), + }; + // Will try to publish and fail (no real relay) but must not panic. + publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-1", + "turn-1", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + /// Regression for the control-cancel drain: `publish_agent_turn_metric` + /// with a `Cancelled` stop reason and pending usage executes without panic + /// (encrypt+sign path). This mirrors the control-signal arm that previously + /// returned early without draining usage. + #[tokio::test] + async fn test_publish_agent_turn_metric_cancelled_stop_reason() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + let usage = crate::usage::TurnUsage { + session_id: "sess-cancel".to_string(), + turn_seq: 2, + delta_reliable: true, + turn_input_tokens: Some(50), + turn_output_tokens: Some(20), + turn_cost_usd: None, + cumulative_input_tokens: 150, + cumulative_output_tokens: 70, + cumulative_cost_usd: None, + }; + // Must not panic; HTTP submit will fail (no real relay) — that's fine. + publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-cancel", + "turn-cancel", + Some(buzz_core::agent_turn_metric::StopReason::Cancelled), + ) + .await; + } + + /// `publish_agent_turn_metric` uses `ctx.harness_name` in the payload. + /// A buzz-agent-commanded context must not panic — verifies the harness + /// field flows through encrypt/sign without error. + #[tokio::test] + async fn test_publish_agent_turn_metric_buzz_agent_harness_name() { + let agent_keys = nostr::Keys::generate(); + let owner_keys = nostr::Keys::generate(); + let mut ctx = make_prompt_context_with_owner(&agent_keys, owner_keys.public_key()); + ctx.harness_name = "buzz-agent".to_string(); + let usage = crate::usage::TurnUsage { + session_id: "sess-ba".to_string(), + turn_seq: 1, + delta_reliable: false, // first turn from buzz-agent + turn_input_tokens: None, + turn_output_tokens: None, + turn_cost_usd: None, + cumulative_input_tokens: 400, + cumulative_output_tokens: 100, + cumulative_cost_usd: None, + }; + // Will try to publish (encrypt succeeds) and fail HTTP (no relay) — must not panic. + publish_agent_turn_metric( + &ctx, + Some(usage), + Some(uuid::Uuid::new_v4()), + "sess-ba", + "turn-ba", + Some(buzz_core::agent_turn_metric::StopReason::EndTurn), + ) + .await; + } + + fn make_prompt_context_no_owner() -> PromptContext { + let agent_keys = nostr::Keys::generate(); + make_prompt_context_impl(&agent_keys, None) + } + + fn make_prompt_context_with_owner( + agent_keys: &nostr::Keys, + owner_pubkey: nostr::PublicKey, + ) -> PromptContext { + make_prompt_context_impl(agent_keys, Some(owner_pubkey)) + } + + fn make_prompt_context_impl( + agent_keys: &nostr::Keys, + owner_pubkey: Option, + ) -> PromptContext { + use crate::relay::RestClient; + PromptContext { + mcp_servers: vec![], + initial_message: None, + idle_timeout: Duration::from_secs(60), + max_turn_duration: Duration::from_secs(120), + turn_liveness_interval: Duration::ZERO, + dedup_mode: DedupMode::Drop, + system_prompt: None, + heartbeat_prompt: None, + base_prompt: None, + cwd: ".".to_string(), + rest_client: RestClient { + http: reqwest::Client::new(), + base_url: "http://127.0.0.1:0".to_string(), + keys: agent_keys.clone(), + auth_tag_json: None, + }, + channel_info: std::collections::HashMap::new(), + context_message_limit: 0, + max_turns_per_session: 0, + permission_mode: PermissionMode::Default, + agent_keys: agent_keys.clone(), + agent_owner_pubkey: owner_pubkey, + memory_enabled: false, + harness_name: "goose".to_string(), + } + } } diff --git a/crates/buzz-acp/src/usage.rs b/crates/buzz-acp/src/usage.rs new file mode 100644 index 000000000..db28d6570 --- /dev/null +++ b/crates/buzz-acp/src/usage.rs @@ -0,0 +1,671 @@ +//! Usage tracking for NIP-AM agent turn metrics. +//! +//! Agents that support usage reporting emit a `_goose/unstable/session/update` +//! notification (with `sessionUpdate: "usage_update"`) at the end of every +//! turn. Both goose and buzz-agent use this same wire format. The payload +//! carries session-cumulative token counts from which we derive per-turn +//! deltas. +//! +//! # Delta computation +//! +//! Because goose only reports cumulative counters, the per-turn counts are +//! computed as `current − previous`. Three cases require special handling per +//! NIP-AM: +//! +//! 1. **First turn (no prior baseline):** delta unknown → `null` counts, +//! `delta_reliable: false`. +//! 2. **Counter decrease** (harness restart, overflow): delta would be +//! negative → `null` counts, `delta_reliable: false`. +//! 3. **Session restart** (caller supplies a new `session_id` not seen +//! before): treated as case 1 — fresh baseline, no delta for this turn. +//! +//! The `TurnUsage` produced after each turn is consumed by the +//! `TurnCompletionGuard` in `pool.rs` to publish a kind 44200 relay event. + +use std::collections::HashMap; + +/// Wire-format deserialization for `_goose/unstable/session/update` params. +/// +/// Method: `_goose/unstable/session/update` +/// Shape (camelCase on the wire): +/// ```json +/// { +/// "sessionId": "...", +/// "update": { +/// "sessionUpdate": "usage_update", +/// "used": 12345, +/// "contextLimit": 200000, +/// "accumulatedInputTokens": 10000, +/// "accumulatedOutputTokens": 2345, +/// "accumulatedCost": 0.0234 +/// } +/// } +/// ``` +/// +/// `used` and `contextLimit` are optional because buzz-agent does not track a +/// context window limit; the fields are present when goose emits them. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct GooseSessionUpdateNotification { + pub session_id: String, + pub update: GooseSessionUpdateVariant, +} + +/// Discriminated union matching goose's `GooseSessionUpdate` enum on the wire. +/// We only care about `usage_update`; other variants are ignored. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(tag = "sessionUpdate", rename_all = "snake_case")] +pub(crate) enum GooseSessionUpdateVariant { + UsageUpdate(UsageUpdatePayload), + #[serde(other)] + Other, +} + +/// The `usage_update` payload. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct UsageUpdatePayload { + /// Total tokens used (context-usage proxy). Optional — buzz-agent omits + /// this field or sends 0 because it does not track a context window limit. + #[serde(default)] + #[allow(dead_code)] + pub used: u64, + /// Context window size. Optional — buzz-agent omits this field. + #[serde(default)] + #[allow(dead_code)] + pub context_limit: u64, + pub accumulated_input_tokens: u64, + pub accumulated_output_tokens: u64, + pub accumulated_cost: Option, +} + +/// Per-session normalization state: the last cumulative snapshot we saw. +#[derive(Debug, Clone)] +struct SessionState { + /// Monotonically increasing per-session turn counter (1-based, incremented + /// on every recorded update). + turn_seq: u64, + /// Cumulative input tokens at the end of the previous turn. + last_input: u64, + /// Cumulative output tokens at the end of the previous turn. + last_output: u64, + /// Cumulative cost at the end of the previous turn. + last_cost: Option, +} + +/// Per-turn usage record exposed to `TurnCompletionGuard` for NIP-AM publishing. +/// +/// `turn_*` fields are `None` when delta is unreliable (first turn or counter +/// decrease). `cumulative_*` fields are always present when the agent reports them. +#[derive(Debug, Clone)] +pub struct TurnUsage { + /// Goose session id (maps to NIP-AM `sessionId`). + pub session_id: String, + /// Per-session monotonic sequence number for this turn (maps to NIP-AM `turnSeq`). + pub turn_seq: u64, + /// Whether the `turn_*` delta fields are reliable. + pub delta_reliable: bool, + /// Per-turn input token delta; `None` when unreliable. + pub turn_input_tokens: Option, + /// Per-turn output token delta; `None` when unreliable. + pub turn_output_tokens: Option, + /// Per-turn cost delta (`current − previous`); `None` when unreliable or + /// either snapshot is missing. + pub turn_cost_usd: Option, + /// Session-cumulative input tokens as reported by goose at end of turn. + pub cumulative_input_tokens: u64, + /// Session-cumulative output tokens as reported by goose at end of turn. + pub cumulative_output_tokens: u64, + /// Session-cumulative estimated cost in USD; `None` if goose did not report it. + pub cumulative_cost_usd: Option, +} + +/// Tracks per-session cumulative usage state across turns. +/// +/// Cheap to construct. Usage lifecycle per turn: +/// +/// 1. **`begin_turn(session_id)`** — call this immediately before sending +/// `session/prompt`. Marks the tracker as in-flight for the given session +/// and clears any leftover pending record from a previous turn. Setup +/// notifications that arrive *before* the first `begin_turn` (e.g. during +/// `session/new` setup) will still update the cumulative baseline but will +/// NOT produce a publishable record. +/// 2. **`record(session_id, payload)`** — called for each +/// `_goose/unstable/session/update` notification. Always updates the +/// cumulative baseline; only produces a publishable record when a turn is +/// currently in-flight for the matching session. +/// 3. **`take()`** — called at turn completion by `TurnCompletionGuard`. +/// Drains and returns the pending record (or `None` if no usage was emitted +/// for this turn) and clears the in-flight marker. +#[derive(Debug, Default)] +pub(crate) struct UsageTracker { + /// One entry per goose `sessionId` ever seen in this process. + sessions: HashMap, + /// The session that currently has an in-flight `session/prompt`. + /// `None` means no prompt is in flight; `record()` will still update + /// the baseline but will not set `pending`. + in_flight_session: Option, + /// The most recently computed turn usage, ready for `take()`. + pending: Option, +} + +impl UsageTracker { + /// Mark the start of a new prompt turn for `session_id`. + /// + /// Clears any leftover `pending` record and records which session is + /// in-flight. Must be called before the corresponding `session/prompt` + /// request is sent so that setup notifications received before this call + /// do not become publishable for this turn. + pub(crate) fn begin_turn(&mut self, session_id: &str) { + self.in_flight_session = Some(session_id.to_string()); + self.pending = None; + } + + /// Process a `usage_update` notification payload. + /// + /// **Always** updates the cumulative baseline for `session_id` so that the + /// next in-flight turn can compute a correct delta even if this notification + /// arrived outside a turn (e.g. during `session/new` setup). + /// + /// Only produces a publishable `pending` record when a turn is currently + /// in-flight for the matching `session_id`. If `in_flight_session` is + /// `None` or refers to a different session, the baseline is updated but + /// `pending` is left unchanged. + /// + /// When multiple notifications arrive during the same turn, the last one + /// wins (goose may emit several per turn; each increments `turn_seq`). + pub(crate) fn record(&mut self, session_id: &str, payload: &UsageUpdatePayload) { + let current_input = payload.accumulated_input_tokens; + let current_output = payload.accumulated_output_tokens; + let current_cost = payload.accumulated_cost; + + let (delta_reliable, turn_input, turn_output, turn_cost, turn_seq) = + match self.sessions.get(session_id) { + None => { + // First notification for this session — no baseline yet. + (false, None, None, None, 1u64) + } + Some(prev) => { + let seq = prev.turn_seq + 1; + // Token counter decrease → unreliable delta. + if current_input < prev.last_input || current_output < prev.last_output { + (false, None, None, None, seq) + } else { + let di = current_input - prev.last_input; + let dout = current_output - prev.last_output; + // Cost delta: only when both snapshots have cost. + // A cost *decrease* is also unreliable (NIP-AM: negative + // delta ⇒ delta_reliable false, null all turn fields). + let (dc, cost_reliable) = match (current_cost, prev.last_cost) { + (Some(c), Some(p)) if c >= p => (Some(c - p), true), + (Some(_), Some(_)) => { + // Both present but current < prev — counter decreased. + (None, false) + } + _ => (None, true), // absent on either side: null cost, reliable tokens + }; + if cost_reliable { + (true, Some(di), Some(dout), dc, seq) + } else { + // Cost decrease overrides the whole record to unreliable. + (false, None, None, None, seq) + } + } + } + }; + + // Always advance the session baseline so the next in-flight turn can + // compute a correct delta even if this notification is from setup. + self.sessions.insert( + session_id.to_string(), + SessionState { + turn_seq, + last_input: current_input, + last_output: current_output, + last_cost: current_cost, + }, + ); + + // Only publish a pending record if this session is currently in-flight. + if self.in_flight_session.as_deref() == Some(session_id) { + self.pending = Some(TurnUsage { + session_id: session_id.to_string(), + turn_seq, + delta_reliable, + turn_input_tokens: turn_input, + turn_output_tokens: turn_output, + turn_cost_usd: turn_cost, + cumulative_input_tokens: current_input, + cumulative_output_tokens: current_output, + cumulative_cost_usd: current_cost, + }); + } + } + + /// Consume and return the most recently computed turn usage record, then + /// clear the in-flight marker. + /// + /// Returns `None` if no `usage_update` arrived during the current in-flight + /// turn (the agent did not emit usage, or no `begin_turn` was called). The + /// caller (`TurnCompletionGuard`) must handle `None`. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn take(&mut self) -> Option { + self.in_flight_session = None; + self.pending.take() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn payload(input: u64, output: u64, cost: Option) -> UsageUpdatePayload { + UsageUpdatePayload { + used: input + output, + context_limit: 200_000, + accumulated_input_tokens: input, + accumulated_output_tokens: output, + accumulated_cost: cost, + } + } + + fn payload_no_context(input: u64, output: u64, cost: Option) -> UsageUpdatePayload { + UsageUpdatePayload { + used: 0, + context_limit: 0, + accumulated_input_tokens: input, + accumulated_output_tokens: output, + accumulated_cost: cost, + } + } + + // ── Turn scoping: setup notifications must not pollute the first real turn ─ + + #[test] + fn setup_notification_before_begin_turn_returns_none() { + // Regression: setup notifications fire during session/new (before any + // prompt). They must update the baseline but must NOT produce a + // publishable record for the next turn. + let mut tracker = UsageTracker::default(); + + // Simulate a setup notification (no begin_turn called yet). + tracker.record("sess-setup", &payload(500, 100, Some(0.005))); + // No turn is in-flight — pending must stay None. + assert!( + tracker.pending.is_none(), + "setup notification must not set pending before begin_turn" + ); + + // The zero-update turn: begin_turn, no notification during prompt, take. + tracker.begin_turn("sess-setup"); + let result = tracker.take(); + assert!( + result.is_none(), + "zero-update turn after setup must return None" + ); + + // Baseline was still updated: the next real turn gets a correct delta. + tracker.begin_turn("sess-setup"); + tracker.record("sess-setup", &payload(1200, 300, Some(0.012))); + let usage = tracker.take().expect("second turn must have usage"); + + assert!( + usage.delta_reliable, + "baseline fed by setup: delta reliable" + ); + assert_eq!(usage.turn_input_tokens, Some(700)); // 1200 - 500 + assert_eq!(usage.turn_output_tokens, Some(200)); // 300 - 100 + let dc = usage.turn_cost_usd.expect("cost delta present"); + assert!((dc - 0.007).abs() < 1e-9, "cost delta: {dc}"); + } + + #[test] + fn record_outside_in_flight_does_not_clobber_pending() { + // A notification for a different session_id while another is in-flight + // must not overwrite the pending record. + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(1000, 200, None)); + + // Notification for a different session — should not touch pending. + tracker.record("sess-b", &payload(9000, 3000, None)); + + let usage = tracker.take().expect("sess-a pending must survive"); + assert_eq!(usage.session_id, "sess-a"); + } + + // ── Delta computation: non-happy paths ───────────────────────────────── + + #[test] + fn first_turn_no_prior_delta_unreliable() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-1"); + tracker.record("sess-1", &payload(1000, 200, Some(0.01))); + let usage = tracker.take().expect("should have pending usage"); + + assert_eq!(usage.session_id, "sess-1"); + assert_eq!(usage.turn_seq, 1); + assert!( + !usage.delta_reliable, + "first turn: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none()); + assert!(usage.turn_output_tokens.is_none()); + assert!(usage.turn_cost_usd.is_none()); + // Cumulative is still populated. + assert_eq!(usage.cumulative_input_tokens, 1000); + assert_eq!(usage.cumulative_output_tokens, 200); + assert_eq!(usage.cumulative_cost_usd, Some(0.01)); + } + + #[test] + fn counter_decrease_delta_unreliable_no_negatives() { + let mut tracker = UsageTracker::default(); + // Turn 1 — establish baseline. + tracker.begin_turn("sess-2"); + tracker.record("sess-2", &payload(5000, 1000, Some(0.05))); + let _ = tracker.take(); + + // Turn 2 — counter decreased (harness restart simulation). + tracker.begin_turn("sess-2"); + tracker.record("sess-2", &payload(100, 50, Some(0.001))); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.turn_seq, 2); + assert!( + !usage.delta_reliable, + "counter decrease: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none(), "no negative delta"); + assert!(usage.turn_output_tokens.is_none(), "no negative delta"); + assert!(usage.turn_cost_usd.is_none()); + } + + #[test] + fn cost_decrease_sets_delta_unreliable_and_nulls_all_turn_fields() { + // Regression for Thufir fix 2: cost counter decrease must set + // delta_reliable = false and null all turn fields (not just cost). + // turn_seq still increments (NIP-AM: seq advances even on unreliable). + let mut tracker = UsageTracker::default(); + // Turn 1 — establish baseline with cost. + tracker.begin_turn("sess-cost"); + tracker.record("sess-cost", &payload(1000, 200, Some(0.10))); + let t1 = tracker.take().expect("t1"); + assert_eq!(t1.turn_seq, 1); + + // Turn 2 — tokens monotone, but cost decreased. + tracker.begin_turn("sess-cost"); + tracker.record("sess-cost", &payload(1500, 350, Some(0.05))); + let usage = tracker.take().expect("t2"); + + assert_eq!(usage.turn_seq, 2, "turn_seq must still increment"); + assert!( + !usage.delta_reliable, + "cost decrease: delta must be unreliable" + ); + assert!( + usage.turn_input_tokens.is_none(), + "all turn fields null on unreliable" + ); + assert!(usage.turn_output_tokens.is_none()); + assert!(usage.turn_cost_usd.is_none()); + // Cumulative values are unaffected. + assert_eq!(usage.cumulative_input_tokens, 1500); + assert_eq!(usage.cumulative_output_tokens, 350); + assert_eq!(usage.cumulative_cost_usd, Some(0.05)); + } + + #[test] + fn cost_absent_on_one_side_leaves_tokens_reliable() { + // Cost merely absent on either side: null cost, reliable tokens. + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-nocost"); + tracker.record("sess-nocost", &payload(1000, 200, Some(0.01))); + let _ = tracker.take(); + + // Turn 2 — no cost reported this time. + tracker.begin_turn("sess-nocost"); + tracker.record("sess-nocost", &payload(1800, 450, None)); + let usage = tracker.take().expect("pending"); + + assert!( + usage.delta_reliable, + "absent cost must not make delta unreliable" + ); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + assert!( + usage.turn_cost_usd.is_none(), + "cost null when absent on either side" + ); + } + + #[test] + fn session_restart_new_session_id_treated_as_first_turn() { + let mut tracker = UsageTracker::default(); + // Original session. + tracker.begin_turn("sess-a"); + tracker.record("sess-a", &payload(8000, 2000, None)); + let _ = tracker.take(); + + // New session_id — restart. Must behave like a first turn. + tracker.begin_turn("sess-b"); + tracker.record("sess-b", &payload(500, 100, None)); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.session_id, "sess-b"); + assert_eq!(usage.turn_seq, 1); + assert!( + !usage.delta_reliable, + "new session: delta must be unreliable" + ); + assert!(usage.turn_input_tokens.is_none()); + } + + // ── Happy path ───────────────────────────────────────────────────────── + + #[test] + fn second_turn_delta_computed_correctly() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-3"); + tracker.record("sess-3", &payload(1000, 200, Some(0.01))); + let _ = tracker.take(); + + tracker.begin_turn("sess-3"); + tracker.record("sess-3", &payload(1800, 450, Some(0.018))); + let usage = tracker.take().expect("pending"); + + assert_eq!(usage.turn_seq, 2); + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(800)); + assert_eq!(usage.turn_output_tokens, Some(250)); + // cost delta: 0.018 - 0.01 = 0.008 (floating-point; use approx check) + let dc = usage.turn_cost_usd.expect("cost delta present"); + assert!((dc - 0.008).abs() < 1e-9, "cost delta: {dc}"); + assert_eq!(usage.cumulative_input_tokens, 1800); + assert_eq!(usage.cumulative_output_tokens, 450); + } + + #[test] + fn take_returns_none_after_drain() { + let mut tracker = UsageTracker::default(); + tracker.begin_turn("sess-4"); + tracker.record("sess-4", &payload(100, 20, None)); + let _ = tracker.take(); + assert!(tracker.take().is_none(), "take after drain must be None"); + } + + #[test] + fn last_update_wins_multiple_updates_same_turn() { + let mut tracker = UsageTracker::default(); + // Turn 1 — baseline. + tracker.begin_turn("sess-5"); + tracker.record("sess-5", &payload(1000, 100, None)); + let _ = tracker.take(); + + // Two updates arrive before take() — each advances state independently; + // the second delta is computed from the first update's snapshot. + tracker.begin_turn("sess-5"); + tracker.record("sess-5", &payload(1500, 150, None)); + tracker.record("sess-5", &payload(2000, 250, None)); + let usage = tracker.take().expect("pending"); + + // Cumulative from the last update. + assert_eq!(usage.cumulative_input_tokens, 2000); + assert_eq!(usage.cumulative_output_tokens, 250); + // Delta is from the previous intermediate snapshot (1500, 150) → (2000, 250). + assert_eq!(usage.turn_input_tokens, Some(500)); + assert_eq!(usage.turn_output_tokens, Some(100)); + } + + // ── Wire deserialization ──────────────────────────────────────────────── + + #[test] + fn notification_deserializes_from_wire_json() { + let raw = serde_json::json!({ + "sessionId": "abc-123", + "update": { + "sessionUpdate": "usage_update", + "used": 50000, + "contextLimit": 200000, + "accumulatedInputTokens": 40000, + "accumulatedOutputTokens": 10000, + "accumulatedCost": 0.42 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + assert_eq!(notif.session_id, "abc-123"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert_eq!(p.accumulated_input_tokens, 40000); + assert_eq!(p.accumulated_output_tokens, 10000); + assert_eq!(p.accumulated_cost, Some(0.42)); + } + GooseSessionUpdateVariant::Other => panic!("expected UsageUpdate"), + } + } + + #[test] + fn notification_deserializes_without_used_and_context_limit() { + // buzz-agent emits usage_update without used/contextLimit. + let raw = serde_json::json!({ + "sessionId": "buzz-sess", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 500, + "accumulatedOutputTokens": 100 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert_eq!(p.accumulated_input_tokens, 500); + assert_eq!(p.accumulated_output_tokens, 100); + assert_eq!(p.used, 0); + assert_eq!(p.context_limit, 0); + assert!(p.accumulated_cost.is_none()); + } + GooseSessionUpdateVariant::Other => panic!("expected UsageUpdate"), + } + } + + #[test] + fn other_variant_deserializes_without_error() { + let raw = serde_json::json!({ + "sessionId": "xyz", + "update": { + "sessionUpdate": "status_message", + "status": { "type": "notice", "message": "hi" } + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + assert!(matches!(notif.update, GooseSessionUpdateVariant::Other)); + } + + #[test] + fn missing_accumulated_cost_is_none() { + let raw = serde_json::json!({ + "sessionId": "s", + "update": { + "sessionUpdate": "usage_update", + "used": 100, + "contextLimit": 200000, + "accumulatedInputTokens": 80, + "accumulatedOutputTokens": 20 + } + }); + let notif: GooseSessionUpdateNotification = + serde_json::from_value(raw).expect("deserialization"); + match notif.update { + GooseSessionUpdateVariant::UsageUpdate(p) => { + assert!(p.accumulated_cost.is_none()); + } + _ => panic!("expected UsageUpdate"), + } + } + + #[test] + fn buzz_agent_notification_flows_through_tracker() { + // End-to-end: a buzz-agent-shaped usage_update (no used/contextLimit) + // deserializes and flows through UsageTracker to produce correct TurnUsage. + let raw1 = serde_json::json!({ + "sessionId": "buzz-s1", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 300, + "accumulatedOutputTokens": 80 + } + }); + let raw2 = serde_json::json!({ + "sessionId": "buzz-s1", + "update": { + "sessionUpdate": "usage_update", + "accumulatedInputTokens": 700, + "accumulatedOutputTokens": 150 + } + }); + + let mut tracker = UsageTracker::default(); + + // Turn 1 — first turn, delta unreliable. + tracker.begin_turn("buzz-s1"); + let notif1: GooseSessionUpdateNotification = serde_json::from_value(raw1).expect("deser"); + if let GooseSessionUpdateVariant::UsageUpdate(p) = notif1.update { + tracker.record("buzz-s1", &p); + } + let t1 = tracker.take().expect("turn 1"); + assert!(!t1.delta_reliable, "first turn: unreliable"); + assert_eq!(t1.cumulative_input_tokens, 300); + + // Turn 2 — delta reliable. + tracker.begin_turn("buzz-s1"); + let notif2: GooseSessionUpdateNotification = serde_json::from_value(raw2).expect("deser"); + if let GooseSessionUpdateVariant::UsageUpdate(p) = notif2.update { + tracker.record("buzz-s1", &p); + } + let t2 = tracker.take().expect("turn 2"); + assert!(t2.delta_reliable, "second turn: reliable"); + assert_eq!(t2.turn_input_tokens, Some(400)); // 700 - 300 + assert_eq!(t2.turn_output_tokens, Some(70)); // 150 - 80 + } + + #[test] + fn buzz_agent_payload_no_context_fields_processes_correctly() { + // UsageTracker handles payloads with used=0 / context_limit=0 correctly. + let mut tracker = UsageTracker::default(); + tracker.begin_turn("s"); + tracker.record("s", &payload_no_context(1000, 200, None)); + let _ = tracker.take(); + + tracker.begin_turn("s"); + tracker.record("s", &payload_no_context(1500, 300, None)); + let usage = tracker.take().expect("pending"); + + assert!(usage.delta_reliable); + assert_eq!(usage.turn_input_tokens, Some(500)); + assert_eq!(usage.turn_output_tokens, Some(100)); + } +} diff --git a/crates/buzz-agent/Cargo.toml b/crates/buzz-agent/Cargo.toml index 7889ad34a..720f0785a 100644 --- a/crates/buzz-agent/Cargo.toml +++ b/crates/buzz-agent/Cargo.toml @@ -43,6 +43,9 @@ hex = { workspace = true } sha2 = { workspace = true } urlencoding = "2" webbrowser = "1" +nostr = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { version = "0.31", default-features = false, features = ["signal", "process"] } diff --git a/crates/buzz-agent/src/agent.rs b/crates/buzz-agent/src/agent.rs index 7474f0e4b..28c691d81 100644 --- a/crates/buzz-agent/src/agent.rs +++ b/crates/buzz-agent/src/agent.rs @@ -56,6 +56,12 @@ pub struct RunCtx<'a> { /// which the exact-but-stale token count would otherwise miss. Cleared and /// preserved in lockstep with `last_request_input_tokens`. pub last_request_history_bytes: &'a mut Option, + /// Accumulated input tokens across all LLM rounds in this turn, for + /// NIP-AM metric publishing. Reset to `None` at turn start in `run()`. + pub turn_input_tokens: &'a mut Option, + /// Accumulated output tokens across all LLM rounds in this turn, for + /// NIP-AM metric publishing. Reset to `None` at turn start in `run()`. + pub turn_output_tokens: &'a mut Option, } impl RunCtx<'_> { @@ -71,6 +77,10 @@ impl RunCtx<'_> { } self.history.push(HistoryItem::User(user_text)); + // Reset per-turn token accumulators for this prompt. + *self.turn_input_tokens = None; + *self.turn_output_tokens = None; + let mut round = 0u32; // Per-prompt latch: only used to detect "LLM said end_turn twice // in a row with no tool calls between" within this single prompt. @@ -158,6 +168,14 @@ impl RunCtx<'_> { .map(HistoryItem::context_pressure_bytes) .sum(), ); + // Accumulate per-turn input tokens for NIP-AM metric publishing. + *self.turn_input_tokens = + Some(self.turn_input_tokens.unwrap_or(0).saturating_add(tokens)); + } + // Accumulate per-turn output tokens for NIP-AM metric publishing. + if let Some(out) = response.output_tokens { + *self.turn_output_tokens = + Some(self.turn_output_tokens.unwrap_or(0).saturating_add(out)); } if !response.reasoning.is_empty() { diff --git a/crates/buzz-agent/src/lib.rs b/crates/buzz-agent/src/lib.rs index 5cc8e0b4f..9c97b6b91 100644 --- a/crates/buzz-agent/src/lib.rs +++ b/crates/buzz-agent/src/lib.rs @@ -30,9 +30,9 @@ use crate::llm::Llm; use crate::mcp::McpRegistry; use crate::types::{ContentBlock, HistoryItem}; use crate::wire::{ - classify, Inbound, InitializeParams, SessionCancelParams, SessionNewParams, - SessionPromptParams, SessionSteerParams, WireMsg, WireSender, INVALID_PARAMS, METHOD_NOT_FOUND, - PARSE_ERROR, + classify, goose_session_update, Inbound, InitializeParams, SessionCancelParams, + SessionNewParams, SessionPromptParams, SessionSteerParams, WireMsg, WireSender, INVALID_PARAMS, + METHOD_NOT_FOUND, PARSE_ERROR, }; struct App { @@ -71,6 +71,12 @@ struct Session { /// with it so the gate can account for history appended since. last_request_history_bytes: Option, effective_system_prompt: Arc, + /// Session-cumulative input tokens across all turns. Sent in the + /// `_goose/unstable/session/update` usage notification so buzz-acp's + /// `UsageTracker` can compute per-turn deltas symmetrically with goose. + accumulated_input_tokens: u64, + /// Session-cumulative output tokens across all turns. + accumulated_output_tokens: u64, } fn die(msg: String) -> ! { @@ -365,6 +371,8 @@ async fn session_new(app: &Arc, id: Value, params: Value, wire_tx: &WireSen last_request_input_tokens: None, last_request_history_bytes: None, effective_system_prompt, + accumulated_input_tokens: 0, + accumulated_output_tokens: 0, }, ); drop(sessions); @@ -512,6 +520,8 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender ), ) .await; + let mut turn_input_tokens: Option = None; + let mut turn_output_tokens: Option = None; let mut ctx = RunCtx { cfg: &app.cfg, session_id: &sid, @@ -528,6 +538,8 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender stop_rejections: &mut stop_rejections, last_request_input_tokens: &mut last_request_input_tokens, last_request_history_bytes: &mut last_request_history_bytes, + turn_input_tokens: &mut turn_input_tokens, + turn_output_tokens: &mut turn_output_tokens, }; let result = ctx.run(p.prompt).await; if let Some(s) = app.sessions.lock().await.get_mut(&sid) { @@ -542,6 +554,50 @@ async fn run_prompt(app: Arc, id: Value, params: Value, wire_tx: WireSender s.last_request_input_tokens = last_request_input_tokens; s.last_request_history_bytes = last_request_history_bytes; } + // Update session-cumulative token counters and emit the usage notification + // BEFORE sending the session/prompt response. buzz-acp's UsageTracker + // processes the notification while the turn is still in-flight (i.e. before + // the response triggers take_turn_usage()), which is required for the + // begin_turn gate to recognise it as publishable. + // + // Only emit when at least one token count was observed — a turn with no + // provider response (validation failure, pre-response cancellation) carries + // no information and must not produce a kind 44200 record per NIP-AM. + if turn_input_tokens.is_some() || turn_output_tokens.is_some() { + let (accumulated_in, accumulated_out) = { + let mut sessions = app.sessions.lock().await; + if let Some(s) = sessions.get_mut(&sid) { + s.accumulated_input_tokens = s + .accumulated_input_tokens + .saturating_add(turn_input_tokens.unwrap_or(0)); + s.accumulated_output_tokens = s + .accumulated_output_tokens + .saturating_add(turn_output_tokens.unwrap_or(0)); + (s.accumulated_input_tokens, s.accumulated_output_tokens) + } else { + ( + turn_input_tokens.unwrap_or(0), + turn_output_tokens.unwrap_or(0), + ) + } + }; + wire::send( + &wire_tx, + goose_session_update( + &sid, + json!({ + "sessionUpdate": "usage_update", + // used: total tokens as a context-usage proxy; + // contextLimit: 0 (buzz-agent has no context limit tracking). + "used": accumulated_in.saturating_add(accumulated_out), + "contextLimit": 0u64, + "accumulatedInputTokens": accumulated_in, + "accumulatedOutputTokens": accumulated_out, + }), + ), + ) + .await; + } match result { Ok(stop) => { wire::send( diff --git a/crates/buzz-agent/src/llm.rs b/crates/buzz-agent/src/llm.rs index 628449db2..40598668a 100644 --- a/crates/buzz-agent/src/llm.rs +++ b/crates/buzz-agent/src/llm.rs @@ -708,11 +708,13 @@ fn parse_responses(v: Value) -> Result { _ => ProviderStop::Other, }; let input_tokens = sum_usage(&v, &["input_tokens"]); + let output_tokens = sum_usage(&v, &["output_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -811,11 +813,13 @@ fn parse_anthropic(v: Value) -> Result { } } let input_tokens = anthropic_input_tokens(&v); + let output_tokens = sum_usage(&v, &["output_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -860,11 +864,13 @@ fn parse_openai(v: Value) -> Result { } } let input_tokens = openai_chat_input_tokens(&v); + let output_tokens = sum_usage(&v, &["completion_tokens"]); Ok(LlmResponse { text, tool_calls, stop, input_tokens, + output_tokens, reasoning, }) } @@ -1858,4 +1864,75 @@ mod tests { let src = StaticTokenSource::new("static-key"); assert_eq!(src.refresh_now("rejected").await.unwrap(), "static-key"); } + + // ── Output-token parsing tests ────────────────────────────────────────── + + /// `parse_anthropic` extracts `output_tokens` from the usage object. + #[test] + fn parse_anthropic_output_tokens() { + let v = serde_json::json!({ + "stop_reason": "end_turn", + "content": [{"type": "text", "text": "hi"}], + "usage": {"input_tokens": 42, "output_tokens": 7} + }); + assert_eq!(parse_anthropic(v).unwrap().output_tokens, Some(7)); + } + + /// `parse_anthropic` returns `None` for `output_tokens` when usage is absent. + #[test] + fn parse_anthropic_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "stop_reason": "end_turn", + "content": [{"type": "text", "text": "hi"}] + }); + assert_eq!(parse_anthropic(v).unwrap().output_tokens, None); + } + + /// `parse_openai` maps `completion_tokens` to `output_tokens`. + #[test] + fn parse_openai_output_tokens_from_completion_tokens() { + let v = serde_json::json!({ + "choices": [{"finish_reason": "stop", "message": {"content": "hi"}}], + "usage": {"prompt_tokens": 123, "completion_tokens": 4, "total_tokens": 127} + }); + assert_eq!(parse_openai(v).unwrap().output_tokens, Some(4)); + } + + /// `parse_openai` returns `None` for `output_tokens` when usage is absent. + #[test] + fn parse_openai_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "choices": [{"finish_reason": "stop", "message": {"content": "hi"}}] + }); + assert_eq!(parse_openai(v).unwrap().output_tokens, None); + } + + /// `parse_responses` extracts `output_tokens` from the usage object. + #[test] + fn parse_responses_output_tokens() { + let v = serde_json::json!({ + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hi"}] + }], + "usage": {"input_tokens": 321, "output_tokens": 9, "total_tokens": 330} + }); + assert_eq!(parse_responses(v).unwrap().output_tokens, Some(9)); + } + + /// `parse_responses` returns `None` for `output_tokens` when usage is absent. + #[test] + fn parse_responses_output_tokens_missing_usage_is_none() { + let v = serde_json::json!({ + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hi"}] + }] + }); + assert_eq!(parse_responses(v).unwrap().output_tokens, None); + } } diff --git a/crates/buzz-agent/src/types.rs b/crates/buzz-agent/src/types.rs index a8acb52a6..ef006b70a 100644 --- a/crates/buzz-agent/src/types.rs +++ b/crates/buzz-agent/src/types.rs @@ -139,6 +139,10 @@ pub struct LlmResponse { /// tokens, so reading it alone would undercount). Used to gate handoff on /// the real token budget rather than a byte estimate. pub input_tokens: Option, + /// Output tokens the provider reported for this request, or `None` if the + /// response carried no usage. Used to accumulate per-turn output counts + /// for NIP-AM metric publishing. + pub output_tokens: Option, /// Reasoning/thinking content emitted by the model before its answer, if /// any. Non-empty when the provider returns extended-thinking tokens: /// diff --git a/crates/buzz-agent/src/wire.rs b/crates/buzz-agent/src/wire.rs index 7c164724d..9d9bd69fe 100644 --- a/crates/buzz-agent/src/wire.rs +++ b/crates/buzz-agent/src/wire.rs @@ -126,6 +126,18 @@ pub fn session_update(sid: &str, update: Value) -> Value { }) } +/// A `_goose/unstable/session/update` notification — the separate top-level +/// method goose uses for custom usage and status events. Used by buzz-agent +/// to emit the `usage_update` payload so buzz-acp's `UsageTracker` can treat +/// buzz-agent and goose symmetrically. +pub fn goose_session_update(sid: &str, update: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "method": "_goose/unstable/session/update", + "params": { "sessionId": sid, "update": update }, + }) +} + /// A `session/update` notification carrying a `update._meta.goose.` field. /// Used to advertise `activeRunId` (so steer-capable clients can target the /// in-flight run) and `queuedSteer` (so they can correlate an accepted steer diff --git a/crates/buzz-agent/tests/fake_llm.rs b/crates/buzz-agent/tests/fake_llm.rs index a1791e7d6..20ae5bd0a 100644 --- a/crates/buzz-agent/tests/fake_llm.rs +++ b/crates/buzz-agent/tests/fake_llm.rs @@ -750,6 +750,285 @@ async fn steer_rejected_on_run_id_mismatch() { h.shutdown().await; } +// ─── Usage notification (_goose/unstable/session/update usage_update) ─────── + +/// An OpenAI chat completion response with a `usage` block (prompt_tokens + +/// completion_tokens). buzz-agent maps these to `accumulatedInputTokens` / +/// `accumulatedOutputTokens` in the `_goose/unstable/session/update` notification. +fn openai_text_with_usage(content: &str, input_tokens: u64, output_tokens: u64) -> Value { + json!({ + "id": "cc-u", "object": "chat.completion", "model": "fake-model", + "choices": [{ + "index": 0, + "message": { "role": "assistant", "content": content }, + "finish_reason": "stop", + }], + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + }) +} + +/// Returns true when `v` is a `_goose/unstable/session/update` usage_update +/// notification. +fn is_usage_update(v: &Value) -> bool { + v.get("method") == Some(&json!("_goose/unstable/session/update")) + && v["params"]["update"]["sessionUpdate"] == "usage_update" +} + +/// Collect every frame that arrives BEFORE the message matching `until_pred`, +/// then return (frames_before, matching_frame). +async fn recv_until_with_drain(h: &mut Harness, mut until_pred: F) -> (Vec, Value) +where + F: FnMut(&Value) -> bool, +{ + let mut before = Vec::new(); + loop { + let v = h.recv().await; + if until_pred(&v) { + return (before, v); + } + before.push(v); + } +} + +/// buzz-agent must emit `_goose/unstable/session/update` with `sessionUpdate: +/// "usage_update"` **before** the `session/prompt` response on each turn, and +/// must accumulate counters across turns (turn 2 reports turn1+turn2 sums). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn usage_notification_emitted_before_prompt_response() { + let url = spawn_fake_llm(vec![ + openai_text_with_usage("turn one reply", 10, 5), + openai_text_with_usage("turn two reply", 20, 8), + ]) + .await; + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + // ── Turn 1 ────────────────────────────────────────────────────────────── + let p1 = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"turn 1"}]}), + ) + .await; + + let (frames_before_t1, response_t1) = recv_until_with_drain(&mut h, |v| v["id"] == p1).await; + assert_eq!( + response_t1["result"]["stopReason"], "end_turn", + "turn 1 must complete with end_turn" + ); + + // A usage_update notification must appear in the frames before the response. + let usage_t1 = frames_before_t1 + .iter() + .find(|v| is_usage_update(v)) + .unwrap_or_else(|| { + panic!( + "expected _goose/unstable/session/update usage_update before turn-1 response; frames: {frames_before_t1:#?}" + ) + }); + assert_eq!( + usage_t1["params"]["update"]["sessionUpdate"], "usage_update", + "sessionUpdate field must be 'usage_update'" + ); + assert_eq!( + usage_t1["params"]["update"]["accumulatedInputTokens"], + json!(10u64), + "turn 1 accumulated input tokens" + ); + assert_eq!( + usage_t1["params"]["update"]["accumulatedOutputTokens"], + json!(5u64), + "turn 1 accumulated output tokens" + ); + + // ── Turn 2 ────────────────────────────────────────────────────────────── + let p2 = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"turn 2"}]}), + ) + .await; + + let (frames_before_t2, response_t2) = recv_until_with_drain(&mut h, |v| v["id"] == p2).await; + assert_eq!( + response_t2["result"]["stopReason"], "end_turn", + "turn 2 must complete with end_turn" + ); + + // Notification arrives before the response, with cumulative sums (10+20, 5+8). + let usage_t2 = frames_before_t2 + .iter() + .find(|v| is_usage_update(v)) + .unwrap_or_else(|| { + panic!( + "expected _goose/unstable/session/update usage_update before turn-2 response; frames: {frames_before_t2:#?}" + ) + }); + assert_eq!( + usage_t2["params"]["update"]["accumulatedInputTokens"], + json!(30u64), + "turn 2 accumulated input tokens must be 10+20=30" + ); + assert_eq!( + usage_t2["params"]["update"]["accumulatedOutputTokens"], + json!(13u64), + "turn 2 accumulated output tokens must be 5+8=13" + ); + + h.shutdown().await; +} + +/// When the provider returns a response with no `usage` block, buzz-agent must +/// NOT emit a `_goose/unstable/session/update` notification for that turn. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn no_usage_turn_emits_no_usage_notification() { + let url = spawn_fake_llm(vec![openai_text("no usage here")]).await; + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + let p_id = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"go"}]}), + ) + .await; + + let (frames_before, response) = recv_until_with_drain(&mut h, |v| v["id"] == p_id).await; + assert_eq!( + response["result"]["stopReason"], "end_turn", + "turn must complete with end_turn" + ); + + // No usage notification must appear in the frames before the response. + let found = frames_before.iter().any(is_usage_update); + assert!( + !found, + "expected NO usage_update notification when provider reports no usage; frames: {frames_before:#?}" + ); + + h.shutdown().await; +} + +/// When a turn is cancelled AFTER the provider has already returned a response +/// (so token counts are observed), buzz-agent must still emit the usage +/// notification before the cancelled `session/prompt` response. +/// +/// Setup: round 1 is a tool call WITH usage (tokens are captured). The agent +/// sends the cancel before round 2's LLM call, so the turn exits with +/// `stopReason: "cancelled"`. The usage notification must precede that response. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn cancelled_turn_with_usage_emits_notification_before_response() { + // Round 1: tool call with usage — sets turn_input/output_tokens. + // Round 2 never starts because cancel fires at the round boundary. + let url = spawn_fake_llm(vec![openai_tool_call_with_usage( + "call_cancel_test", + "fake__noop", + json!({}), + 15, + 6, + )]) + .await; + let mut h = Harness::spawn(&url).await; + let sid = init_session(&mut h).await; + + let p_id = h + .send( + "session/prompt", + json!({"sessionId": sid, "prompt": [{"type":"text","text":"start work"}]}), + ) + .await; + + // Wait for the activeRunId advert (agent is live) then send cancel. + let _run_id = recv_active_run_id(&mut h).await; + // Wait for the tool_call_update (failed — unknown tool) so we know round 1 + // LLM response has been processed and tokens are captured, THEN cancel. + h.recv_until(|v| { + v.get("method") == Some(&json!("session/update")) + && v["params"]["update"]["sessionUpdate"] == "tool_call_update" + }) + .await; + let c_id = h.send("session/cancel", json!({"sessionId": sid})).await; + // Drain remaining frames; the cancel OK and the prompt response both arrive. + let mut saw_usage_before_prompt_response = false; + let mut saw_usage = false; + let mut saw_cancel_ok = false; + let mut saw_prompt_response = false; + for _ in 0..40 { + let v = h.recv().await; + if v["id"] == json!(c_id) { + // cancel acknowledged + saw_cancel_ok = true; + } else if is_usage_update(&v) { + saw_usage = true; + // Record that usage arrived before the prompt response (if it hasn't yet). + if !saw_prompt_response { + saw_usage_before_prompt_response = true; + } + } else if v["id"] == json!(p_id) { + saw_prompt_response = true; + // The prompt response is either a result (stopReason: cancelled or + // end_turn) or an error (if cancel races with round 2's LLM call + // returning no-more-responses). Both are acceptable — we only care + // that the usage notification precedes whichever frame terminates + // the turn. + let has_result = v.get("result").is_some(); + let has_error = v.get("error").is_some(); + assert!( + has_result || has_error, + "expected result or error on prompt response, got: {v}" + ); + } + if saw_usage && saw_prompt_response && saw_cancel_ok { + break; + } + } + assert!(saw_cancel_ok, "session/cancel was not acknowledged"); + assert!( + saw_usage, + "expected usage_update notification for cancelled turn with observed tokens" + ); + assert!( + saw_usage_before_prompt_response, + "usage_update must arrive before the session/prompt response" + ); + + h.shutdown().await; +} + +/// A tool-call OpenAI response with a `usage` block. Used to capture tokens in +/// round 1 before a cancel fires at the round boundary. +fn openai_tool_call_with_usage( + id: &str, + name: &str, + args: Value, + input_tokens: u64, + output_tokens: u64, +) -> Value { + json!({ + "id": "cc-u2", "object": "chat.completion", "model": "fake-model", + "choices": [{ + "index": 0, + "message": { + "role": "assistant", "content": null, + "tool_calls": [{ + "id": id, "type": "function", + "function": { "name": name, "arguments": args.to_string() }, + }], + }, + "finish_reason": "tool_calls", + }], + "usage": { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + }, + }) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn steer_rejected_on_empty_prompt() { let (url, _captures) = spawn_capturing_fake_llm(vec![