Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a44dde0
docs(nips): add NIP-AM draft for durable agent turn metrics
Jul 1, 2026
19889ba
docs(nips): harden NIP-AM read gate and delta ordering semantics
Jul 1, 2026
b7480e8
feat(core/relay): add NIP-AM kind 44200 (agent turn metrics) with rel…
Jul 1, 2026
23b522d
fix(relay/core): close result-level read gate for kind:44200 (NIP-AM)
Jul 1, 2026
f8fe787
feat(buzz-acp): add goose usage adapter for NIP-AM turn metrics (Phas…
Jul 1, 2026
c9a1458
fix(relay/core): plug COUNT existence-leak and StopReason forward-com…
Jul 1, 2026
c9a583f
fix(acp): make GooseUsageTracker turn-scoped and close cost-decrease …
Jul 1, 2026
6771caf
chore(fmt): run rustfmt on NIP-AM kind 44200 relay changes
Jul 1, 2026
f3f751c
chore(fmt): run rustfmt on NIP-AM goose adapter
Jul 1, 2026
6f75236
Merge branch 'duncan/nip-am-kind-relay' into paul/nip-am-agent-turn-m…
Jul 2, 2026
6c68d8c
Merge branch 'paul/nip-am-agent-turn-metrics' into duncan/nip-am-goos…
Jul 2, 2026
3011944
feat(acp,buzz-agent): publish NIP-AM kind 44200 agent turn metrics
Jul 2, 2026
39fd2d8
fix(acp,buzz-agent): address Thufir pass-1 findings on NIP-AM step-2 …
Jul 2, 2026
6c0bf3a
refactor(acp,buzz-agent): unify NIP-AM metrics via shared usage notif…
Jul 2, 2026
ac42420
test(buzz-agent): add producer-contract tests for usage notification
Jul 2, 2026
e6b3f45
test(buzz-agent): fix flake in cancelled_turn_with_usage test
Jul 2, 2026
51c9b68
chore: merge origin/main into paul/nip-am-agent-turn-metrics
Jul 2, 2026
193a5e2
Merge remote-tracking branch 'origin/paul/nip-am-agent-turn-metrics' …
Jul 2, 2026
a0a6ce4
fix(lint): remove redundant closure in no_usage_turn test
Jul 2, 2026
760d1ec
Merge remote-tracking branch 'origin/main' into paul/nip-am-agent-tur…
wpfleger96 Jul 2, 2026
d6a1913
fix(nip-am): address Eva+Wren review findings (migration, delta, cost…
Jul 3, 2026
497f007
fix(nip-am): fix cross-session baseline corruption + add 44200 FTS re…
Jul 3, 2026
cb6b5f2
test(nip-am): apply full migration chain in FTS integration test setup
Jul 3, 2026
552cf5f
docs(nip-am): fix stale record() summary contradicting three-case con…
Jul 3, 2026
b512df4
chore(nip-am): merge origin/main; fix stale FTS doc pointers
Jul 3, 2026
07188c3
style(nip-am): apply rustfmt to test asserts in usage, agent_turn_met…
wpfleger96 Jul 3, 2026
8ebe6d9
fix(nip-am): address code-review findings — decrypt validation, else-…
Jul 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ Docker Compose provides the full local development stack. All services include h
Search runs over the `events.search_tsv` generated `tsvector` column on the
`events` table (no separate collection or service). The column is populated on
insert — `to_tsvector('simple', content)` — and excludes privacy-sensitive
kinds via `CASE WHEN kind IN (1059, 30300, 30622) THEN NULL`, so those rows are
kinds via `CASE WHEN kind IN (1059, 30300, 30622, 44100, 44101, 44200) THEN NULL`, so those rows are
storage-level unsearchable (a `NULL` tsvector never matches `@@`). A GIN index
(`idx_events_search_tsv`) backs the `@@` probe; in multi-community mode the
community-leading btree filters BitmapAnd with the GIN probe so every query is
Expand Down
5 changes: 3 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,9 @@ for team access setup, onboarding, and the full repo inventory. See
6. **Index for search** (if applicable) — Postgres FTS indexes persisted
events automatically via the `events.search_tsv` generated column. To
exclude a privacy-sensitive kind from search, add it to the `CASE WHEN
kind IN (...)` exclusion in the `search_tsv` definition (see the initial
schema migration) rather than wiring a separate indexer.
kind IN (...)` exclusion in `schema/schema.sql` and add a new additive
migration (see `migrations/0004_agent_turn_metric_fts.sql` as the model)
rather than modifying the initial migration or wiring a separate indexer.

7. **Audit** — the audit log captures all events automatically; no changes
needed unless you need custom audit metadata.
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

186 changes: 184 additions & 2 deletions crates/buzz-acp/src/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::process::{Child, ChildStdin, ChildStdout};
use tokio_util::codec::{FramedRead, LinesCodec, LinesCodecError};

use crate::observer::{ObserverContext, ObserverHandle};
use crate::usage::{TurnUsage, UsageTracker};

/// Maximum allowed size of a single NDJSON line from the agent's stdout.
/// Lines exceeding this limit are rejected to prevent OOM from rogue agents.
Expand Down Expand Up @@ -167,6 +168,11 @@ pub struct AcpClient {
/// outside of a goose-native turn — the read loop's steer arm is
/// disabled in that case.
steer_rx: Option<tokio::sync::mpsc::Receiver<crate::pool::SteerRequest>>,
/// Usage tracker — accumulates cumulative token counts from
/// `_goose/unstable/session/update` notifications and computes per-turn
/// deltas. Both goose and buzz-agent emit this notification; goose gates
/// on client capability advertisement, buzz-agent emits unconditionally.
goose_usage: UsageTracker,
}

impl AcpClient {
Expand Down Expand Up @@ -258,6 +264,7 @@ impl AcpClient {
observer_context: ObserverContext::default(),
active_run_id: None,
steer_rx: None,
goose_usage: UsageTracker::default(),
})
}

Expand Down Expand Up @@ -303,7 +310,16 @@ impl AcpClient {
// on ACP v2 ahead of the upstream ACP RFD. Revisit when that RFD merges.
let params = serde_json::json!({
"protocolVersion": 2,
"clientCapabilities": {},
"clientCapabilities": {
// Signal to goose that we handle `_goose/unstable/session/update`
// notifications. Without this the custom notification is suppressed
// on goose's side and usage data is never emitted.
"_meta": {
"goose": {
"customNotifications": true
}
}
},
"clientInfo": {
"name": "buzz-acp",
"version": env!("CARGO_PKG_VERSION")
Expand Down Expand Up @@ -427,6 +443,11 @@ impl AcpClient {
let hard_deadline = tokio::time::Instant::now() + max_duration;
self.current_hard_deadline = Some(hard_deadline);

// Mark the usage tracker as in-flight for this turn BEFORE sending the
// prompt so that any setup notifications recorded earlier are not
// misattributed to this turn.
self.goose_usage.begin_turn(session_id);

self.last_prompt_id = Some(self.next_id);
let id = self.next_id;
self.next_id += 1;
Expand Down Expand Up @@ -502,6 +523,20 @@ impl AcpClient {
self.active_run_id.as_deref()
}

/// Consume and return the per-turn usage record computed from the most
/// recent `_goose/unstable/session/update` notification.
///
/// Returns `None` if no usage update arrived since the last call (i.e.
/// the harness did not emit one for this turn, or this is not a goose
/// agent). Must be called at most once per turn; subsequent calls return
/// `None` until the next `usage_update` notification is recorded.
///
/// Intended for consumption by `publish_agent_turn_metric` in `pool.rs` to
/// publish a kind 44200 NIP-AM event.
pub fn take_turn_usage(&mut self) -> Option<TurnUsage> {
self.goose_usage.take()
}

/// Install a per-turn steer request channel for goose-native
/// non-cancelling mid-turn delivery.
///
Expand Down Expand Up @@ -840,6 +875,9 @@ impl AcpClient {
"session/update" => {
let _ = self.handle_session_update(&msg);
}
"_goose/unstable/session/update" => {
self.handle_goose_usage_update(&msg);
}
"session/request_permission" => {
self.handle_permission_request(&msg).await?;
}
Expand Down Expand Up @@ -1170,6 +1208,9 @@ impl AcpClient {
idle_deadline = Instant::now() + idle_timeout;
}
}
"_goose/unstable/session/update" => {
self.handle_goose_usage_update(&msg);
}
"session/request_permission" => {
self.handle_permission_request(&msg).await?;
}
Expand Down Expand Up @@ -1311,6 +1352,46 @@ impl AcpClient {
}
}

/// Parse a `_goose/unstable/session/update` notification and record the
/// usage snapshot in the per-session tracker.
///
/// Silently ignores malformed or non-`usage_update` variants — the
/// notification is best-effort observability data, not a protocol
/// requirement. Failures are logged at debug level.
fn handle_goose_usage_update(&mut self, msg: &serde_json::Value) {
use crate::usage::{GooseSessionUpdateNotification, GooseSessionUpdateVariant};
let params = match msg.get("params") {
Some(p) => p,
None => {
tracing::debug!(
target: "acp::usage",
"_goose/unstable/session/update: missing params"
);
return;
}
};
match serde_json::from_value::<GooseSessionUpdateNotification>(params.clone()) {
Ok(notif) => {
if let GooseSessionUpdateVariant::UsageUpdate(payload) = &notif.update {
tracing::debug!(
target: "acp::usage",
session_id = %notif.session_id,
input = payload.accumulated_input_tokens,
output = payload.accumulated_output_tokens,
"goose usage update"
);
self.goose_usage.record(&notif.session_id, payload);
}
}
Err(e) => {
tracing::debug!(
target: "acp::usage",
"_goose/unstable/session/update: deserialization error: {e}"
);
}
}
}

/// Auto-approve a `session/request_permission` request from the agent.
///
/// Finds the option with `kind == "allow_once"` and responds with its `optionId`.
Expand Down Expand Up @@ -1782,7 +1863,13 @@ mod tests {
"method": "initialize",
"params": {
"protocolVersion": 2,
"clientCapabilities": {},
"clientCapabilities": {
"_meta": {
"goose": {
"customNotifications": true
}
}
},
"clientInfo": {
"name": "buzz-acp",
"version": "0.1.0"
Expand All @@ -1795,6 +1882,11 @@ mod tests {
Some("buzz-acp")
);
assert!(msg["params"]["clientCapabilities"].is_object());
assert_eq!(
msg["params"]["clientCapabilities"]["_meta"]["goose"]["customNotifications"].as_bool(),
Some(true),
"goose customNotifications capability must be advertised"
);
}

#[test]
Expand Down Expand Up @@ -2825,4 +2917,94 @@ mod tests {
other => panic!("expected SteerAck::Success, got {other:?}"),
}
}

// ── Goose usage notification integration ──────────────────────────────

/// Build a `_goose/unstable/session/update` JSON-RPC notification.
fn goose_usage_update_msg(
session_id: &str,
input: u64,
output: u64,
cost: Option<f64>,
) -> serde_json::Value {
let mut update = serde_json::json!({
"sessionUpdate": "usage_update",
"used": input + output,
"contextLimit": 200000u64,
"accumulatedInputTokens": input,
"accumulatedOutputTokens": output,
});
if let Some(c) = cost {
update["accumulatedCost"] = serde_json::json!(c);
}
serde_json::json!({
"jsonrpc": "2.0",
"method": "_goose/unstable/session/update",
"params": {
"sessionId": session_id,
"update": update
}
})
}

#[tokio::test]
async fn goose_usage_notification_recorded_and_take_returns_usage() {
let mut client = spawn_inert_client().await;
assert!(client.take_turn_usage().is_none(), "starts empty");

// begin_turn before sending the prompt — mirrors the real call flow.
client.goose_usage.begin_turn("s1");
let msg = goose_usage_update_msg("s1", 1000, 200, Some(0.01));
client.handle_goose_usage_update(&msg);

let usage = client
.take_turn_usage()
.expect("usage should be present after notification");
assert_eq!(usage.session_id, "s1");
assert_eq!(usage.turn_seq, 1);
assert!(!usage.delta_reliable, "first turn must be unreliable");
assert_eq!(usage.cumulative_input_tokens, 1000);
assert_eq!(usage.cumulative_output_tokens, 200);
assert_eq!(usage.cumulative_cost_usd, Some(0.01));

// Second take must be None.
assert!(
client.take_turn_usage().is_none(),
"take after drain is None"
);
}

#[tokio::test]
async fn goose_usage_second_turn_delta_reliable() {
let mut client = spawn_inert_client().await;
// Turn 1.
client.goose_usage.begin_turn("s2");
client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1000, 200, None));
let _ = client.take_turn_usage();
// Turn 2.
client.goose_usage.begin_turn("s2");
client.handle_goose_usage_update(&goose_usage_update_msg("s2", 1800, 450, None));
let usage = client.take_turn_usage().expect("turn 2 usage");
assert!(usage.delta_reliable);
assert_eq!(usage.turn_input_tokens, Some(800));
assert_eq!(usage.turn_output_tokens, Some(250));
}

#[tokio::test]
async fn goose_usage_malformed_notification_does_not_panic() {
let mut client = spawn_inert_client().await;
// Missing params entirely.
let bad = serde_json::json!({"jsonrpc":"2.0","method":"_goose/unstable/session/update"});
client.handle_goose_usage_update(&bad);
assert!(client.take_turn_usage().is_none());

// params present but wrong shape.
let bad2 = serde_json::json!({
"jsonrpc": "2.0",
"method": "_goose/unstable/session/update",
"params": { "oops": true }
});
client.handle_goose_usage_update(&bad2);
assert!(client.take_turn_usage().is_none());
}
}
2 changes: 1 addition & 1 deletion crates/buzz-acp/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ fn validate_multiple_event_handling(
Ok(())
}

fn normalize_agent_command_identity(command: &str) -> String {
pub(crate) fn normalize_agent_command_identity(command: &str) -> String {
let normalized = command.trim().replace('\\', "/");
let trimmed = normalized.trim_end_matches('/');
let basename = trimmed
Expand Down
4 changes: 4 additions & 0 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod observer;
mod pool;
mod queue;
mod relay;
mod usage;

pub use usage::TurnUsage;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
Expand Down Expand Up @@ -1402,6 +1405,7 @@ async fn tokio_main() -> Result<()> {
.as_deref()
.and_then(|hex| nostr::PublicKey::from_hex(hex).ok()),
memory_enabled: config.memory_enabled,
harness_name: crate::config::normalize_agent_command_identity(&config.agent_command),
});

if !config.memory_enabled {
Expand Down
Loading
Loading