Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

186 changes: 184 additions & 2 deletions crates/buzz-acp/src/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::process::{Child, ChildStdin, ChildStdout};
use tokio_util::codec::{FramedRead, LinesCodec, LinesCodecError};

use crate::observer::{ObserverContext, ObserverHandle};
use crate::usage::{TurnUsage, UsageTracker};

/// Maximum allowed size of a single NDJSON line from the agent's stdout.
/// Lines exceeding this limit are rejected to prevent OOM from rogue agents.
Expand Down Expand Up @@ -167,6 +168,11 @@ pub struct AcpClient {
/// outside of a goose-native turn — the read loop's steer arm is
/// disabled in that case.
steer_rx: Option<tokio::sync::mpsc::Receiver<crate::pool::SteerRequest>>,
/// Usage tracker — accumulates cumulative token counts from
/// `_goose/unstable/session/update` notifications and computes per-turn
/// deltas. Both goose and buzz-agent emit this notification; goose gates
/// on client capability advertisement, buzz-agent emits unconditionally.
goose_usage: UsageTracker,
}

impl AcpClient {
Expand Down Expand Up @@ -258,6 +264,7 @@ impl AcpClient {
observer_context: ObserverContext::default(),
active_run_id: None,
steer_rx: None,
goose_usage: UsageTracker::default(),
})
}

