From aba7bdf45c4b6ffd40baf89ef0a70965ce74cfd5 Mon Sep 17 00:00:00 2001 From: klopez4212 Date: Tue, 30 Jun 2026 13:33:32 +0100 Subject: [PATCH] Add agent conversation runtime foundation --- crates/buzz-acp/src/lib.rs | 29 +- crates/buzz-acp/src/pool.rs | 438 ++++++------------ crates/buzz-acp/src/queue.rs | 238 ++++++++-- crates/buzz-acp/src/relay.rs | 20 +- crates/buzz-core/src/kind.rs | 3 + crates/buzz-relay/src/handlers/ingest.rs | 8 +- desktop/scripts/check-file-sizes.mjs | 44 +- .../src-tauri/src/commands/agent_config.rs | 1 + .../src-tauri/src/commands/agent_models.rs | 2 + .../src/commands/agent_profile_reconcile.rs | 32 ++ desktop/src-tauri/src/commands/agents.rs | 44 +- desktop/src-tauri/src/commands/messages.rs | 8 +- desktop/src-tauri/src/commands/mod.rs | 1 + .../src-tauri/src/managed_agents/discovery.rs | 42 +- .../src-tauri/src/managed_agents/restore.rs | 50 +- .../src-tauri/src/managed_agents/runtime.rs | 67 ++- .../src/managed_agents/runtime/tests.rs | 33 +- desktop/src/shared/api/tauri.ts | 2 + desktop/src/shared/constants/kinds.ts | 12 + .../channels/channel_messages_provider.dart | 5 +- mobile/lib/shared/relay/nostr_models.dart | 5 + 21 files changed, 661 insertions(+), 423 deletions(-) create mode 100644 desktop/src-tauri/src/commands/agent_profile_reconcile.rs diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 21fa41cac..f652d89c8 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -1753,21 +1753,6 @@ async fn tokio_main() -> Result<()> { // their sessions stripped when they return to the pool. removed_channels.insert(ch); typing_channels.remove(&ch); - // Best-effort: clean up πŸ‘€ on drained events. - // Note: the relay revokes membership before - // emitting the notification, so this DELETE may - // 403 on non-open channels. Stale πŸ‘€ in that - // case is a known limitation β€” fix belongs in - // the relay (clean up bot reactions on removal). - if !drained_ids.is_empty() { - let rc = ctx.rest_client.clone(); - let ids = drained_ids.clone(); - tokio::spawn(async move { - for eid in &ids { - pool::reaction_remove(&rc, eid, "πŸ‘€").await; - } - }); - } if !drained_ids.is_empty() || invalidated > 0 { tracing::info!( channel_id = %ch, @@ -1931,7 +1916,6 @@ async fn tokio_main() -> Result<()> { // Capture author pubkey before queue.push() moves // buzz_event.event (needed for mode gate below). let author_hex = buzz_event.event.pubkey.to_hex(); - let event_id_hex = buzz_event.event.id.to_hex(); // Clone for the non-cancelling steer fork, which // needs the event to render the steer body. The // clone is unconditional because we don't know @@ -1950,18 +1934,7 @@ async fn tokio_main() -> Result<()> { received_at: std::time::Instant::now(), prompt_tag, }); - // πŸ‘€ β€” immediate "seen" reaction, only if the event - // was actually queued (not dropped by DedupMode::Drop). - // Fire-and-forget: on rare fast-failure paths the - // guard's cleanup may race with this add, leaving a - // cosmetic stale πŸ‘€. Acceptable β€” see ReactionGuard docs. - if accepted { - let rc = ctx.rest_client.clone(); - let eid = event_id_hex.clone(); - tokio::spawn(async move { - pool::reaction_add(&rc, &eid, "πŸ‘€").await; - }); - } + // ── Multiple-event-handling mode gate ───────────── // Event is already queued. If mode requires it AND // the channel has an in-flight task, fire cancel β€” // OR take the non-cancelling (ACP steer) fork for Steer signals. diff --git a/crates/buzz-acp/src/pool.rs b/crates/buzz-acp/src/pool.rs index d6b6f82f9..55989b957 100644 --- a/crates/buzz-acp/src/pool.rs +++ b/crates/buzz-acp/src/pool.rs @@ -1046,15 +1046,7 @@ pub async fn run_prompt_task( }), ); - // Collects event IDs up front. On drop (any exit path β€” normal, early - // return, or panic), spawns best-effort cleanup of both πŸ‘€ and πŸ’¬. - // See `ReactionGuard` docs for ordering guarantees and known edge cases. - let reaction_ids: Vec = batch - .as_ref() - .map(|b| b.events.iter().map(|be| be.event.id.to_hex()).collect()) - .unwrap_or_default(); - let _reaction_guard = ReactionGuard::new(ctx.rest_client.clone(), reaction_ids.clone()); - + // ── Turn completion guard ───────────────────────────────────────────── // Emits `turn_completed` on any exit path. Captures observer handle and // metadata now, before the agent is moved into PromptResult. let _turn_guard = TurnCompletionGuard::new( @@ -1405,16 +1397,7 @@ pub async fn run_prompt_task( return; }; - // πŸ’¬ β€” fire-and-forget so the prompt fires immediately. - // The guard's cleanup (spawned on drop) removes πŸ’¬ after the turn completes. - // A brief race where πŸ’¬ appears slightly after the agent starts is acceptable. - if !reaction_ids.is_empty() { - let rest = ctx.rest_client.clone(); - let ids = reaction_ids.clone(); - tokio::spawn(async move { - react_working(&rest, &ids).await; - }); - } + // ── Send the actual prompt ──────────────────────────────────────────── // Slash-command pass-through sends the bare command as the first text // block (so connector detection fires), then each prompt section as its @@ -1730,7 +1713,6 @@ pub async fn run_prompt_task( }); } } - // _reaction_guard drops here β†’ spawns clear_reactions for all exit paths. } /// Retry wrapper for context fetches: one retry with `CONTEXT_FETCH_RETRY_DELAY` @@ -1847,7 +1829,14 @@ async fn fetch_conversation_context( let last_event = batch.events.last()?; let tags = crate::queue::parse_thread_tags(&last_event.event); if let Some(root_id) = tags.root_event_id { - return fetch_thread_context(batch.channel_id, &root_id, limit, &ctx.rest_client).await; + return fetch_thread_context( + batch.channel_id, + &root_id, + tags.agent_reply_event_id.as_deref(), + limit, + &ctx.rest_client, + ) + .await; } // DM non-reply: fetch recent conversation history. @@ -2012,16 +2001,14 @@ async fn fetch_prompt_profile_lookup( async fn fetch_thread_context( channel_id: Uuid, root_event_id: &str, + agent_reply_event_id: Option<&str>, limit: u32, rest: &RestClient, ) -> Option { use nostr::{Alphabet, SingleLetterTag}; // Defense-in-depth: validate hex event ID. - if root_event_id.is_empty() - || root_event_id.len() != 64 - || !root_event_id.chars().all(|c| c.is_ascii_hexdigit()) - { + if !is_valid_event_id_hex(root_event_id) { tracing::warn!( channel_id = %channel_id, "invalid root_event_id (expected 64 hex chars) β€” skipping thread context fetch" @@ -2033,7 +2020,7 @@ async fn fetch_thread_context( let h_tag = SingleLetterTag::lowercase(Alphabet::H); let ch_str = channel_id.to_string(); - // Two filters: (1) root event by ID, (2) replies with #e=root + #h=channel. + // Base filters: (1) root event by ID, (2) replies with #e=root + #h=channel. let root_filter = nostr::Filter::new().id(nostr::EventId::from_hex(root_event_id).ok()?); let replies_filter = nostr::Filter::new() .kinds([ @@ -2043,14 +2030,27 @@ async fn fetch_thread_context( .custom_tags(e_tag, [root_event_id]) .custom_tags(h_tag, [ch_str.as_str()]) .limit(limit as usize); + let mut filters = vec![root_filter, replies_filter]; + + if let Some(agent_reply_event_id) = agent_reply_event_id { + if is_valid_event_id_hex(agent_reply_event_id) && agent_reply_event_id != root_event_id { + if let Ok(event_id) = nostr::EventId::from_hex(agent_reply_event_id) { + // A selected task anchor may be outside the bounded prompt + // window. Fetch it independently, but require the same + // channel and thread root so forged client tags cannot route + // replies into a different conversation. + filters.push( + nostr::Filter::new() + .id(event_id) + .custom_tags(e_tag, [root_event_id]) + .custom_tags(h_tag, [ch_str.as_str()]), + ); + } + } + } fetch_with_retry(|| async { - match timeout( - CONTEXT_FETCH_TIMEOUT, - rest.query(&[root_filter.clone(), replies_filter.clone()]), - ) - .await - { + match timeout(CONTEXT_FETCH_TIMEOUT, rest.query(&filters)).await { Ok(Ok(json)) => parse_nostr_thread_response(json, root_event_id), Ok(Err(e)) => { tracing::warn!( @@ -2073,6 +2073,10 @@ async fn fetch_thread_context( .await } +fn is_valid_event_id_hex(event_id: &str) -> bool { + event_id.len() == 64 && event_id.chars().all(|c| c.is_ascii_hexdigit()) +} + /// Fetch DM context via Nostr query: recent messages in channel by `#h` tag. async fn fetch_dm_context( channel_id: Uuid, @@ -2195,6 +2199,11 @@ fn parse_dm_response(json: serde_json::Value, limit: u32) -> Option Option { + let event_id = obj + .get("id") + .or_else(|| obj.get("event_id")) + .and_then(|v| v.as_str()) + .map(str::to_string); let content = obj.get("content").and_then(|v| v.as_str())?; let pubkey = obj .get("pubkey") @@ -2215,6 +2224,7 @@ fn json_to_context_message(obj: &serde_json::Value) -> Option { .unwrap_or_else(|| "unknown".to_string()); Some(ContextMessage { + event_id, pubkey: pubkey.to_string(), timestamp, content: content.to_string(), @@ -2232,13 +2242,16 @@ fn parse_nostr_thread_response( let events = json.as_array()?; let mut root_msg = None; let mut reply_msgs = Vec::new(); + let mut seen_reply_ids = HashSet::new(); for ev in events { let ev_id = ev.get("id").and_then(|v| v.as_str()).unwrap_or(""); if let Some(msg) = json_to_context_message(ev) { if ev_id == root_event_id { root_msg = Some(msg); - } else { + } else if event_references_thread_root(ev, root_event_id) + && seen_reply_ids.insert(ev_id.to_string()) + { reply_msgs.push(( ev.get("created_at").and_then(|v| v.as_u64()).unwrap_or(0), msg, @@ -2268,6 +2281,19 @@ fn parse_nostr_thread_response( }) } +fn event_references_thread_root(ev: &serde_json::Value, root_event_id: &str) -> bool { + ev.get("tags") + .and_then(|tags| tags.as_array()) + .is_some_and(|tags| { + tags.iter().any(|tag| { + tag.as_array().is_some_and(|parts| { + parts.first().and_then(|part| part.as_str()) == Some("e") + && parts.get(1).and_then(|part| part.as_str()) == Some(root_event_id) + }) + }) + }) +} + /// Parse a Nostr query response (array of events) into DM context. /// /// Events arrive in relay order (newest first); reversed to chronological. @@ -2361,67 +2387,7 @@ fn log_stop_reason(source: &PromptSource, stop_reason: &StopReason) { } } -// -// Two-phase lifecycle visible to users: -// πŸ‘€ "seen" β€” event was queued and an agent will handle it -// πŸ’¬ "working" β€” agent is actively prompting -// -// πŸ’¬ is awaited inline in `run_prompt_task` before the prompt fires, so -// add-before-remove ordering is structural. πŸ‘€ is fire-and-forget from -// `main.rs` at queue-push time for immediate responsiveness; on rare -// fast-failure paths the guard's cleanup may race with the πŸ‘€ add, -// leaving a cosmetic stale πŸ‘€ (see `ReactionGuard` docs). -// -// Cleanup is fire-and-forget via `ReactionGuard` (spawned on drop). -// Failures are debug-logged and ignored β€” reactions are cosmetic. - -/// Drop guard that spawns reaction cleanup on any exit path. -/// -/// Created at the top of `run_prompt_task`. On drop β€” normal return, early -/// return, or panic β€” spawns fire-and-forget removal of both πŸ‘€ and πŸ’¬. -/// -/// ## Ordering -/// -/// πŸ’¬ (`react_working`) is fire-and-forget (spawned before the prompt fires). -/// A brief race where πŸ’¬ appears slightly after the agent starts is acceptable. -/// -/// πŸ‘€ (`react_seen`) is fire-and-forget from `main.rs` at queue-push time. -/// On rare fast-failure paths (e.g., `session_new` error on an idle agent), -/// the cleanup spawn may race with the πŸ‘€ add, leaving a stale πŸ‘€. This is -/// accepted as a cosmetic edge case β€” the message will be retried and the -/// stale πŸ‘€ is harmless. -struct ReactionGuard { - rest: Option, - ids: Vec, -} - -impl ReactionGuard { - fn new(rest: crate::relay::RestClient, ids: Vec) -> Self { - Self { - rest: if ids.is_empty() { None } else { Some(rest) }, - ids, - } - } -} - -impl Drop for ReactionGuard { - fn drop(&mut self) { - // Guard against drop outside a tokio runtime (e.g., in unit tests or - // during process teardown before the runtime is fully initialized). - // `run_prompt_task` is always spawned via `JoinSet::spawn`, so a - // runtime handle is normally available; `try_current` is the safe - // fallback for the rare cases it isn't. - if let Some(rest) = self.rest.take() { - let ids = std::mem::take(&mut self.ids); - if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.spawn(clear_reactions(rest, ids)); - } - // If no runtime is available, reactions are left as-is β€” they are - // cosmetic indicators and the stale state is harmless. - } - } -} - +// ── Turn liveness emission ─────────────────────────────────────────────────── // Periodically emits a `turn_liveness` observer event while a turn is in-flight, // so the desktop can prune turns whose host died without unwinding (kill -9 / // crash) far sooner than the no-activity backstop. Runs as a non-resolving @@ -2506,178 +2472,7 @@ impl Drop for TurnCompletionGuard { } } -const REACTION_SEEN: &str = "πŸ‘€"; -const REACTION_WORKING: &str = "πŸ’¬"; - -/// Best-effort timeout for a single reaction REST call. -const REACTION_TIMEOUT: Duration = Duration::from_millis(500); - -/// Percent-encode a string for use in a URL path segment (used in tests only). -#[cfg(test)] -fn pct_encode(s: &str) -> String { - let mut out = String::with_capacity(s.len() * 3); - for byte in s.bytes() { - match byte { - b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { - out.push(byte as char); - } - _ => { - use std::fmt::Write; - let _ = write!(out, "%{byte:02X}"); - } - } - } - out -} - -/// Best-effort: add a reaction via a signed Nostr kind-7 event (NIP-25). -/// -/// Builds a reaction event with `buzz_sdk::build_reaction`, signs it with -/// the keys already stored in `RestClient`, and submits via `POST /events`. -/// Returns immediately on timeout or any error β€” reactions are cosmetic. -pub(crate) async fn reaction_add(rest: &crate::relay::RestClient, event_id: &str, emoji: &str) { - let target_id = match nostr::EventId::from_hex(event_id) { - Ok(id) => id, - Err(e) => { - tracing::debug!(event_id, emoji, "reaction add: invalid event ID: {e}"); - return; - } - }; - let builder = match buzz_sdk::build_reaction(target_id, emoji) { - Ok(b) => b, - Err(e) => { - tracing::warn!(event_id, emoji, "reaction add: build failed: {e}"); - return; - } - }; - let event = match builder.sign_with_keys(&rest.keys) { - Ok(e) => e, - Err(e) => { - tracing::warn!(event_id, emoji, "reaction add: sign failed: {e}"); - return; - } - }; - match tokio::time::timeout(REACTION_TIMEOUT, rest.submit_event(&event)).await { - Ok(Ok(_)) => {} - Ok(Err(e)) => tracing::debug!(event_id, emoji, "reaction add failed: {e}"), - Err(_) => tracing::debug!(event_id, emoji, "reaction add timed out"), - } -} - -/// Best-effort: remove a reaction via a signed kind:5 (NIP-09) deletion event. -/// -/// Queries kind:7 reactions by our pubkey targeting the event, finds the matching -/// emoji, then submits a signed kind:5 deletion via `POST /events`. -/// Returns immediately on timeout or any error β€” reactions are cosmetic. -pub(crate) async fn reaction_remove(rest: &crate::relay::RestClient, event_id: &str, emoji: &str) { - use nostr::{Alphabet, SingleLetterTag}; - - // Step 1: query our kind:7 reactions targeting this event. - let my_pubkey = rest.keys.public_key(); - let e_tag = SingleLetterTag::lowercase(Alphabet::E); - let filter = nostr::Filter::new() - .kind(nostr::Kind::Reaction) - .author(my_pubkey) - .custom_tags(e_tag, [event_id]); - - let resp = match tokio::time::timeout(Duration::from_millis(1_000), rest.query(&[filter])).await - { - Ok(Ok(v)) => v, - Ok(Err(e)) => { - tracing::debug!(event_id, emoji, "reaction remove: query failed: {e}"); - return; - } - Err(_) => { - tracing::debug!(event_id, emoji, "reaction remove: query timed out"); - return; - } - }; - - // Find our reaction event with matching emoji content. - let reid = resp.as_array().and_then(|events| { - events.iter().find_map(|ev| { - let content = ev.get("content")?.as_str()?; - if content != emoji { - return None; - } - ev.get("id")?.as_str().map(|s| s.to_string()) - }) - }); - - let reid = match reid { - Some(id) => id, - None => { - tracing::debug!(event_id, emoji, "reaction remove: no reaction event found"); - return; - } - }; - - // Step 2: build and submit a signed kind:5 deletion for the reaction event. - let target_id = match nostr::EventId::from_hex(&reid) { - Ok(id) => id, - Err(e) => { - tracing::debug!( - event_id, - emoji, - "reaction remove: invalid reaction event ID: {e}" - ); - return; - } - }; - let builder = match buzz_sdk::build_remove_reaction(target_id) { - Ok(b) => b, - Err(e) => { - tracing::warn!(event_id, emoji, "reaction remove: build failed: {e}"); - return; - } - }; - let event = match builder.sign_with_keys(&rest.keys) { - Ok(e) => e, - Err(e) => { - tracing::warn!(event_id, emoji, "reaction remove: sign failed: {e}"); - return; - } - }; - match tokio::time::timeout(Duration::from_millis(1_000), rest.submit_event(&event)).await { - Ok(Ok(_)) => {} - Ok(Err(e)) => tracing::debug!(event_id, emoji, "reaction remove failed: {e}"), - Err(_) => tracing::debug!(event_id, emoji, "reaction remove timed out"), - } -} - -/// Maximum concurrent reaction HTTP requests per fan-out call. -/// Prevents unbounded parallelism when a large batch of events arrives. -const REACTION_CONCURRENCY: usize = 10; - -/// Add πŸ’¬ to all events, capped at `REACTION_CONCURRENCY` concurrent requests. -/// Awaited inline before the prompt fires. -async fn react_working(rest: &crate::relay::RestClient, event_ids: &[String]) { - for chunk in event_ids.chunks(REACTION_CONCURRENCY) { - futures_util::future::join_all( - chunk - .iter() - .map(|eid| reaction_add(rest, eid, REACTION_WORKING)), - ) - .await; - } -} - -/// Fire-and-forget: remove both πŸ‘€ and πŸ’¬ from all events. Spawned on turn complete. -/// Capped at `REACTION_CONCURRENCY` concurrent requests per chunk to avoid -/// unbounded HTTP fan-out on large batches. -async fn clear_reactions(rest: crate::relay::RestClient, event_ids: Vec) { - // Each event needs two removals (πŸ‘€ and πŸ’¬); pair them and chunk by - // REACTION_CONCURRENCY pairs so the total concurrent requests stay bounded. - for chunk in event_ids.chunks(REACTION_CONCURRENCY) { - futures_util::future::join_all(chunk.iter().flat_map(|eid| { - [ - reaction_remove(&rest, eid, REACTION_SEEN), - reaction_remove(&rest, eid, REACTION_WORKING), - ] - })) - .await; - } -} +// ─── Unit Tests ────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { @@ -2901,6 +2696,81 @@ mod tests { assert!(parse_thread_response(json).is_none()); } + #[test] + fn test_parse_nostr_thread_response_keeps_independently_fetched_anchor() { + let root_id = "a".repeat(64); + let reply_id = "b".repeat(64); + let anchor_id = "c".repeat(64); + let json = json!([ + { + "id": anchor_id.clone(), + "pubkey": "agent", + "content": "selected task anchor", + "created_at": 1710518520_u64, + "tags": [["e", root_id.clone(), "", "reply"]] + }, + { + "id": root_id.clone(), + "pubkey": "human", + "content": "root message", + "created_at": 1710518400_u64, + "tags": [] + }, + { + "id": reply_id.clone(), + "pubkey": "agent", + "content": "visible reply", + "created_at": 1710518460_u64, + "tags": [["e", root_id.clone(), "", "reply"]] + } + ]); + + let ctx = parse_nostr_thread_response(json, &root_id).expect("should parse"); + + match ctx { + ConversationContext::Thread { messages, .. } => { + assert_eq!(messages.len(), 3); + assert_eq!(messages[0].content, "root message"); + assert_eq!(messages[1].content, "visible reply"); + assert_eq!(messages[2].content, "selected task anchor"); + } + _ => panic!("expected Thread context"), + } + } + + #[test] + fn test_parse_nostr_thread_response_rejects_wrong_root_anchor() { + let root_id = "a".repeat(64); + let forged_anchor_id = "c".repeat(64); + let other_root_id = "d".repeat(64); + let json = json!([ + { + "id": root_id.clone(), + "pubkey": "human", + "content": "root message", + "created_at": 1710518400_u64, + "tags": [] + }, + { + "id": forged_anchor_id.clone(), + "pubkey": "agent", + "content": "wrong thread", + "created_at": 1710518520_u64, + "tags": [["e", other_root_id.clone(), "", "reply"]] + } + ]); + + let ctx = parse_nostr_thread_response(json, &root_id).expect("should parse"); + + match ctx { + ConversationContext::Thread { messages, .. } => { + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].content, "root message"); + } + _ => panic!("expected Thread context"), + } + } + #[test] fn test_parse_dm_response_basic() { let json = json!({ @@ -3066,6 +2936,7 @@ mod tests { }; let context = ConversationContext::Thread { messages: vec![ContextMessage { + event_id: None, pubkey: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".into(), timestamp: "2026-03-25T05:51:25Z".into(), content: "follow up".into(), @@ -3146,40 +3017,7 @@ mod tests { assert_eq!(msg.pubkey, "unknown"); } - #[test] - fn test_pct_encode_hex_passthrough() { - let hex = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"; - assert_eq!(pct_encode(hex), hex); - } - - #[test] - fn test_pct_encode_emoji() { - // πŸ‘€ = U+1F440 = F0 9F 91 80 in UTF-8 - assert_eq!(pct_encode("πŸ‘€"), "%F0%9F%91%80"); - } - - #[test] - fn test_pct_encode_emoji_speech_balloon() { - // πŸ’¬ = U+1F4AC = F0 9F 92 AC in UTF-8 - assert_eq!(pct_encode("πŸ’¬"), "%F0%9F%92%AC"); - } - - #[test] - fn test_pct_encode_empty() { - assert_eq!(pct_encode(""), ""); - } - - #[test] - fn test_pct_encode_unreserved_passthrough() { - assert_eq!(pct_encode("AZaz09-_.~"), "AZaz09-_.~"); - } - - #[test] - fn test_pct_encode_reserved_chars() { - assert_eq!(pct_encode("/"), "%2F"); - assert_eq!(pct_encode("+"), "%2B"); - assert_eq!(pct_encode(" "), "%20"); - } + // ── SessionState tests ─────────────────────────────────────────────── fn make_state() -> (SessionState, Uuid, Uuid) { let ch_a = Uuid::new_v4(); diff --git a/crates/buzz-acp/src/queue.rs b/crates/buzz-acp/src/queue.rs index 2fb1acfd2..94b72d031 100644 --- a/crates/buzz-acp/src/queue.rs +++ b/crates/buzz-acp/src/queue.rs @@ -559,7 +559,7 @@ impl EventQueue { /// Also clears any `retry_after` throttle for the channel. /// /// Returns the event IDs of dropped events so the caller can clean up - /// any reactions (πŸ‘€) that were added at queue-push time. + /// channel-scoped side effects if needed. pub fn drain_channel(&mut self, channel_id: Uuid) -> Vec { let ids = self .queues @@ -769,6 +769,9 @@ pub struct ThreadTags { pub root_event_id: Option, /// Parent event ID (hex). For direct replies to root, equals root. pub parent_event_id: Option, + /// Dedicated agent-conversation anchor, when a client asks follow-ups to + /// stay attached to a selected task instead of the flat thread root. + pub agent_reply_event_id: Option, /// Mentioned pubkeys from `p` tags (hex). pub mentioned_pubkeys: Vec, } @@ -779,6 +782,8 @@ pub struct ThreadTags { /// - Find an `e` tag with `root` marker β†’ its value is `root_event_id` /// - Find an `e` tag with `reply` marker β†’ its value is `parent_event_id` /// - If only `reply` marker found (direct reply to root), root == parent +/// - Find a client task marker (`["client", "agent-conversation", id]`) β†’ +/// its value is `agent_reply_event_id` /// - `p` tags β†’ mentioned pubkeys /// /// NOTE: Only handles NIP-10 marker-based format (preferred). The deprecated @@ -787,6 +792,7 @@ pub struct ThreadTags { pub fn parse_thread_tags(event: &Event) -> ThreadTags { let mut root = None; let mut reply = None; + let mut agent_reply = None; let mut mentions = Vec::new(); for tag in event.tags.iter() { @@ -804,6 +810,12 @@ pub fn parse_thread_tags(event: &Event) -> ThreadTags { Some("p") if parts.len() >= 2 => { mentions.push(parts[1].clone()); } + Some("client") + if parts.len() >= 3 + && parts.get(1).map(|value| value.as_str()) == Some("agent-conversation") => + { + agent_reply = parts.get(2).cloned(); + } _ => {} } } @@ -820,6 +832,7 @@ pub fn parse_thread_tags(event: &Event) -> ThreadTags { ThreadTags { root_event_id, parent_event_id, + agent_reply_event_id: agent_reply, mentioned_pubkeys: mentions, } } @@ -923,6 +936,7 @@ pub enum ConversationContext { /// A single message in a conversation context section. #[derive(Debug, Clone)] pub struct ContextMessage { + pub event_id: Option, pub pubkey: String, pub timestamp: String, pub content: String, @@ -1061,6 +1075,9 @@ pub(crate) fn format_event_block( if let Some(ref r) = thread.root_event_id { parsed_parts.push(format!("root={r}")); } + if let Some(ref agent_reply) = thread.agent_reply_event_id { + parsed_parts.push(format!("agent_reply={agent_reply}")); + } if !thread.mentioned_pubkeys.is_empty() { parsed_parts.push(format!( "mentions=[{}]", @@ -1081,15 +1098,15 @@ pub(crate) fn format_event_block( /// Append a reply instruction when the agent is responding to a thread event. /// -/// Tells the agent to default to `--reply-to ` for ordinary replies -/// while still allowing an explicit human request to post at the channel root or -/// top level. -fn append_reply_instruction(s: &mut String, event_id: &str) { +/// Tells the agent to pass `--reply-to ` on every `buzz +/// messages send` call for ordinary replies, while still allowing an explicit +/// human request to post at the channel root or top level. +fn append_reply_instruction(s: &mut String, thread_root_id: &str) { s.push_str(&format!( - "\nIMPORTANT: For ordinary replies in this turn, use `--reply-to {event_id}` \ - on `buzz messages send` so the conversation stays threaded. \ - If the human explicitly asks for a channel-root, top-level, \ - or broadcast post, send that message without `--reply-to`. \ + "\nIMPORTANT: For ordinary replies in this turn, use `--reply-to {thread_root_id}` \ + on `buzz messages send` so the conversation stays in the thread without adding \ + another visible nesting level. If the human explicitly asks for a channel-root, \ + top-level, or broadcast post, send that message without `--reply-to`. \ If the requested destination is ambiguous, ask before sending." )); } @@ -1136,11 +1153,37 @@ fn turn_is_human_facing( thread_tags.mentioned_pubkeys.iter().any(|pk| !is_agent(pk)) } +fn context_contains_event_id(context: Option<&ConversationContext>, event_id: &str) -> bool { + let messages = match context { + Some(ConversationContext::Thread { messages, .. }) + | Some(ConversationContext::Dm { messages, .. }) => messages, + None => return false, + }; + + messages + .iter() + .any(|message| message.event_id.as_deref() == Some(event_id)) +} + +fn resolve_valid_agent_conversation_anchor( + thread_tags: &ThreadTags, + conversation_context: Option<&ConversationContext>, +) -> Option { + let agent_reply_event_id = thread_tags.agent_reply_event_id.as_deref()?; + if context_contains_event_id(conversation_context, agent_reply_event_id) { + return Some(agent_reply_event_id.to_string()); + } + + None +} + /// Resolve the `--reply-to` anchor for a non-DM turn. /// /// Returns `Some(id)` only for human-facing turns (see [`turn_is_human_facing`]): -/// - in a thread β†’ the thread ROOT, keeping the reply flat at layer 1 -/// - top-level β†’ the triggering event id, which becomes the new thread root +/// - dedicated task β†’ the selected agent reply anchor, after validating it +/// exists in the fetched context for this thread/task +/// - in a thread β†’ the thread ROOT, keeping the reply flat at layer 1 +/// - top-level β†’ the triggering event id, which becomes the new thread root /// /// Returns `None` for agent↔agent turns, leaving the agent free to nest deeply /// (intentional for agent coordination). @@ -1148,15 +1191,15 @@ fn resolve_reply_anchor( sender_pubkey: &str, thread_tags: &ThreadTags, triggering_event_id: &str, + conversation_context: Option<&ConversationContext>, profile_lookup: Option<&PromptProfileLookup>, ) -> Option { if !turn_is_human_facing(sender_pubkey, thread_tags, profile_lookup) { return None; } Some( - thread_tags - .root_event_id - .clone() + resolve_valid_agent_conversation_anchor(thread_tags, conversation_context) + .or_else(|| thread_tags.root_event_id.clone()) .unwrap_or_else(|| triggering_event_id.to_string()), ) } @@ -1375,6 +1418,7 @@ pub fn format_prompt(batch: &FlushBatch, args: &FormatPromptArgs<'_>) -> Vec) -> Vec, mentions: &[&str]) -> ThreadTags { ThreadTags { + agent_reply_event_id: None, root_event_id: root.map(str::to_string), parent_event_id: root.map(str::to_string), mentioned_pubkeys: mentions.iter().map(|s| s.to_string()).collect(), @@ -3118,7 +3186,7 @@ mod tests { fn test_anchor_human_in_thread_uses_root() { // Human asks inside a thread β†’ anchor to the thread ROOT (flat at L1). let tags = thread_tags(Some(ROOT_ID), &[AGENT_A_PK]); - let anchor = resolve_reply_anchor(HUMAN_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(HUMAN_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor.as_deref(), Some(ROOT_ID)); } @@ -3126,7 +3194,7 @@ mod tests { fn test_anchor_human_top_level_uses_triggering_event() { // Human top-level mention (no thread tags) β†’ triggering event is root. let tags = thread_tags(None, &[AGENT_A_PK]); - let anchor = resolve_reply_anchor(HUMAN_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(HUMAN_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor.as_deref(), Some(TRIGGER_ID)); } @@ -3134,14 +3202,14 @@ mod tests { fn test_anchor_agent_to_agent_in_thread_is_none() { // Agent pings agent inside a thread β†’ no forced anchor (deep nesting ok). let tags = thread_tags(Some(ROOT_ID), &[AGENT_B_PK]); - let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor, None); } #[test] fn test_anchor_agent_to_agent_top_level_is_none() { let tags = thread_tags(None, &[AGENT_B_PK]); - let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor, None); } @@ -3149,7 +3217,7 @@ mod tests { fn test_anchor_agent_sender_but_human_tagged_flattens() { // Agent-authored, but a human is tagged β†’ human-facing β†’ anchor to root. let tags = thread_tags(Some(ROOT_ID), &[AGENT_B_PK, HUMAN_PK]); - let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor.as_deref(), Some(ROOT_ID)); } @@ -3157,7 +3225,7 @@ mod tests { fn test_anchor_unknown_identity_treated_as_human() { // No profile lookup β†’ fail open (treat as human so visibility is kept). let tags = thread_tags(Some(ROOT_ID), &[]); - let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None); + let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None, None); assert_eq!(anchor.as_deref(), Some(ROOT_ID)); } @@ -3166,7 +3234,7 @@ mod tests { // Raw p-tag presence must NOT flatten when every tagged pubkey is an // agent β€” this is the regression Pinky flagged. let tags = thread_tags(Some(ROOT_ID), &[AGENT_A_PK, AGENT_B_PK]); - let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, Some(&id_lookup())); + let anchor = resolve_reply_anchor(AGENT_A_PK, &tags, TRIGGER_ID, None, Some(&id_lookup())); assert_eq!(anchor, None); } @@ -3217,6 +3285,7 @@ mod tests { // Thread context fetched (as the fetch path does for DM replies). let ctx = ConversationContext::Thread { messages: vec![ContextMessage { + event_id: None, pubkey: "npub1xyz".into(), timestamp: "2026-03-15T16:30:00Z".into(), content: "Should I deploy?".into(), @@ -3743,15 +3812,128 @@ mod tests { ); } + #[test] + fn test_reply_instruction_prefers_agent_conversation_anchor() { + let ch = Uuid::new_v4(); + let root_id = "a".repeat(64); + let parent_id = "b".repeat(64); + let agent_reply_id = "c".repeat(64); + let event = make_event_with_tags( + "@bot keep going", + vec![ + vec!["e".into(), root_id.clone(), "".into(), "root".into()], + vec!["e".into(), parent_id.clone(), "".into(), "reply".into()], + vec![ + "client".into(), + "agent-conversation".into(), + agent_reply_id.clone(), + ], + ], + ); + let batch = FlushBatch { + channel_id: ch, + events: vec![BatchEvent { + event, + prompt_tag: "@mention".into(), + received_at: Instant::now(), + }], + cancelled_events: vec![], + cancel_reason: None, + }; + let ctx = ConversationContext::Thread { + messages: vec![ContextMessage { + event_id: Some(agent_reply_id.clone()), + pubkey: "agent".into(), + timestamp: "2026-03-15T16:30:00Z".into(), + content: "I'll take this into a task.".into(), + }], + total: 1, + truncated: false, + }; + + let prompt = format_prompt( + &batch, + &FormatPromptArgs { + conversation_context: Some(&ctx), + ..Default::default() + }, + ) + .join("\n\n"); + assert!( + prompt.contains(&format!("--reply-to {agent_reply_id}")), + "dedicated task follow-up should anchor agent reply to the selected task" + ); + assert!( + !prompt.contains(&format!("--reply-to {root_id}")), + "dedicated task follow-up should not fall back to the flat thread root" + ); + } + + #[test] + fn test_reply_instruction_rejects_unvalidated_agent_conversation_anchor() { + let ch = Uuid::new_v4(); + let root_id = "a".repeat(64); + let parent_id = "b".repeat(64); + let forged_agent_reply_id = "c".repeat(64); + let event = make_event_with_tags( + "@bot keep going", + vec![ + vec!["e".into(), root_id.clone(), "".into(), "root".into()], + vec!["e".into(), parent_id.clone(), "".into(), "reply".into()], + vec![ + "client".into(), + "agent-conversation".into(), + forged_agent_reply_id.clone(), + ], + ], + ); + let batch = FlushBatch { + channel_id: ch, + events: vec![BatchEvent { + event, + prompt_tag: "@mention".into(), + received_at: Instant::now(), + }], + cancelled_events: vec![], + cancel_reason: None, + }; + let ctx = ConversationContext::Thread { + messages: vec![ContextMessage { + event_id: Some(parent_id), + pubkey: "human".into(), + timestamp: "2026-03-15T16:30:00Z".into(), + content: "Keep going.".into(), + }], + total: 1, + truncated: false, + }; + + let prompt = format_prompt( + &batch, + &FormatPromptArgs { + conversation_context: Some(&ctx), + ..Default::default() + }, + ) + .join("\n\n"); + assert!( + prompt.contains(&format!("--reply-to {root_id}")), + "unvalidated task anchors should fall back to the flat thread root" + ); + assert!( + !prompt.contains(&format!("--reply-to {forged_agent_reply_id}")), + "forged task anchors must not route the agent into another thread" + ); + } + #[test] fn test_reply_instruction_present_for_dm_thread_reply() { let ch = Uuid::new_v4(); let root_id = "b".repeat(64); let event = make_event_with_tags( "thanks", - vec![vec!["e".into(), root_id, "".into(), "reply".into()]], + vec![vec!["e".into(), root_id.clone(), "".into(), "reply".into()]], ); - let event_id = event.id.to_hex(); let batch = FlushBatch { channel_id: ch, events: vec![BatchEvent { @@ -3776,7 +3958,7 @@ mod tests { ) .join("\n\n"); assert!( - prompt.contains(&format!("--reply-to {event_id}")), + prompt.contains(&format!("--reply-to {root_id}")), "DM thread reply should include reply instruction" ); } diff --git a/crates/buzz-acp/src/relay.rs b/crates/buzz-acp/src/relay.rs index 0f26fc5d4..815b65c48 100644 --- a/crates/buzz-acp/src/relay.rs +++ b/crates/buzz-acp/src/relay.rs @@ -161,8 +161,7 @@ fn merge_discovered_channels( /// Extracted from `HarnessRelay` fields so it can be shared (via `Arc`) with /// spawned prompt tasks without giving them access to the WebSocket. /// -/// All reads go through `POST /query` with NIP-98 auth. Event submission goes -/// through `POST /events` with NIP-98 auth. +/// All reads go through `POST /query` with NIP-98 auth. #[derive(Debug, Clone)] pub struct RestClient { pub http: reqwest::Client, @@ -338,23 +337,6 @@ impl RestClient { .await .map_err(|e| RelayError::Http(e.to_string())) } - - /// Submit a signed event via the HTTP bridge: `POST /events` with NIP-98 auth. - /// - /// The event must already be signed. Returns the relay response JSON. - pub async fn submit_event(&self, event: &Event) -> Result { - let body_bytes = serde_json::to_vec(event) - .map_err(|e| RelayError::Http(format!("event serialize error: {e}")))?; - let resp = self.bridge_post("/events", &body_bytes).await?; - let text = resp - .text() - .await - .map_err(|e| RelayError::Http(e.to_string()))?; - if text.is_empty() { - return Ok(Value::Null); - } - serde_json::from_str(&text).map_err(|e| RelayError::Http(e.to_string())) - } } /// Events the harness cares about. diff --git a/crates/buzz-core/src/kind.rs b/crates/buzz-core/src/kind.rs index f2e918424..abf31dc27 100644 --- a/crates/buzz-core/src/kind.rs +++ b/crates/buzz-core/src/kind.rs @@ -297,6 +297,8 @@ pub const KIND_STREAM_MESSAGE_SCHEDULED: u32 = 40006; pub const KIND_STREAM_REMINDER: u32 = 40007; /// A diff/patch message showing file changes (unified diff format). pub const KIND_STREAM_MESSAGE_DIFF: u32 = 40008; +/// Shared marker that a channel thread has a focused agent conversation view. +pub const KIND_AGENT_CONVERSATION: u32 = 40010; /// Canvas (shared document) for a channel. pub const KIND_CANVAS: u32 = 40100; /// System message for channel state changes (join, leave, rename, etc.). @@ -486,6 +488,7 @@ pub const ALL_KINDS: &[u32] = &[ KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_REMINDER, KIND_STREAM_MESSAGE_DIFF, + KIND_AGENT_CONVERSATION, KIND_CANVAS, KIND_SYSTEM_MESSAGE, KIND_CHANNEL_SUMMARY, diff --git a/crates/buzz-relay/src/handlers/ingest.rs b/crates/buzz-relay/src/handlers/ingest.rs index e41d2717f..4cbfca4a0 100644 --- a/crates/buzz-relay/src/handlers/ingest.rs +++ b/crates/buzz-relay/src/handlers/ingest.rs @@ -12,9 +12,9 @@ use uuid::Uuid; use buzz_auth::Scope; use buzz_core::kind::{ event_kind_u32, is_identity_archive_request_kind, is_parameterized_replaceable, - is_relay_admin_kind, KIND_AGENT_ENGRAM, KIND_AGENT_PROFILE, KIND_APPROVAL_DENY, - KIND_APPROVAL_GRANT, KIND_AUTH, KIND_BOOKMARK_LIST, KIND_BOOKMARK_SET, KIND_CANVAS, - KIND_CONTACT_LIST, KIND_DELETION, KIND_DM_ADD_MEMBER, KIND_DM_HIDE, KIND_DM_OPEN, + is_relay_admin_kind, KIND_AGENT_CONVERSATION, KIND_AGENT_ENGRAM, KIND_AGENT_PROFILE, + KIND_APPROVAL_DENY, KIND_APPROVAL_GRANT, KIND_AUTH, KIND_BOOKMARK_LIST, KIND_BOOKMARK_SET, + KIND_CANVAS, KIND_CONTACT_LIST, KIND_DELETION, KIND_DM_ADD_MEMBER, KIND_DM_HIDE, KIND_DM_OPEN, KIND_EMOJI_LIST, KIND_EMOJI_SET, KIND_EVENT_REMINDER, KIND_FOLLOW_SET, KIND_FORUM_COMMENT, KIND_FORUM_POST, KIND_FORUM_VOTE, KIND_GIFT_WRAP, KIND_GIT_ISSUE, KIND_GIT_PATCH, KIND_GIT_PR_UPDATE, KIND_GIT_PULL_REQUEST, KIND_GIT_REPO_ANNOUNCEMENT, KIND_GIT_REPO_STATE, @@ -182,6 +182,7 @@ fn required_scope_for_kind(kind: u32, event: &Event) -> Result Ok(Scope::MessagesWrite), @@ -391,6 +392,7 @@ pub(crate) fn requires_h_channel_scope(kind: u32) -> bool { | KIND_STREAM_MESSAGE_SCHEDULED | KIND_STREAM_REMINDER | KIND_STREAM_MESSAGE_DIFF + | KIND_AGENT_CONVERSATION | KIND_CANVAS | KIND_FORUM_POST | KIND_FORUM_VOTE diff --git a/desktop/scripts/check-file-sizes.mjs b/desktop/scripts/check-file-sizes.mjs index 69f19f664..06e4e9f4b 100644 --- a/desktop/scripts/check-file-sizes.mjs +++ b/desktop/scripts/check-file-sizes.mjs @@ -60,7 +60,10 @@ const overrides = new Map([ // config-bridge: get_agent_config_surface/write_agent_config_field/put_agent_session_config // commands add ~40 lines. Queued to split. // branch cut; override bumped to cover the merged total. Queued to split. - ["src-tauri/src/commands/agents.rs", 1437], + // continued-agent-conversations: refreshes the owner auth tag before + // starting/restoring/deploying agents so staged identities keep working. + // latest-main rebase adds the config-bridge and task-review fixes together. + ["src-tauri/src/commands/agents.rs", 1467], // Residual repos_dir integration in ensure_nest_at: REPOS is provisioned // outside NEST_DIRS (it may be a symlink), so it needs its own create + // chmod-only-when-real-dir handling plus integration test coverage. The @@ -73,7 +76,12 @@ const overrides = new Map([ // unify refactor followup. +26 for resolve_effective_prompt_model_provider // re-introduced after 826d735fe removal (config-bridge caller still needs it). // PGID resolution helper + PID-recycling safety guard added for orphan sweep. - ["src-tauri/src/managed_agents/runtime.rs", 2150], + // continued-agent-conversations: owner-scoped auth tag refresh is threaded + // through the runtime env builder and covered by regression tests. + // latest-main rebase adds the config-bridge and task-review fixes together. + // latest main added runtime restore plumbing on top of the task anchor review fixes. + ["src-tauri/src/managed_agents/runtime.rs", 2174], + ["src-tauri/src/managed_agents/personas.rs", 1080], // Phase-2 inbound reconcile + review-fix cycle: reconcile_inbound_persona_event // dispatches 30175/30176/30177 inbound plus kind:5 tombstone consume // (reconcile_inbound_tombstone), the two apply_inbound_* fns, the @@ -82,11 +90,12 @@ const overrides = new Map([ // queued to split with the list. The two `agents-data-changed` emits (live // UI refresh on inbound reconcile + tombstone) add the latest growth. ["src-tauri/src/commands/personas.rs", 1279], + ["src-tauri/src/managed_agents/persona_card.rs", 1050], // applyWorkspace reposDir parameter plus the validateReposDir binding, // threaded through Tauri invokes for configurable repos_dir, plus the // harness-persona-sync `harnessOverride` create-input bit β€” load-bearing - // parameter plumbing, not generic debt growth. Approved override; still - // queued to split. + // parameter plumbing, plus continued-agent-conversations client task-anchor + // tags on message sends. Approved override; still queued to split. ["src/shared/api/tauri.ts", 1235], // harness-persona-sync feature growth, queued to split in the resolver-unify // refactor followup. discovery.rs is dominated by the new test module @@ -95,7 +104,11 @@ const overrides = new Map([ // agents keep an installed runtime alias when the primary command is absent. // Load-bearing, not generic debt. // config-bridge: schema-driven field extraction adds ~26 lines. Queued to split. - ["src-tauri/src/managed_agents/discovery.rs", 1111], + // latest-main rebase adds the config-bridge and task-review fixes together. + ["src-tauri/src/managed_agents/discovery.rs", 1131], + // types.rs adds the persona/instance harness fields. Load-bearing, not + // generic debt. + ["src-tauri/src/managed_agents/types.rs", 1037], // migration_tests.rs carries the harness-sync migration coverage plus the // patch_json_records owner-only writeback regression test (SECURITY.md:90 // crash-safe 0o600 fallback). Load-bearing security + feature coverage, not @@ -115,6 +128,24 @@ const overrides = new Map([ // +135 for AgentInfoFocusedView/DiagnosticsFocusedView/ChannelsFocusedView // props restored after 826d735fe removal (UserProfilePanel.tsx still needs them). ["src/features/profile/ui/UserProfilePanelSections.tsx", 1140], + // useDueReminderBadgeCount hook call + sum to wire due-reminder count into + // the Inbox nav badge β€” a small overage from load-bearing badge plumbing, + // not generic debt growth. Approved override; still queued to split. + // continued-agent-conversations: persisted channel-scoped conversation state + // and route wiring. Queued to split with the rest of AppShell state. + ["src/app/AppShell.tsx", 1060], + // continued-agent-conversations: marker filtering, thread handoff, and + // activity handoff props live at the channel surface for now. + ["src/features/channels/ui/ChannelPane.tsx", 1107], + // continued-agent-conversations: channel task/message surface routing is + // threaded through the screen while the pane split follow-up is pending. + ["src/features/channels/ui/ChannelScreen.tsx", 1027], + // continued-agent-conversations: composer notice banner for read-only agent + // conversations. + ["src/features/messages/ui/MessageComposer.tsx", 1010], + // continued-agent-conversations: channel sidebar children and active + // conversation unread suppression. Queued to split with sidebar sections. + ["src/features/sidebar/ui/AppSidebar.tsx", 1081], // PersistBackend enum + marker-on-keyring-success plumbing and its three // fail-closed regression tests (silent identity rotation on keyring outage). // A small overage from load-bearing security plumbing on a file already at @@ -136,7 +167,8 @@ const overrides = new Map([ // catalog module; agent_models.rs retains the thin wrapper (~50 lines). // File still exceeds 1000 due to OpenAI/Anthropic discovery + subprocess // fallback. Queued to split into dedicated discovery modules. - ["src-tauri/src/commands/agent_models.rs", 1066], + // latest main rebase adds the provider fallback guard. + ["src-tauri/src/commands/agent_models.rs", 1068], ]); await runFileSizeCheck({ diff --git a/desktop/src-tauri/src/commands/agent_config.rs b/desktop/src-tauri/src/commands/agent_config.rs index 1983cfbd0..454336bdb 100644 --- a/desktop/src-tauri/src/commands/agent_config.rs +++ b/desktop/src-tauri/src/commands/agent_config.rs @@ -176,6 +176,7 @@ pub async fn get_agent_config_surface( record.persona_id.as_deref(), &personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let runtime_meta = known_acp_runtime(&effective_cmd); let session_cache = state.get_session_cache(&pubkey); diff --git a/desktop/src-tauri/src/commands/agent_models.rs b/desktop/src-tauri/src/commands/agent_models.rs index cef2e7115..faa08e3b9 100644 --- a/desktop/src-tauri/src/commands/agent_models.rs +++ b/desktop/src-tauri/src/commands/agent_models.rs @@ -66,6 +66,7 @@ pub async fn get_agent_models( record.persona_id.as_deref(), &personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let args = normalize_agent_args(&effective_command, record.agent_args.clone()); @@ -929,6 +930,7 @@ pub async fn update_managed_agent( record.persona_id.as_deref(), &personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let avatar_url = record .avatar_url diff --git a/desktop/src-tauri/src/commands/agent_profile_reconcile.rs b/desktop/src-tauri/src/commands/agent_profile_reconcile.rs new file mode 100644 index 000000000..516c2cd6a --- /dev/null +++ b/desktop/src-tauri/src/commands/agent_profile_reconcile.rs @@ -0,0 +1,32 @@ +use tauri::AppHandle; + +use crate::{app_state::AppState, managed_agents::load_managed_agents}; + +use super::agents::ProfileReconcileData; + +pub(super) fn refresh_auth_tag( + app: &AppHandle, + state: &AppState, + pubkey: &str, + data: &mut ProfileReconcileData, +) { + let result = (|| -> Result<(), String> { + let _store_guard = state + .managed_agents_store_lock + .lock() + .map_err(|error| error.to_string())?; + let records = load_managed_agents(app)?; + let record = records + .iter() + .find(|record| record.pubkey == pubkey) + .ok_or_else(|| format!("agent {pubkey} not found"))?; + data.auth_tag = record.auth_tag.clone(); + Ok(()) + })(); + + if let Err(error) = result { + eprintln!( + "buzz-desktop: profile reconciliation using pre-start auth tag for agent {pubkey}: {error}" + ); + } +} diff --git a/desktop/src-tauri/src/commands/agents.rs b/desktop/src-tauri/src/commands/agents.rs index 03e8a1786..ec3c78d93 100644 --- a/desktop/src-tauri/src/commands/agents.rs +++ b/desktop/src-tauri/src/commands/agents.rs @@ -241,6 +241,11 @@ async fn start_local_agent_with_preflight( ensure_relay_mesh_for_record(app, &record_snapshot, allow_fresh_create_start).await?; + let refreshed_auth_tag = { + let owner_keys = state.keys.lock().map_err(|e| e.to_string())?; + crate::managed_agents::managed_agent_auth_tag_for_owner(&owner_keys, pubkey)? + }; + let _store_guard = state .managed_agents_store_lock .lock() @@ -254,6 +259,14 @@ async fn start_local_agent_with_preflight( if record.backend != BackendKind::Local { return Err(format!("agent {pubkey} is no longer a local agent")); } + if !crate::managed_agents::auth_tag_matches_owner( + record.auth_tag.as_deref(), + &record.pubkey, + Some(owner_hex), + ) { + record.auth_tag = refreshed_auth_tag; + record.updated_at = crate::util::now_iso(); + } // Re-snapshot the persona onto the record at every spawn so the agent always // starts with the current persona config (system_prompt, model, provider, // env_vars). This clears the "out of date" drift badge without requiring a @@ -655,6 +668,7 @@ pub async fn create_managed_agent( requested_persona_id.as_deref(), &personas, agent_command_override.as_deref(), + None, ); let agent_args = normalize_agent_args( &agent_command, @@ -976,6 +990,10 @@ pub async fn start_managed_agent( // Snapshot the workspace owner pubkey for the legacy auth_tag fallback. // Read outside the records lock to keep lock ordering simple. let owner_hex = workspace_owner_hex(&state)?; + let refreshed_auth_tag = { + let owner_keys = state.keys.lock().map_err(|error| error.to_string())?; + crate::managed_agents::managed_agent_auth_tag_for_owner(&owner_keys, &pubkey)? + }; enum StartTarget { Local, Provider { @@ -986,8 +1004,7 @@ pub async fn start_managed_agent( } // Collect backend info under lock; async preflight/spawn happens below. - // Also snapshot profile reconciliation data for the background task. - let (target, reconcile_data) = { + let (target, mut reconcile_data) = { let _store_guard = state .managed_agents_store_lock .lock() @@ -1000,14 +1017,21 @@ pub async fn start_managed_agent( let (sync_changed, exited_pubkeys) = sync_managed_agent_processes(&mut records, &mut runtimes, ¤t_instance_id(&app)); - if sync_changed { - save_managed_agents(&app, &records)?; - } + let mut records_changed = sync_changed; for pubkey in &exited_pubkeys { state.clear_session_cache(pubkey); } let record = find_managed_agent_mut(&mut records, &pubkey)?; + if !crate::managed_agents::auth_tag_matches_owner( + record.auth_tag.as_deref(), + &record.pubkey, + Some(&owner_hex), + ) { + record.auth_tag = refreshed_auth_tag; + record.updated_at = crate::util::now_iso(); + records_changed = true; + } // Resolve the effective harness for the avatar-fallback derivation in // profile reconcile (the create-time snapshot may be empty or stale for @@ -1017,6 +1041,7 @@ pub async fn start_managed_agent( record.persona_id.as_deref(), &reconcile_personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let reconcile = ProfileReconcileData { @@ -1040,6 +1065,10 @@ pub async fn start_managed_agent( } }; + if records_changed { + save_managed_agents(&app, &records)?; + } + (target, reconcile) }; @@ -1088,9 +1117,10 @@ pub async fn start_managed_agent( // ── Profile reconciliation (fire-and-forget) ──────────────────────────── // On successful start, spawn a background task to ensure the agent's kind:0 // profile is published on the relay. This self-heals cases where the initial - // profile sync at creation time failed silently. For legacy records (pre-PR-921) - // with no persisted avatar, this also backfills the avatar from the relay. + // profile sync at creation time failed silently. if result.is_ok() { + use super::agent_profile_reconcile as reconcile; + reconcile::refresh_auth_tag(&app, &state, &pubkey, &mut reconcile_data); let reconcile_pubkey = pubkey.clone(); let reconcile_app = app.clone(); tauri::async_runtime::spawn(async move { diff --git a/desktop/src-tauri/src/commands/messages.rs b/desktop/src-tauri/src/commands/messages.rs index 2c8dac5a4..1b28a5f5f 100644 --- a/desktop/src-tauri/src/commands/messages.rs +++ b/desktop/src-tauri/src/commands/messages.rs @@ -283,6 +283,7 @@ pub async fn send_channel_message( media_tags: Option>>, emoji_tags: Option>>, mention_tags: Option>>, + client_tags: Option>>, mention_pubkeys: Option>, kind: Option, state: State<'_, AppState>, @@ -294,7 +295,11 @@ pub async fn send_channel_message( let media = media_tags.unwrap_or_default(); let emoji = emoji_tags.unwrap_or_default(); let mention_refs_only = mention_tags.unwrap_or_default(); + let client = client_tags.unwrap_or_default(); let kind_num = kind.unwrap_or(buzz_core_pkg::kind::KIND_STREAM_MESSAGE); + if kind_num != buzz_core_pkg::kind::KIND_STREAM_MESSAGE && !client.is_empty() { + return Err("client tags are only supported on stream messages".into()); + } let mut resolved_root: Option = None; @@ -330,7 +335,7 @@ pub async fn send_channel_message( } None => None, }; - events::build_message( + events::build_message_with_client_tags( channel_uuid, content.trim(), thread_ref.as_ref(), @@ -338,6 +343,7 @@ pub async fn send_channel_message( &media, &emoji, &mention_refs_only, + &client, )? } }; diff --git a/desktop/src-tauri/src/commands/mod.rs b/desktop/src-tauri/src/commands/mod.rs index 445fe2956..9959583a3 100644 --- a/desktop/src-tauri/src/commands/mod.rs +++ b/desktop/src-tauri/src/commands/mod.rs @@ -1,6 +1,7 @@ mod agent_config; mod agent_discovery; mod agent_models; +mod agent_profile_reconcile; mod agent_settings; mod agents; mod canvas; diff --git a/desktop/src-tauri/src/managed_agents/discovery.rs b/desktop/src-tauri/src/managed_agents/discovery.rs index 59c6dee99..d6322b863 100644 --- a/desktop/src-tauri/src/managed_agents/discovery.rs +++ b/desktop/src-tauri/src/managed_agents/discovery.rs @@ -279,11 +279,14 @@ pub fn default_agent_command() -> String { /// Resolution order: /// 1. explicit override (non-empty) β€” a deliberate per-instance pin; /// 2. the linked persona's `runtime` id mapped to its primary command; -/// 3. `default_agent_command()` β€” no persona/runtime, or persona deleted. +/// 3. the record's stored `agent_command` snapshot for legacy records whose +/// persona has no runtime field; +/// 4. `default_agent_command()` β€” no persona/runtime/snapshot, or persona deleted. pub fn effective_agent_command( persona_id: Option<&str>, personas: &[crate::managed_agents::types::PersonaRecord], agent_command_override: Option<&str>, + record_agent_command: Option<&str>, ) -> String { if let Some(pin) = agent_command_override .map(str::trim) @@ -298,6 +301,12 @@ pub fn effective_agent_command( .and_then(known_acp_runtime_exact) .and_then(|r| r.commands.first().copied()) .map(str::to_string) + .or_else(|| { + record_agent_command + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_string) + }) .unwrap_or_else(default_agent_command) } @@ -320,7 +329,7 @@ pub fn divergent_agent_command_override( let picked = picked_command .map(str::trim) .filter(|value| !value.is_empty())?; - let persona_command = effective_agent_command(persona_id, personas, None); + let persona_command = effective_agent_command(persona_id, personas, None, None); let same_runtime = match ( known_acp_runtime(picked), known_acp_runtime(&persona_command), @@ -373,7 +382,7 @@ pub fn create_time_agent_command_override( let picked = picked_command .map(str::trim) .filter(|value| !value.is_empty())?; - let inherited_command = effective_agent_command(persona_id, personas, None); + let inherited_command = effective_agent_command(persona_id, personas, None, None); return (picked != inherited_command).then(|| picked.to_string()); } @@ -927,7 +936,7 @@ mod tests { // An explicit pin beats the persona's runtime. let personas = vec![persona_with_runtime("p1", Some("claude"))]; assert_eq!( - effective_agent_command(Some("p1"), &personas, Some("codex-acp")), + effective_agent_command(Some("p1"), &personas, Some("codex-acp"), Some("goose")), "codex-acp" ); } @@ -937,7 +946,7 @@ mod tests { // No override β†’ persona runtime id maps to its primary command. let personas = vec![persona_with_runtime("p1", Some("claude"))]; assert_eq!( - effective_agent_command(Some("p1"), &personas, None), + effective_agent_command(Some("p1"), &personas, None, Some("goose")), "claude-agent-acp" ); } @@ -947,26 +956,37 @@ mod tests { // A blank/whitespace override is treated as "inherit", not a pin. let personas = vec![persona_with_runtime("p1", Some("goose"))]; assert_eq!( - effective_agent_command(Some("p1"), &personas, Some(" ")), + effective_agent_command(Some("p1"), &personas, Some(" "), Some("buzz-agent")), + "goose" + ); + } + + #[test] + fn effective_agent_command_falls_back_to_record_snapshot() { + // Legacy records may predate persona runtime fields. Preserve their + // stored harness instead of silently changing them to the new default. + let personas = vec![persona_with_runtime("p1", None)]; + assert_eq!( + effective_agent_command(Some("p1"), &personas, None, Some("goose")), "goose" ); } #[test] fn effective_agent_command_falls_back_to_default() { - // No override, no persona runtime, and a deleted persona all fall back - // to the bundled default. + // No override, no persona runtime, no record snapshot, and a deleted + // persona all fall back to the bundled default. let personas = vec![persona_with_runtime("p1", None)]; assert_eq!( - effective_agent_command(Some("p1"), &personas, None), + effective_agent_command(Some("p1"), &personas, None, None), default_agent_command() ); assert_eq!( - effective_agent_command(Some("gone"), &personas, None), + effective_agent_command(Some("gone"), &personas, None, None), default_agent_command() ); assert_eq!( - effective_agent_command(None, &personas, None), + effective_agent_command(None, &personas, None, None), default_agent_command() ); } diff --git a/desktop/src-tauri/src/managed_agents/restore.rs b/desktop/src-tauri/src/managed_agents/restore.rs index 85f539112..44216d663 100644 --- a/desktop/src-tauri/src/managed_agents/restore.rs +++ b/desktop/src-tauri/src/managed_agents/restore.rs @@ -197,15 +197,46 @@ pub async fn restore_managed_agents_on_launch( return Ok(()); } - // Snapshot the workspace owner pubkey once for the legacy auth_tag fallback. - // Read outside the per-agent spawn loop so all parallel spawns see the same - // value and we don't lock `state.keys` repeatedly. - let owner_hex: Option = state - .keys - .lock() - .map_err(|e| e.to_string()) - .ok() - .map(|k| k.public_key().to_hex()); + // Snapshot the workspace owner once and refresh each restored agent's + // NIP-OA auth tag when the app identity changed since the record was + // created. Without this, owner-only agents can start successfully but ignore + // the current user's messages because the harness resolves an old owner. + let (owner_hex, auth_tag_updates): (Option, Vec<(String, Option, String)>) = { + let owner_keys = state.keys.lock().map_err(|e| e.to_string())?; + let owner_hex = owner_keys.public_key().to_hex(); + let mut updates = Vec::new(); + for record in &mut agents_to_start { + if super::auth_tag_matches_owner( + record.auth_tag.as_deref(), + &record.pubkey, + Some(&owner_hex), + ) { + continue; + } + let refreshed_auth_tag = + super::managed_agent_auth_tag_for_owner(&owner_keys, &record.pubkey)?; + let updated_at = util::now_iso(); + record.auth_tag = refreshed_auth_tag.clone(); + record.updated_at = updated_at.clone(); + updates.push((record.pubkey.clone(), refreshed_auth_tag, updated_at)); + } + (Some(owner_hex), updates) + }; + + if !auth_tag_updates.is_empty() { + let _store_guard = state + .managed_agents_store_lock + .lock() + .map_err(|error| error.to_string())?; + let mut records = load_managed_agents(app)?; + for (pubkey, auth_tag, updated_at) in &auth_tag_updates { + if let Ok(record) = find_managed_agent_mut(&mut records, pubkey) { + record.auth_tag = auth_tag.clone(); + record.updated_at = updated_at.clone(); + } + } + save_managed_agents(app, &records)?; + } #[cfg(feature = "mesh-llm")] let agents_to_start = { @@ -310,6 +341,7 @@ pub async fn restore_managed_agents_on_launch( record.persona_id.as_deref(), &reconcile_personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); Some(( pubkey.clone(), diff --git a/desktop/src-tauri/src/managed_agents/runtime.rs b/desktop/src-tauri/src/managed_agents/runtime.rs index 585ffea26..c45284a0e 100644 --- a/desktop/src-tauri/src/managed_agents/runtime.rs +++ b/desktop/src-tauri/src/managed_agents/runtime.rs @@ -1517,6 +1517,7 @@ pub fn build_managed_agent_summary( record.persona_id.as_deref(), personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let effective_args = normalize_agent_args(&effective_command, record.agent_args.clone()); let effective_mcp_command = known_acp_runtime(&effective_command) @@ -1611,12 +1612,10 @@ pub(crate) fn build_respond_to_env( remove.push("BUZZ_ACP_RESPOND_TO_ALLOWLIST"); } - // Legacy fallback: agents created before NIP-OA lack `auth_tag`. Without - // it the harness can't resolve the owner, and owner-dependent gate modes - // would drop every event. Forwarding the workspace owner pubkey via - // BUZZ_ACP_AGENT_OWNER keeps those records functional. Modern records - // (`auth_tag = Some(...)`) use `BUZZ_AUTH_TAG` as before. - if record.auth_tag.is_none() { + // Fallback: if the record has no usable NIP-OA `auth_tag` for the current + // workspace owner, forward the owner pubkey explicitly. This covers both + // pre-NIP-OA records and records restored after the local identity changed. + if !auth_tag_matches_owner(record.auth_tag.as_deref(), &record.pubkey, owner_hex) { if let Some(owner) = owner_hex { set.push(("BUZZ_ACP_AGENT_OWNER", owner.to_string())); } else { @@ -1629,6 +1628,53 @@ pub(crate) fn build_respond_to_env( Ok((set, remove)) } +pub(crate) fn auth_tag_owner_hex(auth_tag: &str, agent_pubkey_hex: &str) -> Option { + let trimmed = auth_tag.trim(); + if trimmed.is_empty() { + return None; + } + let agent_pubkey = nostr::PublicKey::from_hex(agent_pubkey_hex).ok()?; + buzz_sdk_pkg::nip_oa::verify_auth_tag(trimmed, &agent_pubkey) + .ok() + .map(|owner| owner.to_hex().to_ascii_lowercase()) +} + +pub(crate) fn auth_tag_matches_owner( + auth_tag: Option<&str>, + agent_pubkey_hex: &str, + owner_hex: Option<&str>, +) -> bool { + let Some(tag) = auth_tag.map(str::trim).filter(|tag| !tag.is_empty()) else { + return false; + }; + match owner_hex { + Some(owner) => auth_tag_owner_hex(tag, agent_pubkey_hex) + .is_some_and(|tag_owner| tag_owner.eq_ignore_ascii_case(owner)), + None => true, + } +} + +pub(crate) fn managed_agent_auth_tag_for_owner( + owner_keys: &nostr::Keys, + agent_pubkey_hex: &str, +) -> Result, String> { + if owner_keys + .public_key() + .to_hex() + .eq_ignore_ascii_case(agent_pubkey_hex) + { + return Ok(None); + } + + let compat_owner = nostr::Keys::parse(&owner_keys.secret_key().to_secret_hex()) + .map_err(|error| format!("failed to bridge owner keys: {error}"))?; + let compat_agent = nostr::PublicKey::from_hex(agent_pubkey_hex) + .map_err(|error| format!("failed to bridge agent pubkey: {error}"))?; + buzz_sdk_pkg::nip_oa::compute_auth_tag(&compat_owner, &compat_agent, "") + .map(Some) + .map_err(|error| format!("failed to compute NIP-OA auth tag: {error}")) +} + /// Spawn an agent process without holding any locks on records or runtimes. /// Returns the child process and log path on success. The caller is responsible /// for updating `ManagedAgentRecord` fields and inserting into the runtimes map. @@ -1668,6 +1714,7 @@ pub fn spawn_agent_child( record.persona_id.as_deref(), &personas, record.agent_command_override.as_deref(), + Some(&record.agent_command), ); let agent_args = normalize_agent_args(&effective_command, record.agent_args.clone()); let resolved_acp_command = resolve_command(&record.acp_command) @@ -1822,7 +1869,13 @@ pub fn spawn_agent_child( command.env_remove("BUZZ_ACP_API_TOKEN"); command.env_remove("BUZZ_API_TOKEN"); - if let Some(ref auth_tag) = record.auth_tag { + let auth_tag_for_child = record + .auth_tag + .as_deref() + .map(str::trim) + .filter(|tag| !tag.is_empty()) + .filter(|tag| auth_tag_matches_owner(Some(tag), &record.pubkey, owner_hex)); + if let Some(auth_tag) = auth_tag_for_child { command.env("BUZZ_AUTH_TAG", auth_tag); } else { command.env_remove("BUZZ_AUTH_TAG"); diff --git a/desktop/src-tauri/src/managed_agents/runtime/tests.rs b/desktop/src-tauri/src/managed_agents/runtime/tests.rs index cc9857205..90e2ec1d8 100644 --- a/desktop/src-tauri/src/managed_agents/runtime/tests.rs +++ b/desktop/src-tauri/src/managed_agents/runtime/tests.rs @@ -159,8 +159,16 @@ fn fixture( #[test] fn build_env_owner_only_sets_mode_and_removes_others() { - let rec = fixture(RespondTo::OwnerOnly, vec![], Some("tag".into())); - let (set, remove) = build_respond_to_env(&rec, Some("owner")).unwrap(); + let owner = nostr::Keys::generate(); + let agent = nostr::Keys::generate(); + let agent_pubkey = agent.public_key().to_hex(); + let auth_tag = super::managed_agent_auth_tag_for_owner(&owner, &agent_pubkey) + .unwrap() + .unwrap(); + let mut rec = fixture(RespondTo::OwnerOnly, vec![], Some(auth_tag)); + rec.pubkey = agent_pubkey; + let owner_hex = owner.public_key().to_hex(); + let (set, remove) = build_respond_to_env(&rec, Some(&owner_hex)).unwrap(); let set_map: std::collections::HashMap<_, _> = set.into_iter().collect(); assert_eq!( set_map.get("BUZZ_ACP_RESPOND_TO").map(String::as_str), @@ -172,6 +180,27 @@ fn build_env_owner_only_sets_mode_and_removes_others() { assert!(remove.contains(&"BUZZ_ACP_AGENT_OWNER")); } +#[test] +fn build_env_stale_auth_tag_emits_current_owner() { + let previous_owner = nostr::Keys::generate(); + let current_owner = nostr::Keys::generate(); + let agent = nostr::Keys::generate(); + let agent_pubkey = agent.public_key().to_hex(); + let stale_auth_tag = super::managed_agent_auth_tag_for_owner(&previous_owner, &agent_pubkey) + .unwrap() + .unwrap(); + let mut rec = fixture(RespondTo::OwnerOnly, vec![], Some(stale_auth_tag)); + rec.pubkey = agent_pubkey; + let current_owner_hex = current_owner.public_key().to_hex(); + let (set, remove) = build_respond_to_env(&rec, Some(¤t_owner_hex)).unwrap(); + let set_map: std::collections::HashMap<_, _> = set.into_iter().collect(); + assert_eq!( + set_map.get("BUZZ_ACP_AGENT_OWNER").map(String::as_str), + Some(current_owner_hex.as_str()) + ); + assert!(!remove.contains(&"BUZZ_ACP_AGENT_OWNER")); +} + #[test] fn build_env_allowlist_sets_both_envs_and_joins() { let a = "a".repeat(64); diff --git a/desktop/src/shared/api/tauri.ts b/desktop/src/shared/api/tauri.ts index 805151e04..3d93b7a79 100644 --- a/desktop/src/shared/api/tauri.ts +++ b/desktop/src/shared/api/tauri.ts @@ -717,6 +717,7 @@ export async function sendChannelMessage( kind?: number, emojiTags?: string[][], mentionTags?: string[][], + clientTags?: string[][], ): Promise { const response = await invokeTauri( "send_channel_message", @@ -727,6 +728,7 @@ export async function sendChannelMessage( mediaTags: mediaTags ?? null, emojiTags: emojiTags ?? null, mentionTags: mentionTags ?? null, + clientTags: clientTags ?? null, mentionPubkeys: mentionPubkeys ?? null, kind: kind ?? null, }, diff --git a/desktop/src/shared/constants/kinds.ts b/desktop/src/shared/constants/kinds.ts index c5ea0a899..76129205f 100644 --- a/desktop/src/shared/constants/kinds.ts +++ b/desktop/src/shared/constants/kinds.ts @@ -6,7 +6,10 @@ export const KIND_STREAM_MESSAGE = 9; export const KIND_NIP29_DELETE_EVENT = 9005; export const KIND_STREAM_MESSAGE_V2 = 40002; export const KIND_STREAM_MESSAGE_EDIT = 40003; +export const KIND_STREAM_MESSAGE_PINNED = 40004; export const KIND_STREAM_MESSAGE_DIFF = 40008; +export const KIND_AGENT_CONVERSATION = 40010; +export const KIND_AGENT_CONVERSATION_COMPAT = KIND_STREAM_MESSAGE_PINNED; export const KIND_REMINDER = 40007; export const KIND_SYSTEM_MESSAGE = 40099; export const KIND_JOB_REQUEST = 43001; @@ -71,7 +74,9 @@ export const CHANNEL_EVENT_KINDS = [ ...CHANNEL_MESSAGE_EVENT_KINDS, 40001, // legacy: pre-migration stream messages KIND_STREAM_MESSAGE_EDIT, // 40003 β€” message edits + KIND_AGENT_CONVERSATION_COMPAT, // 40004 β€” staging-compatible focused agent conversation marker KIND_STREAM_MESSAGE_DIFF, // 40008 β€” message diffs + KIND_AGENT_CONVERSATION, // 40010 β€” focused agent conversation marker KIND_SYSTEM_MESSAGE, // 40099 β€” system messages (join, leave, etc.) KIND_HUDDLE_STARTED, // 48100 β€” visible huddle session card KIND_HUDDLE_PARTICIPANT_JOINED, // 48101 β€” huddle lifecycle overlay @@ -79,6 +84,13 @@ export const CHANNEL_EVENT_KINDS = [ KIND_HUDDLE_ENDED, // 48103 β€” huddle lifecycle overlay ] as const; +// Stored channel-scoped state that should be fetched with timeline history but +// should not render as a message row or count against unread message tallies. +export const CHANNEL_TIMELINE_STATE_KINDS = [ + KIND_AGENT_CONVERSATION_COMPAT, // 40004 β€” staging-compatible focused agent conversation marker + KIND_AGENT_CONVERSATION, // 40010 β€” focused agent conversation marker +] as const; + // Auxiliary (non-row) timeline kinds: events that overlay onto or hide an // existing message rather than rendering their own row β€” reactions, edits, and // deletions. History fetches request the visible content kinds only, so the diff --git a/mobile/lib/features/channels/channel_messages_provider.dart b/mobile/lib/features/channels/channel_messages_provider.dart index b659f9a0f..37cd237d8 100644 --- a/mobile/lib/features/channels/channel_messages_provider.dart +++ b/mobile/lib/features/channels/channel_messages_provider.dart @@ -152,8 +152,7 @@ class ChannelMessagesNotifier extends Notifier>> { content.contains('member_removed'); } - /// Kinds that are metadata rather than displayable content (deletions, - /// reactions, edits, legacy pre-migration messages). + /// reactions, edits, legacy pre-migration messages, task markers). static const _metadataKinds = { EventKind.deletion, EventKind.reaction, @@ -161,6 +160,8 @@ class ChannelMessagesNotifier extends Notifier>> { EventKind.streamMessageEdit, EventKind.huddleParticipantJoined, EventKind.huddleParticipantLeft, + EventKind.agentConversationCompat, + EventKind.agentConversation, }; /// Minimum displayable messages we want after the initial history load. diff --git a/mobile/lib/shared/relay/nostr_models.dart b/mobile/lib/shared/relay/nostr_models.dart index fd45a2133..c2cdcebef 100644 --- a/mobile/lib/shared/relay/nostr_models.dart +++ b/mobile/lib/shared/relay/nostr_models.dart @@ -22,7 +22,10 @@ abstract final class EventKind { static const dmVisibility = 30622; static const streamMessageV2 = 40002; static const streamMessageEdit = 40003; + static const streamMessagePinned = 40004; static const streamMessageDiff = 40008; + static const agentConversation = 40010; + static const agentConversationCompat = streamMessagePinned; static const systemMessage = 40099; static const forumPost = 45001; static const forumComment = 45003; @@ -48,7 +51,9 @@ abstract final class EventKind { ...channelMessageEventKinds, 40001, // legacy pre-migration stream messages streamMessageEdit, // 40003 + agentConversationCompat, // 40004 β€” staging-compatible task marker streamMessageDiff, // 40008 + agentConversation, // 40010 β€” task marker systemMessage, // 40099 huddleStarted, // 48100 β€” visible huddle session row huddleParticipantJoined, // 48101 β€” huddle lifecycle metadata