From 9a059c03e3a7a1d7f2823f59fdf783c42e716063 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Wed, 1 Jul 2026 14:20:26 -0400 Subject: [PATCH 1/2] feat(buzz-agent): emit agent_thought_chunk for reasoning content MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit buzz-agent never emitted agent_thought_chunk, so the observer feed's Thinking block was always absent even with a reasoning model — unlike goose/claude/codex which all emit it. Add a `reasoning` field to `LlmResponse` and populate it in all three provider parse paths: - Responses API: concatenated summary[].text from type=="reasoning" items - Anthropic: concatenated thinking blocks (type=="thinking") - OpenAI chat/completions: always empty (reasoning not exposed) Emit agent_thought_chunk before agent_message_chunk in agent.rs when reasoning is non-empty. Tests cover both the Anthropic and Responses API paths and verify no thought chunk is emitted for plain text responses. Also fix two stale comments in acp.rs claiming session_info_update is goose-only; buzz-agent emits it too (lib.rs:510). Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-acp/src/acp.rs | 15 +- crates/buzz-agent/src/agent.rs | 14 ++ crates/buzz-agent/src/llm.rs | 37 ++- crates/buzz-agent/src/types.rs | 9 + crates/buzz-agent/tests/golden_transcripts.rs | 212 ++++++++++++++++++ 5 files changed, 279 insertions(+), 8 deletions(-) diff --git a/crates/buzz-acp/src/acp.rs b/crates/buzz-acp/src/acp.rs index ac7a31c59..1dbd5af62 100644 --- a/crates/buzz-acp/src/acp.rs +++ b/crates/buzz-acp/src/acp.rs @@ -490,8 +490,9 @@ impl AcpClient { /// Most recently observed goose `_meta.goose.activeRunId` from a /// `session_info_update`, if any. /// - /// Goose-only: other agents leave this `None` for the lifetime of the - /// client. 
Read directly by `read_until_response_with_idle_timeout`'s + /// Both goose and buzz-agent emit `session_info_update`; other agents + /// leave this `None` for the lifetime of the client. Read directly by + /// `read_until_response_with_idle_timeout`'s /// steer arm at write time (see [`crate::pool::SteerRequest`] for /// why the read loop owns this); production callers do not need this /// accessor. Kept as `pub` so tests can introspect the field. @@ -1266,11 +1267,11 @@ impl AcpClient { false } "session_info_update" => { - // Goose-only as of writing: `_meta.goose.activeRunId` carries - // the id of the currently-active prompt run, or `null` when - // the run has cleared. Other agents don't emit this field; - // for them `active_run_id` stays `None` and steer callers - // will fall back to cancel+merge. + // Both goose and buzz-agent emit `session_info_update` with + // `_meta.goose.activeRunId`: the id of the currently-active + // prompt run, or `null` when the run has cleared. Other agents + // don't emit this field; for them `active_run_id` stays `None` + // and steer callers will fall back to cancel+merge. 
// // Per the ACP `SessionInfoUpdate` schema, `_meta` is a field // on the update object itself — nested inside `update`, not diff --git a/crates/buzz-agent/src/agent.rs b/crates/buzz-agent/src/agent.rs index 54cb43939..7474f0e4b 100644 --- a/crates/buzz-agent/src/agent.rs +++ b/crates/buzz-agent/src/agent.rs @@ -160,6 +160,20 @@ impl RunCtx<'_> { ); } + if !response.reasoning.is_empty() { + wire::send( + self.wire, + wire::session_update( + self.session_id, + json!({ + "sessionUpdate": "agent_thought_chunk", + "content": { "type": "text", "text": &response.reasoning } + }), + ), + ) + .await; + } + if !response.text.is_empty() { wire::send( self.wire, diff --git a/crates/buzz-agent/src/llm.rs b/crates/buzz-agent/src/llm.rs index f961d7f26..e7a33c971 100644 --- a/crates/buzz-agent/src/llm.rs +++ b/crates/buzz-agent/src/llm.rs @@ -620,6 +620,7 @@ fn databricks_v2_path(route: DatabricksV2Route) -> &'static str { fn parse_responses(v: Value) -> Result { let mut text = String::new(); + let mut reasoning = String::new(); let mut tool_calls = Vec::new(); let mut saw_function_call = false; @@ -663,7 +664,28 @@ fn parse_responses(v: Value) -> Result { args, )?); } - // Reasoning items are opaque/internal; we don't replay them. + Some("reasoning") => { + // Reasoning summary items from the Responses API. Each item has a + // `summary` array of `{"type": "summary_text", "text": "..."}` objects. + for s in item + .get("summary") + .and_then(Value::as_array) + .into_iter() + .flatten() + { + if matches!( + s.get("type").and_then(Value::as_str), + Some("summary_text" | "text") + ) { + if let Some(t) = s.get("text").and_then(Value::as_str) { + if !reasoning.is_empty() { + reasoning.push('\n'); + } + reasoning.push_str(t); + } + } + } + } // Unknown types ignored for forward-compat. 
_ => {} } @@ -691,6 +713,7 @@ fn parse_responses(v: Value) -> Result { tool_calls, stop, input_tokens, + reasoning, }) } @@ -760,6 +783,7 @@ fn parse_anthropic(v: Value) -> Result { let stop = map_stop(v.get("stop_reason").and_then(Value::as_str)); let mut tool_calls = Vec::new(); let mut text = String::new(); + let mut reasoning = String::new(); if let Some(blocks) = v.get("content").and_then(Value::as_array) { for b in blocks { match b.get("type").and_then(Value::as_str) { @@ -768,6 +792,15 @@ fn parse_anthropic(v: Value) -> Result { text.push_str(t); } } + Some("thinking") => { + // Anthropic extended thinking block: `{"type": "thinking", "thinking": "..."}` + if let Some(t) = b.get("thinking").and_then(Value::as_str) { + if !reasoning.is_empty() { + reasoning.push('\n'); + } + reasoning.push_str(t); + } + } Some("tool_use") => tool_calls.push(make_tool_call( str_field(b, "id"), str_field(b, "name"), @@ -783,6 +816,7 @@ fn parse_anthropic(v: Value) -> Result { tool_calls, stop, input_tokens, + reasoning, }) } @@ -819,6 +853,7 @@ fn parse_openai(v: Value) -> Result { tool_calls, stop, input_tokens, + reasoning: String::new(), }) } diff --git a/crates/buzz-agent/src/types.rs b/crates/buzz-agent/src/types.rs index 0b90f9b51..a8acb52a6 100644 --- a/crates/buzz-agent/src/types.rs +++ b/crates/buzz-agent/src/types.rs @@ -139,6 +139,15 @@ pub struct LlmResponse { /// tokens, so reading it alone would undercount). Used to gate handoff on /// the real token budget rather than a byte estimate. pub input_tokens: Option, + /// Reasoning/thinking content emitted by the model before its answer, if + /// any. Non-empty when the provider returns extended-thinking tokens: + /// + /// - Responses API: concatenated `summary[].text` from `type == "reasoning"` output items. + /// - Anthropic: concatenated `thinking` from `type == "thinking"` content blocks. + /// - OpenAI chat/completions: message `reasoning_content`, else `reasoning`; empty if absent.
+ /// + /// Empty string when the provider returned no reasoning content. + pub reasoning: String, } #[derive(Debug, Clone, Copy, PartialEq)] diff --git a/crates/buzz-agent/tests/golden_transcripts.rs b/crates/buzz-agent/tests/golden_transcripts.rs index 1b1926617..3d5155a7b 100644 --- a/crates/buzz-agent/tests/golden_transcripts.rs +++ b/crates/buzz-agent/tests/golden_transcripts.rs @@ -492,6 +492,218 @@ async fn test_oversized_line_kills_agent() { .expect("agent did not exit on oversized line"); } +/// Build an Anthropic Messages API response with an optional `thinking` block +/// followed by a `text` block. The `thinking` field is omitted when `None`. +fn anthropic_thinking_response(thinking: Option<&str>, text: &str) -> Value { + let mut content: Vec = Vec::new(); + if let Some(t) = thinking { + content.push(json!({ "type": "thinking", "thinking": t })); + } + content.push(json!({ "type": "text", "text": text })); + json!({ + "id": "msg_1", + "type": "message", + "role": "assistant", + "model": "claude-fake", + "stop_reason": "end_turn", + "content": content, + "usage": { "input_tokens": 10, "output_tokens": 5 }, + }) +} + +/// Build an OpenAI Responses API response with a `reasoning` output item +/// (containing a single `summary_text` entry) followed by a message item. +fn responses_reasoning_response(reasoning: &str, text: &str) -> Value { + json!({ + "id": "resp_1", + "status": "completed", + "output": [ + { + "type": "reasoning", + "id": "rs_1", + "summary": [{ "type": "summary_text", "text": reasoning }], + }, + { + "type": "message", + "id": "msg_1", + "content": [{ "type": "output_text", "text": text }], + }, + ], + "usage": { "input_tokens": 10 }, + }) +} + +/// Drain all `session/update` notifications until the `session/prompt` reply +/// arrives for `prompt_id`, collecting notification payloads in order. 
+async fn collect_updates_until_done(h: &mut Harness, prompt_id: i64) -> Vec { + let mut updates = Vec::new(); + loop { + let v = h.recv().await; + if v.get("id") == Some(&json!(prompt_id)) { + return updates; + } + if v.get("method") == Some(&json!("session/update")) { + if let Some(u) = v["params"].get("update") { + updates.push(u.clone()); + } + } + } +} + +/// Asserts that `agent_thought_chunk` appears in `updates` BEFORE +/// `agent_message_chunk`, and that both are present. +fn assert_thought_before_message(updates: &[Value]) { + let thought_pos = updates + .iter() + .position(|u| u["sessionUpdate"] == "agent_thought_chunk"); + let message_pos = updates + .iter() + .position(|u| u["sessionUpdate"] == "agent_message_chunk"); + assert!( + thought_pos.is_some(), + "expected agent_thought_chunk in updates: {updates:?}" + ); + assert!( + message_pos.is_some(), + "expected agent_message_chunk in updates: {updates:?}" + ); + assert!( + thought_pos.unwrap() < message_pos.unwrap(), + "agent_thought_chunk must precede agent_message_chunk, got updates: {updates:?}" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_thought_chunk_emitted_before_message_chunk_anthropic() { + // Anthropic extended-thinking: the response contains a `thinking` block + // followed by a `text` block. We expect agent_thought_chunk to be emitted + // before agent_message_chunk on the wire. 
+ let url = spawn_fake_llm(vec![anthropic_thinking_response( + Some("Let me reason about this carefully."), + "Here is my answer.", + )]) + .await; + let mut h = Harness::spawn(&[ + ("BUZZ_AGENT_PROVIDER", "anthropic"), + ("ANTHROPIC_API_KEY", "test"), + ("ANTHROPIC_MODEL", "claude-fake"), + ("ANTHROPIC_BASE_URL", &url), + ("OPENAI_COMPAT_BASE_URL", ""), + ]) + .await; + + let sid = handshake(&mut h).await; + let p = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{ "type": "text", "text": "think hard" }], + }), + ) + .await; + + let updates = collect_updates_until_done(&mut h, p).await; + assert_thought_before_message(&updates); + + let thought = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_thought_chunk") + .unwrap(); + assert_eq!(thought["content"]["text"], "Let me reason about this carefully."); + + let message = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_message_chunk") + .unwrap(); + assert_eq!(message["content"]["text"], "Here is my answer."); + + h.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_thought_chunk_emitted_before_message_chunk_responses_api() { + // OpenAI Responses API: reasoning item followed by message item. + // Setting OPENAI_COMPAT_API=responses forces the Responses API parse path. 
+ let url = spawn_fake_llm(vec![responses_reasoning_response( + "Thinking step by step.", + "Final answer.", + )]) + .await; + let mut h = Harness::spawn(&[ + ("BUZZ_AGENT_PROVIDER", "openai"), + ("OPENAI_COMPAT_API_KEY", "test"), + ("OPENAI_COMPAT_MODEL", "fake-model"), + ("OPENAI_COMPAT_API", "responses"), + ("OPENAI_COMPAT_BASE_URL", &url), + ]) + .await; + + let sid = handshake(&mut h).await; + let p = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{ "type": "text", "text": "reason it out" }], + }), + ) + .await; + + let updates = collect_updates_until_done(&mut h, p).await; + assert_thought_before_message(&updates); + + let thought = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_thought_chunk") + .unwrap(); + assert_eq!(thought["content"]["text"], "Thinking step by step."); + + let message = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_message_chunk") + .unwrap(); + assert_eq!(message["content"]["text"], "Final answer."); + + h.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_no_reasoning_no_thought_chunk() { + // Plain text response with no reasoning content — no agent_thought_chunk + // should appear on the wire. This guards against empty thought emissions. 
+ let url = spawn_fake_llm(vec![openai_text("just text, no thinking")]).await; + let mut h = Harness::spawn(&[("OPENAI_COMPAT_BASE_URL", &url)]).await; + + let sid = handshake(&mut h).await; + let p = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{ "type": "text", "text": "hi" }], + }), + ) + .await; + + let updates = collect_updates_until_done(&mut h, p).await; + + let has_thought = updates + .iter() + .any(|u| u["sessionUpdate"] == "agent_thought_chunk"); + assert!( + !has_thought, + "expected no agent_thought_chunk for a plain text response, got: {updates:?}" + ); + + let has_message = updates + .iter() + .any(|u| u["sessionUpdate"] == "agent_message_chunk"); + assert!(has_message, "expected agent_message_chunk in updates"); + + h.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_cancel_notification_no_reply() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); From 04398b47db41e70629939e7545036e646fa26505 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Wed, 1 Jul 2026 14:30:16 -0400 Subject: [PATCH 2/2] fix(buzz-agent): extract reasoning from chat/completions; fix stale docs Per Thufir review pass 1: 1. parse_openai now extracts reasoning_content (DeepSeek) and reasoning (other compat hosts) from the message object. OPENAI_COMPAT_API=auto routes non-openai.com hosts to chat/completions, so self-hosted reasoning models (DeepSeek, vLLM) now emit agent_thought_chunk. 2. Fix remaining stale acp.rs struct-field doc (active_run_id field): was 'Goose-specific / other agents will never populate this'. Now correctly states goose AND buzz-agent both emit session_info_update with _meta.goose.activeRunId; other agents may leave it unset. 3. cargo fmt --all (golden_transcripts.rs:610 assert_eq! wrap). 
Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-acp/src/acp.rs | 13 +++-- crates/buzz-agent/src/llm.rs | 14 ++++- crates/buzz-agent/tests/golden_transcripts.rs | 58 ++++++++++++++++++- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/crates/buzz-acp/src/acp.rs b/crates/buzz-acp/src/acp.rs index 1dbd5af62..0ba5a8c6a 100644 --- a/crates/buzz-acp/src/acp.rs +++ b/crates/buzz-acp/src/acp.rs @@ -144,19 +144,20 @@ pub struct AcpClient { observer_agent_index: Option, /// Best-effort context attached to raw ACP wire events. observer_context: ObserverContext, - /// Goose-specific: most recently observed `_meta.goose.activeRunId` from - /// a `session/update` notification of kind `session_info_update`. + /// Most recently observed `_meta.goose.activeRunId` from a + /// `session/update` notification of kind `session_info_update`. /// - /// Goose emits this whenever it starts or clears an active prompt run + /// Both goose and buzz-agent emit `session_info_update` with this field; + /// goose emits it whenever it starts or clears an active prompt run /// (`crates/goose/src/acp/server.rs:2277` `send_active_run_update`). /// Required as `expectedRunId` when calling the non-standard /// `_goose/unstable/session/steer` method to inject a message into an /// in-flight turn without cancelling it. /// /// `None` until the first `session_info_update` arrives, or after the - /// run clears (goose emits `activeRunId: null` at end of turn). Other - /// agents will simply never populate this — readers must treat `None` - /// as "no active run to steer into" and fall back to cancel+merge. + /// run clears (goose/buzz-agent emit `activeRunId: null` at end of turn). + /// Other agents may leave this unset — readers must treat `None` as + /// "no active run to steer into" and fall back to cancel+merge. active_run_id: Option, /// Per-turn channel for receiving goose-native non-cancelling steer /// requests from the main loop. 
Installed by diff --git a/crates/buzz-agent/src/llm.rs b/crates/buzz-agent/src/llm.rs index e7a33c971..628449db2 100644 --- a/crates/buzz-agent/src/llm.rs +++ b/crates/buzz-agent/src/llm.rs @@ -831,6 +831,18 @@ fn parse_openai(v: Value) -> Result { .get("message") .ok_or_else(|| AgentError::Llm("missing message".into()))?; let text = str_field(msg, "content"); + // DeepSeek and vLLM-style OpenAI-compat hosts expose reasoning tokens on the + // message object. Prefer `reasoning_content` (DeepSeek's field name); fall + // back to `reasoning` (some other providers). Both are absent for standard + // OpenAI responses, which leaves this empty without any special-casing. + let reasoning = { + let rc = str_field(msg, "reasoning_content"); + if rc.is_empty() { + str_field(msg, "reasoning") + } else { + rc + } + }; let mut tool_calls = Vec::new(); if let Some(arr) = msg.get("tool_calls").and_then(Value::as_array) { for tc in arr { @@ -853,7 +865,7 @@ fn parse_openai(v: Value) -> Result { tool_calls, stop, input_tokens, - reasoning: String::new(), + reasoning, }) } diff --git a/crates/buzz-agent/tests/golden_transcripts.rs b/crates/buzz-agent/tests/golden_transcripts.rs index 3d5155a7b..4ac350346 100644 --- a/crates/buzz-agent/tests/golden_transcripts.rs +++ b/crates/buzz-agent/tests/golden_transcripts.rs @@ -610,7 +610,10 @@ async fn test_thought_chunk_emitted_before_message_chunk_anthropic() { .iter() .find(|u| u["sessionUpdate"] == "agent_thought_chunk") .unwrap(); - assert_eq!(thought["content"]["text"], "Let me reason about this carefully."); + assert_eq!( + thought["content"]["text"], + "Let me reason about this carefully." 
+ ); let message = updates .iter() @@ -668,6 +671,59 @@ async fn test_thought_chunk_emitted_before_message_chunk_responses_api() { h.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_thought_chunk_emitted_before_message_chunk_chat_completions_reasoning_content() { + // OpenAI chat/completions path with DeepSeek-style `reasoning_content` field + // on the message object. OPENAI_COMPAT_API defaults to Auto, which routes + // non-openai.com hosts to chat/completions — this is the live path for + // self-hosted reasoning models (DeepSeek, vLLM, etc.). + let response = json!({ + "id": "cc-r1", "object": "chat.completion", "model": "fake-model", + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": "Here is the answer.", + "reasoning_content": "Let me think through this step by step.", + }, + "finish_reason": "stop", + }], + }); + let url = spawn_fake_llm(vec![response]).await; + let mut h = Harness::spawn(&[("OPENAI_COMPAT_BASE_URL", &url)]).await; + + let sid = handshake(&mut h).await; + let p = h + .send( + "session/prompt", + json!({ + "sessionId": sid, + "prompt": [{ "type": "text", "text": "solve it" }], + }), + ) + .await; + + let updates = collect_updates_until_done(&mut h, p).await; + assert_thought_before_message(&updates); + + let thought = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_thought_chunk") + .unwrap(); + assert_eq!( + thought["content"]["text"], + "Let me think through this step by step." + ); + + let message = updates + .iter() + .find(|u| u["sessionUpdate"] == "agent_message_chunk") + .unwrap(); + assert_eq!(message["content"]["text"], "Here is the answer."); + + h.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_no_reasoning_no_thought_chunk() { // Plain text response with no reasoning content — no agent_thought_chunk