Expand Down Expand Up @@ -303,7 +310,16 @@ impl AcpClient {
// on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges.
let params = serde_json::json!({
"protocolVersion": 2,
"clientCapabilities": {},
"clientCapabilities": {
// Signal to goose that we handle `_goose/unstable/session/update`
// notifications. Without this the custom notification is suppressed
// on goose's side and usage data is never emitted.
"_meta": {
"goose": {
"customNotifications": true
}
}
},
"clientInfo": {
"name": "buzz-acp",
"version": env!("CARGO_PKG_VERSION")
Expand Down Expand Up @@ -427,6 +443,11 @@ impl AcpClient {
let hard_deadline = tokio::time::Instant::now() + max_duration;
self.current_hard_deadline = Some(hard_deadline);

// Mark the usage tracker as in-flight for this turn BEFORE sending the
// prompt so that any setup notifications recorded earlier are not
// misattributed to this turn.
self.goose_usage.begin_turn(session_id);

self.last_prompt_id = Some(self.next_id);
let id = self.next_id;
self.next_id += 1;
Expand Down Expand Up @@ -502,6 +523,20 @@ impl AcpClient {
self.active_run_id.as_deref()
}

/// Consume and return the per-turn usage record computed from the most
/// recent `_goose/unstable/session/update` notification.
///
/// Returns `None` if no usage update arrived since the last call (i.e.
/// the harness did not emit one for this turn, or this is not a goose
/// agent). Must be called at most once per turn; subsequent calls return
/// `None` until the next `usage_update` notification is recorded.
///
/// Intended for consumption by `publish_agent_turn_metric` in `pool.rs` to
/// publish a kind 44200 NIP-AM event.
pub fn take_turn_usage(&mut self) -> Option<TurnUsage> {
self.goose_usage.take()
}

/// Install a per-turn steer request channel for goose-native
/// non-cancelling mid-turn delivery.
///
Expand Down Expand Up @@ -840,6 +875,9 @@ impl AcpClient {
"session/update" => {
let _ = self.handle_session_update(&msg);
}
"_goose/unstable/session/update" => {
self.handle_goose_usage_update(&msg);
}
"session/request_permission" => {
self.handle_permission_request(&msg).await?;
}
Expand Down Expand Up @@ -1170,6 +1208,9 @@ impl AcpClient {
idle_deadline = Instant::now() + idle_timeout;
}
}
"_goose/unstable/session/update" => {
self.handle_goose_usage_update(&msg);
}
"session/request_permission" => {
self.handle_permission_request(&msg).await?;
}
Expand Down Expand Up @@ -1311,6 +1352,46 @@ impl AcpClient {
}
}

/// Parse a `_goose/unstable/session/update` notification and record the
/// usage snapshot in the per-session tracker.
///
/// Silently ignores malformed or non-`usage_update` variants — the
/// notification is best-effort observability data, not a protocol
/// requirement. Failures are logged at debug level.
fn handle_goose_usage_update(&mut self, msg: &serde_json::Value) {
use crate::usage::{GooseSessionUpdateNotification, GooseSessionUpdateVariant};
let params = match msg.get("params") {
Some(p) => p,
None => {
tracing::debug!(
target: "acp::usage",
"_goose/unstable/session/update: missing params"
);
return;
}
};
match serde_json::from_value::<GooseSessionUpdateNotification>(params.clone()) {
Ok(notif) => {
if let GooseSessionUpdateVariant::UsageUpdate(payload) = &notif.update {
tracing::debug!(
target: "acp::usage",
session_id = %notif.session_id,
input = payload.accumulated_input_tokens,
output = payload.accumulated_output_tokens,
"goose usage update"
);
self.goose_usage.record(&notif.session_id, payload);
}
}
Err(e) => {
tracing::debug!(
target: "acp::usage",
"_goose/unstable/session/update: deserialization error: {e}"
);
}
}
}

/// Auto-approve a `session/request_permission` request from the agent.
///
/// Finds the option with `kind == "allow_once"` and responds with its `optionId`.
Expand Down Expand Up @@ -1782,7 +1863,13 @@ mod tests {
"method": "initialize",
"params": {
"protocolVersion": 2,
"clientCapabilities": {},
"clientCapabilities": {
"_meta": {
"goose": {
"customNotifications": true
}
}
},
"clientInfo": {
"name": "buzz-acp",
"version": "0.1.0"
Expand All @@ -1795,6 +1882,11 @@ mod tests {
Some("buzz-acp")
);
assert!(msg["params"]["clientCapabilities"].is_object());
assert_eq!(
msg["params"]["clientCapabilities"]["_meta"]["goose"]["customNotifications"].as_bool(),
Some(true),
"goose customNotifications capability must be advertised"
);
}

#[test]
Expand Down Expand Up @@ -2825,4 +2917,94 @@ mod tests {
other => panic!("expected SteerAck::Success, got {other:?}"),
}
}

// ── Goose usage notification integration ──────────────────────────────

/// Build a `_goose/unstable/session/update` JSON-RPC notification.
fn goose_usage_update_msg(
session_id: &str,
input: u64,
output: u64,
cost: Option<f64>,
) -> serde_json::Value {
let mut update = serde_json::json!({
"sessionUpdate": "usage_update",
"used": input + output,
"contextLimit": 200000u64,
"accumulatedInputTokens": input,
"accumulatedOutputTokens": output,
});
if let Some(c) = cost {
update["accumulatedCost"] = serde_json::json!(c);
}
serde_json::json!({
"jsonrpc": "2.0",
"method": "_goose/unstable/session/update",
"params": {
"sessionId": session_id,
"update": update
}
})
}

#[tokio::test]
async fn goose_usage_notification_recorded_and_take_returns_usage() {
let mut client = spawn_inert_client().await;
assert!(client.take_turn_usage().is_none(), "starts empty");

// begin_turn before sending the prompt — mirrors the real call flow.
client.goose_usage.begin_turn("s1");
let msg = goose_usage_update_msg("s1", 1000, 200, Some(0.01));
client.handle_goose_usage_update(&msg);

let usage = client
.take_turn_usage()
.expect("usage should be present after notification");
assert_eq!(usage.session_id, "s1");
assert_eq!(usage.turn_seq, 1);
assert!(!usage.delta_reliable, "first turn must be unreliable");
assert_eq!(usage.cumulative_input_tokens, 1000);
assert_eq!(usage.cumulative_output_tokens, 200);
assert_eq!(usage.cumulative_cost_usd, Some(0.01));

// Second take must be None.
assert!(
client.take_turn_usage().is_none(),
"take after drain is None"
);
}

#[tokio::test]
async fn goose_usage_second_turn_delta_reliable() {
let mut client = spawn_inert_client().await;
// Turn 1.
client.goose_usage.begin_turn("s2");
client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1000, 200, None));
let _ = client.take_turn_usage();
// Turn 2.
client.goose_usage.begin_turn("s2");
client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1800, 450, None));
let usage = client.take_turn_usage().expect("turn 2 usage");
assert!(usage.delta_reliable);
assert_eq!(usage.turn_input_tokens, Some(800));
assert_eq!(usage.turn_output_tokens, Some(250));
}

#[tokio::test]
async fn goose_usage_malformed_notification_does_not_panic() {
let mut client = spawn_inert_client().await;
// Missing params entirely.
let bad = serde_json::json!({"jsonrpc":"2.0","method":"_goose/unstable/session/update"});
client.handle_goose_usage_update(&bad);
assert!(client.take_turn_usage().is_none());

// params present but wrong shape.
let bad2 = serde_json::json!({
"jsonrpc": "2.0",
"method": "_goose/unstable/session/update",
"params": { "oops": true }
});
client.handle_goose_usage_update(&bad2);
assert!(client.take_turn_usage().is_none());
}
}
2 changes: 1 addition & 1 deletion crates/buzz-acp/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ fn validate_multiple_event_handling(
Ok(())
}

fn normalize_agent_command_identity(command: &str) -> String {
pub(crate) fn normalize_agent_command_identity(command: &str) -> String {
let normalized = command.trim().replace('\\', "/");
let trimmed = normalized.trim_end_matches('/');
let basename = trimmed
Expand Down
4 changes: 4 additions & 0 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod observer;
mod pool;
mod queue;
mod relay;
mod usage;

pub use usage::TurnUsage;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -1402,6 +1405,7 @@ async fn tokio_main() -> Result<()> {
.as_deref()
.and_then(|hex| nostr::PublicKey::from_hex(hex).ok()),
memory_enabled: config.memory_enabled,
harness_name: crate::config::normalize_agent_command_identity(&config.agent_command),
});

if !config.memory_enabled {
Expand Down
Loading
Loading