diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d30d8d2..1e3fe4a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ ## [Unreleased] +## [20.9.0] - 02 Aprile 2026 + +### Added +- Declarative kernel tool catalog with 43 MCP tools (plan, org, agent, platform, infra) +- Cloud escalation engine: Opus via `stream_with_fallback` with 5-round tool loop +- Automatic local/cloud inference routing (`InferenceLevel::Local` vs `Cloud`) +- Write-intent classifier (`is_write_intent`) for Italian/English keyword detection +- Cloud tool loop with `` dispatch (same format as local Qwen) + +### Fixed +- Telegram poll now shares engine with HTTP API via `Arc>` +- `api_ask.rs` escalates to cloud on write intents (was always local-only) +- `telegram_voice.rs` wraps `route_intent` in `spawn_blocking` (no async deadlock) +- MCP `tool_catalog.rs` header updated from 18 to 43 tools + +### Changed +- Kernel tools refactored from single `tools.rs` to `tools/` module (5 domain files) +- `engine_tool_loop.rs` uses `ToolCatalog::all()` instead of hardcoded definitions +- `engine_context.rs` uses new catalog for context gathering + ## [20.8.2] - 01 Aprile 2026 ### Fixed diff --git a/VERSION.md b/VERSION.md index c788fc89..f3f52b42 100644 --- a/VERSION.md +++ b/VERSION.md @@ -1 +1 @@ -20.8.2 +20.9.0 diff --git a/daemon/Cargo.lock b/daemon/Cargo.lock index 433691a5..44e55f36 100644 --- a/daemon/Cargo.lock +++ b/daemon/Cargo.lock @@ -747,7 +747,7 @@ checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "convergio-platform-daemon" -version = "20.8.2" +version = "20.9.0" dependencies = [ "aes-gcm", "axum", diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 9a6aad1f..52bea6b5 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "convergio-platform-daemon" -version = "20.8.2" +version = "20.9.0" edition = "2021" [lib] diff --git a/daemon/src/kernel/api.rs b/daemon/src/kernel/api.rs index 0b606327..fe742401 100644 --- a/daemon/src/kernel/api.rs +++ b/daemon/src/kernel/api.rs @@ -42,7 +42,7 @@ pub mod handlers { let chat_id = crate::telegram_config::telegram_chat_id(); match (token, chat_id) { (Some(token), Ok(Some(chat_id))) => { - let engine_clone = Arc::new(KernelEngine::new(config.clone())); + let engine_clone = state.engine.clone(); let daemon_url = std::env::var("DAEMON_URL") .unwrap_or_else(|_| "http://localhost:8420".to_string()); tracing::info!("jarvis: spawning Telegram poll loop for chat_id={chat_id}"); diff --git a/daemon/src/kernel/api_ask.rs b/daemon/src/kernel/api_ask.rs index 43046465..38c4d8b9 100644 --- a/daemon/src/kernel/api_ask.rs +++ b/daemon/src/kernel/api_ask.rs @@ -27,36 +27,42 @@ mod inner { /// /// Routes through voice_router: classifies intent first. /// - Simple queries (stato, costi) → handled by voice_router directly - /// - EscalateToAli → forwards to Ali (Opus) via chat API - /// - EscalateToAli (unrecognized) → forwarded to Ali (Opus) + /// - EscalateToAli / write intents → cloud escalation (Opus) pub async fn handle_ask( State(state): State, Json(body): Json, ) -> Json { let question = body.question.clone(); let engine = state.engine.clone(); - let answer = tokio::task::spawn_blocking(move || { - let q = question.to_lowercase(); - // Check for Ali escalation FIRST (before any LLM classification) - if q.starts_with("ali ") || q.starts_with("ali,") || q == "ali" - || q.contains(" ali ") || q.contains("chiedi ad ali") - || q.contains("opus") || q.contains("cloud") - { - return route_intent( - VoiceIntent::EscalateToAli { question: question.clone() }, - "http://localhost:8420", - ); - } - // For everything else: classify intent then route + + // Check inference level — cloud for write intents or Ali escalation + let level = { let eng = engine.lock().unwrap_or_else(|p| p.into_inner()); - let intent = classify_intent(&question, &eng); - match &intent { - VoiceIntent::EscalateToAli { .. } => eng.ask(&question), - _ => route_intent(intent, "http://localhost:8420"), + eng.inference_level_for(&question) + }; + + let answer = if level == crate::kernel::engine::InferenceLevel::Cloud { + crate::kernel::cloud_escalation::cloud_ask_with_tools(&question, "").await + } else { + let q = question.clone(); + let q2 = question.clone(); + // Classify locally; if EscalateToAli, escalate to cloud + let intent = { + let eng = engine.lock().unwrap_or_else(|p| p.into_inner()); + classify_intent(&q, &eng) + }; + if matches!(intent, VoiceIntent::EscalateToAli { .. }) { + crate::kernel::cloud_escalation::cloud_ask_with_tools(&q2, "").await + } else { + tokio::task::spawn_blocking(move || { + let du = std::env::var("DAEMON_URL") + .unwrap_or_else(|_| "http://localhost:8420".into()); + route_intent(intent, &du) + }) + .await + .unwrap_or_else(|e| format!("Errore interno: {e}")) } - }) - .await - .unwrap_or_else(|e| format!("Errore interno: {e}")); + }; Json(AskResponse { answer }) } diff --git a/daemon/src/kernel/cloud_escalation.rs b/daemon/src/kernel/cloud_escalation.rs new file mode 100644 index 00000000..40d71202 --- /dev/null +++ b/daemon/src/kernel/cloud_escalation.rs @@ -0,0 +1,173 @@ +// Copyright (c) 2026 Roberto D'Angelo. All rights reserved. +// Cloud escalation — async LLM via Convergio provider/fallback pipeline. +// Used when local model unavailable or for write-intent operations. + +use crate::kernel::engine_context::{extract_tool_call, smart_context_gather_pub}; +use crate::kernel::tools::ToolCatalog; +use crate::server::llm_client::{ChatMessage, StreamChunk}; +use crate::server::provider::stream_with_fallback; +use futures_util::StreamExt; +use tracing::{info, warn}; + +/// Default cloud model for kernel escalation. +pub const CLOUD_MODEL: &str = "claude-opus-4-20250514"; +const MAX_CLOUD_ROUNDS: usize = 5; + +fn daemon_url() -> String { + std::env::var("DAEMON_URL").unwrap_or_else(|_| "http://localhost:8420".to_string()) +} + +/// Ask via cloud provider — no tool loop. +pub async fn cloud_ask(question: &str, history_chatml: &str) -> String { + let q = question.to_string(); + let context = match tokio::task::spawn_blocking(move || { + smart_context_gather_pub(&q, &daemon_url()) + }) + .await + { + Ok(ctx) => ctx, + Err(e) => { + warn!(error = %e, "kernel: context gather failed"); + String::from("Nessun contesto disponibile.") + } + }; + let messages = build_messages(&context, question, history_chatml, false); + collect_stream(messages).await +} + +/// Ask via cloud provider WITH tool calling loop (up to 5 rounds). +pub async fn cloud_ask_with_tools(question: &str, history: &str) -> String { + let q = question.to_string(); + let context = match tokio::task::spawn_blocking(move || { + smart_context_gather_pub(&q, &daemon_url()) + }) + .await + { + Ok(ctx) => ctx, + Err(e) => { + warn!(error = %e, "kernel: context gather failed"); + String::from("Nessun contesto disponibile.") + } + }; + + let mut messages = build_messages(&context, question, history, true); + let du = daemon_url(); + + for round in 0..MAX_CLOUD_ROUNDS { + let response = collect_stream(messages.clone()).await; + + let Some((tool_name, args_str)) = extract_tool_call(&response) else { + return strip_after_tool_call(&response); + }; + + info!(round, tool = %tool_name, "kernel.cloud: tool call"); + let args: serde_json::Value = + serde_json::from_str(&args_str).unwrap_or(serde_json::json!({})); + let url = du.clone(); + let tn = tool_name.clone(); + let tool_result = tokio::task::spawn_blocking(move || { + ToolCatalog::all() + .call_tool(&tn, &url, &args) + .unwrap_or_else(|| format!("Tool '{tn}' not found.")) + }) + .await + .unwrap_or_else(|e| format!("Tool dispatch error: {e}")); + + messages.push(ChatMessage { + role: "assistant".into(), + content: response, + }); + messages.push(ChatMessage { + role: "user".into(), + content: format!("[Tool result: {tool_name}]\n{tool_result}"), + }); + } + "Raggiunto il limite di chiamate strumenti cloud.".into() +} + +fn build_messages( + context: &str, + question: &str, + history: &str, + with_tools: bool, +) -> Vec { + let tools_section = if with_tools { + ToolCatalog::all().descriptions_block() + } else { + String::new() + }; + let system = format!( + "Sei Jarvis, l'assistente AI di Convergio. Rispondi in italiano, conciso.\n\n\ + Dati sistema:\n{context}\n\n{tools_section}" + ); + let mut msgs = vec![ChatMessage { + role: "system".into(), + content: system, + }]; + if !history.is_empty() { + msgs.push(ChatMessage { + role: "user".into(), + content: format!("[Conversazione precedente]\n{history}"), + }); + } + msgs.push(ChatMessage { + role: "user".into(), + content: question.to_string(), + }); + msgs +} + +async fn collect_stream(messages: Vec) -> String { + let (provider, _) = crate::server::provider::provider_for_model(CLOUD_MODEL); + info!(model = CLOUD_MODEL, "kernel: cloud inference"); + let mut stream = stream_with_fallback(provider, CLOUD_MODEL, messages); + let mut text = String::new(); + while let Some(chunk) = stream.next().await { + match chunk { + StreamChunk::Text(t) => text.push_str(&t), + StreamChunk::Error(e) => { + warn!(error = %e, "kernel: cloud stream error"); + if text.is_empty() { + return format!("Errore cloud: {e}"); + } + } + StreamChunk::Usage(_) => {} + } + } + if text.is_empty() { + "Nessuna risposta dal provider cloud.".into() + } else { + text.trim().to_string() + } +} + +fn strip_after_tool_call(text: &str) -> String { + if let Some(pos) = text.find("") { + let after = text[pos + "".len()..].trim(); + if !after.is_empty() { + return after.to_string(); + } + } + text.to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cloud_model_is_opus() { + assert!(CLOUD_MODEL.contains("opus")); + } + + #[test] + fn cloud_model_routes_to_claude() { + let (p, _) = crate::server::provider::provider_for_model(CLOUD_MODEL); + assert_eq!(p, crate::server::llm_client::Provider::ClaudeSubscription); + } + + #[test] + fn max_rounds_is_5() { + assert_eq!(MAX_CLOUD_ROUNDS, 5); + } +} diff --git a/daemon/src/kernel/cloud_escalation_tests.rs b/daemon/src/kernel/cloud_escalation_tests.rs new file mode 100644 index 00000000..952aa4e7 --- /dev/null +++ b/daemon/src/kernel/cloud_escalation_tests.rs @@ -0,0 +1,47 @@ +// Tests for cloud escalation and write-intent classification. + +use crate::kernel::cloud_escalation::CLOUD_MODEL; +use crate::kernel::voice_router_helpers::is_write_intent; + +#[test] +fn test_cloud_model_is_opus() { + assert!(CLOUD_MODEL.contains("opus"), "must target Opus"); +} + +#[test] +fn test_cloud_model_routes_to_claude() { + let (p, _) = crate::server::provider::provider_for_model(CLOUD_MODEL); + assert_eq!(p, crate::server::llm_client::Provider::ClaudeSubscription); +} + +#[test] +fn test_write_intent_italian() { + assert!(is_write_intent("crea un piano")); +} + +#[test] +fn test_write_intent_english() { + assert!(is_write_intent("create a new org")); +} + +#[test] +fn test_read_intent_italian() { + assert!(!is_write_intent("stato dei piani")); +} + +#[test] +fn test_write_intent_ali_escalation() { + assert!(is_write_intent("parla con ali")); +} + +#[test] +fn test_no_model_returns_cloud() { + use crate::kernel::engine::{InferenceLevel, KernelConfig, KernelEngine}; + let eng = KernelEngine::new(KernelConfig::default()); + assert_eq!(eng.inference_level_for("qualsiasi cosa"), InferenceLevel::Cloud); +} + +#[test] +fn test_read_intent_cost_query() { + assert!(!is_write_intent("quanto costa")); +} diff --git a/daemon/src/kernel/engine.rs b/daemon/src/kernel/engine.rs index 975e1731..dede41ba 100644 --- a/daemon/src/kernel/engine.rs +++ b/daemon/src/kernel/engine.rs @@ -5,11 +5,19 @@ use crate::ipc::models::apple_fm::{AppleFmBridge, InferenceRequest}; use crate::kernel::engine_context::smart_context_gather; use crate::kernel::engine_tool_loop; +use crate::kernel::voice_router_helpers; use serde::{Deserialize, Serialize}; use std::time::Instant; pub use crate::kernel::engine_context::smart_context_gather_pub; +/// Inference routing: local Qwen or cloud Opus. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InferenceLevel { + Local, + Cloud, +} + /// Classification severity from kernel inference. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -90,6 +98,22 @@ impl KernelEngine { self.loaded_model.is_some() } + /// True when local inference is available (bridge + model). + pub fn is_local_available(&self) -> bool { + self.bridge.is_available() && self.loaded_model.is_some() + } + + /// Decide local vs cloud based on question content and model availability. + pub fn inference_level_for(&self, question: &str) -> InferenceLevel { + if !self.is_local_available() { + return InferenceLevel::Cloud; + } + if voice_router_helpers::is_write_intent(question) { + return InferenceLevel::Cloud; + } + InferenceLevel::Local + } + /// Classify a situation string via MLX inference. /// /// Falls back to heuristic keyword-based classification when the bridge is diff --git a/daemon/src/kernel/engine_context.rs b/daemon/src/kernel/engine_context.rs index 876a4138..62c03ab0 100644 --- a/daemon/src/kernel/engine_context.rs +++ b/daemon/src/kernel/engine_context.rs @@ -1,7 +1,7 @@ // Copyright (c) 2026 Roberto D'Angelo. All rights reserved. // Context gathering and output-processing helpers for KernelEngine. -use crate::kernel::tools; +use crate::kernel::tools::ToolCatalog; /// Smart context gathering: picks APIs based on question keywords. /// The kernel (Rust, deterministic) does the retrieval; the LLM just reasons. @@ -13,31 +13,31 @@ pub fn smart_context_gather_pub(question: &str, daemon_url: &str) -> String { pub(crate) fn smart_context_gather(question: &str, daemon_url: &str) -> String { let q = question.to_lowercase(); let empty = serde_json::json!({}); + let cat = ToolCatalog::all(); let mut ctx = String::new(); - // Always fetch full platform context — Jarvis needs complete awareness. - if let Some(plans) = tools::call_tool("get_plans", daemon_url, &empty) { + if let Some(plans) = cat.call_tool("get_plans", daemon_url, &empty) { ctx += &format!("Piani:\n{plans}\n\n"); } - if let Some(agents) = tools::call_tool("get_agents", daemon_url, &empty) { + if let Some(agents) = cat.call_tool("list_agents", daemon_url, &empty) { ctx += &format!("Agenti:\n{agents}\n\n"); } - if let Some(costs) = tools::call_tool("get_costs", daemon_url, &empty) { + if let Some(costs) = cat.call_tool("cost_summary", daemon_url, &empty) { ctx += &format!("Costi:\n{costs}\n\n"); } - if let Some(node) = tools::call_tool("get_node_status", daemon_url, &empty) { + if let Some(node) = cat.call_tool("node_readiness", daemon_url, &empty) { ctx += &format!("Nodo:\n{node}\n\n"); } - if let Some(kernel) = tools::call_tool("get_kernel_status", daemon_url, &empty) { + if let Some(kernel) = cat.call_tool("kernel_status", daemon_url, &empty) { ctx += &format!("Kernel:\n{kernel}\n\n"); } - if let Some(health) = tools::call_tool("get_health", daemon_url, &empty) { + if let Some(health) = cat.call_tool("health_deep", daemon_url, &empty) { ctx += &format!("Platform Health:\n{health}\n\n"); } - if let Some(history) = tools::call_tool("get_agent_history", daemon_url, &empty) { + if let Some(history) = cat.call_tool("agent_history", daemon_url, &empty) { ctx += &format!("Recent Agent Activity:\n{history}\n\n"); } - if let Some(peers) = tools::call_tool("get_mesh_status", daemon_url, &empty) { + if let Some(peers) = cat.call_tool("mesh_status", daemon_url, &empty) { ctx += &format!("Mesh Peers:\n{peers}\n\n"); } @@ -49,7 +49,7 @@ pub(crate) fn smart_context_gather(question: &str, daemon_url: &str) -> String { }).next(); if let Some(plan_id) = id { let args = serde_json::json!({"plan_id": plan_id}); - if let Some(detail) = tools::call_tool("get_plan_detail", daemon_url, &args) { + if let Some(detail) = cat.call_tool("get_plan_detail", daemon_url, &args) { ctx += &format!("Dettaglio piano {plan_id}:\n{detail}\n\n"); } } diff --git a/daemon/src/kernel/engine_tool_loop.rs b/daemon/src/kernel/engine_tool_loop.rs index 70485420..3ff2e760 100644 --- a/daemon/src/kernel/engine_tool_loop.rs +++ b/daemon/src/kernel/engine_tool_loop.rs @@ -5,7 +5,7 @@ use crate::ipc::models::apple_fm::{AppleFmBridge, InferenceRequest}; use crate::kernel::engine_context::{extract_tool_call, strip_mlx_debug}; -use crate::kernel::tools; +use crate::kernel::tools::ToolCatalog; /// Maximum tool-call rounds before returning (prevents infinite loops). const MAX_TOOL_ROUNDS: u32 = 3; @@ -13,16 +13,7 @@ const MAX_TOOL_ROUNDS: u32 = 3; /// Build the tool description block for the system prompt. /// Format matches what `extract_tool_call` expects: JSON. pub(crate) fn tool_descriptions_block() -> String { - let defs = tools::tool_definitions(); - let mut block = String::from( - "Strumenti disponibili. Per usarli rispondi con:\n\ - {\"name\":\"tool_name\",\"arguments\":{...}}\n\n", - ); - for def in &defs { - block += &format!("- {}: {}\n", def.name, def.description); - } - block += "\nUsa gli strumenti SOLO quando servono dati che non hai nel contesto.\n"; - block + ToolCatalog::read_only().descriptions_block() } /// Build the ChatML prompt for ask() with context, tools, history, and question. @@ -88,7 +79,7 @@ pub(crate) fn run_tool_loop( // Parse args and dispatch let args: serde_json::Value = serde_json::from_str(&args_str).unwrap_or(serde_json::json!({})); - let tool_result = tools::call_tool(&tool_name, daemon_url, &args) + let tool_result = ToolCatalog::read_only().call_tool(&tool_name, daemon_url, &args) .unwrap_or_else(|| format!("Tool '{tool_name}' not found or failed.")); // Append tool result to conversation and re-invoke diff --git a/daemon/src/kernel/engine_tool_loop_tests.rs b/daemon/src/kernel/engine_tool_loop_tests.rs index 0607a84b..ca3be13b 100644 --- a/daemon/src/kernel/engine_tool_loop_tests.rs +++ b/daemon/src/kernel/engine_tool_loop_tests.rs @@ -6,14 +6,11 @@ use super::*; #[test] fn tool_descriptions_block_contains_all_tools() { let block = tool_descriptions_block(); - let defs = tools::tool_definitions(); - for def in &defs { - assert!( - block.contains(def.name), - "tool_descriptions_block missing tool: {}", - def.name, - ); - } + let catalog = crate::kernel::tools::ToolCatalog::all(); + // Verify at least the plan tools are present + assert!(block.contains("get_plans"), "missing get_plans"); + assert!(block.contains("get_plan_detail"), "missing get_plan_detail"); + assert!(catalog.tools.len() >= 4, "catalog too small"); } #[test] diff --git a/daemon/src/kernel/mod.rs b/daemon/src/kernel/mod.rs index 8d0333bb..f697d409 100644 --- a/daemon/src/kernel/mod.rs +++ b/daemon/src/kernel/mod.rs @@ -8,6 +8,7 @@ pub mod engine; pub mod engine_context; pub mod engine_tool_loop; pub mod tools; +pub mod cloud_escalation; pub mod monitor; pub mod monitor_checks; pub mod monitor_ext; @@ -63,3 +64,5 @@ mod voice_router_tests; mod telegram_poll_tests; #[cfg(test)] mod verify_hardening_tests; +#[cfg(test)] +mod cloud_escalation_tests; diff --git a/daemon/src/kernel/telegram_poll.rs b/daemon/src/kernel/telegram_poll.rs index 8f9f80f4..2430aaea 100644 --- a/daemon/src/kernel/telegram_poll.rs +++ b/daemon/src/kernel/telegram_poll.rs @@ -1,13 +1,12 @@ // Copyright (c) 2026 Roberto D'Angelo. All rights reserved. -// Telegram inbound text — long polling loop via getUpdates. -// Runs as a background tokio task; no webhook (M1 Pro behind NAT). +// Telegram inbound text — long polling via getUpdates (no webhook, M1 Pro behind NAT). // Security: only processes messages from CONVERGIO_TELEGRAM_CHAT_ID. use crate::kernel::engine::KernelEngine; use crate::kernel::telegram_conv; use crate::kernel::voice_router::{classify_intent, route_intent, VoiceIntent}; use serde::Deserialize; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, Instant}; @@ -83,7 +82,7 @@ pub fn spawn_telegram_poll( token: String, chat_id: i64, daemon_url: String, - engine: Arc, + engine: Arc>, ) -> JoinHandle<()> { tokio::spawn(async move { info!("jarvis.telegram: starting for chat_id={chat_id}"); @@ -95,7 +94,7 @@ async fn run_poll_loop( token: &str, chat_id: i64, daemon_url: &str, - engine: &KernelEngine, + engine: &Arc>, ) { let client = reqwest::Client::builder() .timeout(Duration::from_secs(45)) // must exceed Telegram long poll timeout (30s) @@ -155,17 +154,40 @@ async fn process_text( text: &str, chat_id: i64, daemon_url: &str, - engine: &KernelEngine, + engine: &Arc>, ) -> String { - let intent = classify_intent(text, engine); + let intent = { + let eng = engine.lock().unwrap_or_else(|p| p.into_inner()); + classify_intent(text, &eng) + }; debug!(?intent, text, "jarvis.telegram: classified intent"); - // For EscalateToAli: inject conversation history so the model has context - // from recent exchanges. Other intents are stateless commands. if matches!(intent, VoiceIntent::EscalateToAli { .. }) { let history = telegram_conv::format_history_chatml(chat_id); let question = text.to_string(); - return engine.ask_with_history(&question, &history); + + let level = { + let eng = engine.lock().unwrap_or_else(|p| p.into_inner()); + eng.inference_level_for(&question) + }; + + if level == crate::kernel::engine::InferenceLevel::Local { + let engine_clone = Arc::clone(engine); + let q = question.clone(); + let h = history.clone(); + return match tokio::task::spawn_blocking(move || { + let eng = engine_clone.lock().unwrap_or_else(|p| p.into_inner()); + eng.ask_with_history(&q, &h) + }) + .await + { + Ok(r) => r, + Err(e) => format!("Errore locale: {e}"), + }; + } + + info!("jarvis.telegram: escalating to cloud"); + return crate::kernel::cloud_escalation::cloud_ask_with_tools(&question, &history).await; } // route_intent uses reqwest::blocking which deadlocks inside tokio runtime. diff --git a/daemon/src/kernel/telegram_voice.rs b/daemon/src/kernel/telegram_voice.rs index 31547564..6da799c8 100644 --- a/daemon/src/kernel/telegram_voice.rs +++ b/daemon/src/kernel/telegram_voice.rs @@ -85,7 +85,14 @@ pub async fn handle_voice_message( let transcript = transcribe_audio(daemon_url, &wav_bytes).await?; info!("telegram_voice: transcribed: {:?}", transcript.text); let intent = classify_intent(&transcript.text, engine); - let reply_text = route_intent(intent, daemon_url); + let daemon = daemon_url.to_string(); + let intent_clone = intent.clone(); + let reply_text = match tokio::task::spawn_blocking(move || { + route_intent(intent_clone, &daemon) + }).await { + Ok(r) => r, + Err(e) => format!("Errore: {e}"), + }; if let Err(e) = send_text(token, voice.chat_id, &reply_text, base_url).await { warn!("telegram_voice: send_text failed: {e}"); } diff --git a/daemon/src/kernel/tools.rs b/daemon/src/kernel/tools.rs deleted file mode 100644 index d91e2dba..00000000 --- a/daemon/src/kernel/tools.rs +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright (c) 2026 Roberto D'Angelo. All rights reserved. -// Kernel tool functions — MCP-like intelligence layer calling daemon API. -// Called by engine.ask() when Mistral uses function calling. - -use serde_json::{json, Value}; -use std::time::Duration; - -// ── Tool registry ──────────────────────────────────────────────────────────── -/// Describes a tool for prompt injection into the LLM context. -pub struct ToolDef { - pub name: &'static str, - pub description: &'static str, -} - -/// Returns descriptions for all available tools. -pub fn tool_definitions() -> Vec { - vec![ - ToolDef { - name: "get_plans", - description: "List all plans with id, name, status, tasks_done, tasks_total.", - }, - ToolDef { - name: "get_plan_detail", - description: "Get full plan JSON with tasks and waves. Args: {\"plan_id\": }.", - }, - ToolDef { - name: "get_costs", - description: "Return total token cost, active plan count, total plan count.", - }, - ToolDef { - name: "get_node_status", - description: "Return node readiness checks array from /api/node/readiness.", - }, - ToolDef { - name: "get_kernel_status", - description: "Return kernel status: models loaded, uptime, active node.", - }, - ToolDef { - name: "get_agents", - description: "Return list of registered agents from /api/ipc/agents.", - }, - ToolDef { - name: "restart_node", - description: "Trigger recovery for a target node. Args: {\"target\": \"\"}.", - }, - ToolDef { - name: "get_health", - description: "Return platform health from /api/health.", - }, - ToolDef { - name: "get_agent_history", - description: "Return recent agent activity from /api/agents/history.", - }, - ToolDef { - name: "get_mesh_status", - description: "Return mesh peer status from /api/heartbeat/status.", - }, - ToolDef { - name: "create_org", - description: "Create a virtual organization from a mission. Args: {\"name\": \"\", \"mission\": \"\"}.", - }, - ToolDef { - name: "scan_repo", - description: "Scan a repository and create an org to manage it. Args: {\"path\": \"\"}.", - }, - ] -} - -/// Dispatch a tool call by name. Returns JSON string or None for unknown tool. -pub fn call_tool(name: &str, daemon_url: &str, args: &Value) -> Option { - match name { - "get_plans" => Some(get_plans(daemon_url)), - "get_plan_detail" => { - let plan_id = args.get("plan_id").and_then(|v| v.as_u64())? as u32; - Some(get_plan_detail(daemon_url, plan_id)) - } - "get_costs" => Some(get_costs(daemon_url)), - "get_node_status" => Some(get_node_status(daemon_url)), - "get_kernel_status" => Some(get_kernel_status(daemon_url)), - "get_agents" => Some(get_agents(daemon_url)), - "restart_node" => { - let target = args - .get("target") - .and_then(|v| v.as_str()) - .unwrap_or("unknown"); - Some(restart_node(daemon_url, target)) - } - "get_health" => Some(get_health(daemon_url)), - "get_agent_history" => Some(get_agent_history(daemon_url)), - "get_mesh_status" => Some(get_mesh_status(daemon_url)), - "create_org" => { - let n = args.get("name").and_then(|v| v.as_str()).unwrap_or("project"); - let m = args.get("mission").and_then(|v| v.as_str()).unwrap_or(""); - Some(crate::kernel::voice_route_project::route_create_project(n, m, daemon_url)) - } - "scan_repo" => { - let p = args.get("path").and_then(|v| v.as_str()).unwrap_or("."); - Some(crate::kernel::voice_route_project::route_create_org_from(p, daemon_url)) - } - _ => None, - } -} - -// ── Helpers ────────────────────────────────────────────────────────────────── -fn make_client() -> reqwest::blocking::Client { - reqwest::blocking::Client::builder() - .timeout(Duration::from_secs(5)) - .build() - .unwrap_or_else(|_| reqwest::blocking::Client::new()) -} - -fn fetch_json(url: &str) -> Option { - let resp = make_client().get(url).send().map_err(|e| { - tracing::warn!("kernel.tools: fetch {url}: {e}"); - }).ok()?; - resp.json::().map_err(|e| { - tracing::warn!("kernel.tools: parse {url}: {e}"); - }).ok() -} - -fn error_json(msg: &str) -> String { - json!({"error": msg}).to_string() -} - -// ── Tool implementations ───────────────────────────────────────────────────── - -/// GET /api/plan-db/list → [{id, name, status, tasks_done, tasks_total}] -pub fn get_plans(daemon_url: &str) -> String { - let url = format!("{daemon_url}/api/plan-db/list"); - let Some(body) = fetch_json(&url) else { - return error_json("daemon unreachable"); - }; - let plans = body - .get("plans") - .and_then(|p| p.as_array()) - .map(|arr| { - arr.iter() - .map(|p| { - json!({ - "id": p.get("id").and_then(|v| v.as_u64()).unwrap_or(0), - "name": p.get("name").and_then(|v| v.as_str()).unwrap_or(""), - "status": p.get("status").and_then(|v| v.as_str()).unwrap_or(""), - "tasks_done": p.get("tasks_done").and_then(|v| v.as_u64()).unwrap_or(0), - "tasks_total": p.get("tasks_total").and_then(|v| v.as_u64()).unwrap_or(0), - }) - }) - .collect::>() - }) - .unwrap_or_default(); - serde_json::to_string(&plans).unwrap_or_else(|_| error_json("serialization error")) -} - -/// GET /api/plan-db/json/{plan_id} → plan + tasks + waves -pub fn get_plan_detail(daemon_url: &str, plan_id: u32) -> String { - fetch_endpoint(daemon_url, &format!("/api/plan-db/json/{plan_id}")) -} - -/// GET /api/plan-db/list → {total_cost, active_plans, total_plans} -pub fn get_costs(daemon_url: &str) -> String { - let url = format!("{daemon_url}/api/plan-db/list"); - let Some(body) = fetch_json(&url) else { - return error_json("daemon unreachable"); - }; - let plans = body - .get("plans") - .and_then(|p| p.as_array()) - .map(|a| a.as_slice()) - .unwrap_or(&[]); - let total_cost: f64 = plans - .iter() - .filter_map(|p| p.get("total_cost").and_then(|v| v.as_f64())) - .sum(); - let active_plans = plans - .iter() - .filter(|p| p.get("status").and_then(|s| s.as_str()) == Some("doing")) - .count(); - let result = json!({ - "total_cost": total_cost, - "active_plans": active_plans, - "total_plans": plans.len(), - }); - serde_json::to_string(&result).unwrap_or_else(|_| error_json("serialization error")) -} - -/// GET /api/node/readiness → checks array -pub fn get_node_status(d: &str) -> String { fetch_endpoint(d, "/api/node/readiness") } -/// GET /api/kernel/status → models, uptime, node -pub fn get_kernel_status(d: &str) -> String { fetch_endpoint(d, "/api/kernel/status") } -/// GET /api/ipc/agents → agent list -pub fn get_agents(d: &str) -> String { fetch_endpoint(d, "/api/ipc/agents") } - -/// Trigger recovery action for a target node. -pub fn restart_node(daemon_url: &str, target: &str) -> String { - let url = format!("{daemon_url}/api/node/recover"); - let result = make_client() - .post(&url) - .json(&json!({"target": target, "action": "restart"})) - .send(); - match result { - Ok(resp) => { - let status = resp.status().as_u16(); - let body = resp.json::().unwrap_or_else(|_| json!({})); - json!({"status": status, "result": body}).to_string() - } - Err(e) => error_json(&format!("recovery request failed: {e}")), - } -} - -/// Simple GET → JSON string helper for endpoints without custom parsing. -fn fetch_endpoint(daemon_url: &str, path: &str) -> String { - let url = format!("{daemon_url}{path}"); - match fetch_json(&url) { - Some(v) => serde_json::to_string(&v).unwrap_or_else(|_| error_json("serialization error")), - None => error_json(&format!("{path} unreachable")), - } -} - -/// GET /api/health → platform health summary -pub fn get_health(d: &str) -> String { fetch_endpoint(d, "/api/health") } -/// GET /api/agents/history?limit=10 → recent agent activity -pub fn get_agent_history(d: &str) -> String { fetch_endpoint(d, "/api/agents/history?limit=10") } -/// GET /api/heartbeat/status → mesh peer status -pub fn get_mesh_status(d: &str) -> String { fetch_endpoint(d, "/api/heartbeat/status") } - -#[cfg(test)] -#[path = "tools_tests.rs"] -mod tests; diff --git a/daemon/src/kernel/tools/agent.rs b/daemon/src/kernel/tools/agent.rs new file mode 100644 index 00000000..8e47bcea --- /dev/null +++ b/daemon/src/kernel/tools/agent.rs @@ -0,0 +1,101 @@ +// Agent and chat tools — 7 tools for agent lifecycle and IPC. + +use super::{ToolDef, ToolMethod, ToolParam, ToolTier}; + +const P_AGENT_SEND: &[ToolParam] = &[ + ToolParam { name: "from", param_type: "string", required: true }, + ToolParam { name: "to", param_type: "string", required: true }, + ToolParam { name: "content", param_type: "string", required: true }, +]; + +const P_AGENT_ASK: &[ToolParam] = &[ + ToolParam { name: "from", param_type: "string", required: true }, + ToolParam { name: "to", param_type: "string", required: true }, + ToolParam { name: "message", param_type: "string", required: true }, + ToolParam { name: "timeout_secs", param_type: "integer", required: false }, +]; + +const P_AGENT_START: &[ToolParam] = &[ + ToolParam { name: "agent_id", param_type: "string", required: true }, + ToolParam { name: "plan_id", param_type: "integer", required: true }, +]; + +const P_AGENT_COMPLETE: &[ToolParam] = &[ToolParam { + name: "agent_id", + param_type: "string", + required: true, +}]; + +const P_INVOKE: &[ToolParam] = &[ + ToolParam { name: "from", param_type: "string", required: true }, + ToolParam { name: "to", param_type: "string", required: true }, + ToolParam { name: "message", param_type: "string", required: true }, +]; + +const P_CREATE_AGENT: &[ToolParam] = &[ + ToolParam { name: "name", param_type: "string", required: true }, + ToolParam { name: "category", param_type: "string", required: false }, + ToolParam { name: "description", param_type: "string", required: false }, + ToolParam { name: "model", param_type: "string", required: false }, +]; + +pub fn tools() -> Vec { + vec![ + ToolDef { + name: "list_agents", + description: "List all registered agents.", + endpoint: "/api/ipc/agents", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "agent_send", + description: "Send direct message to an agent. Args: from, to, content.", + endpoint: "/api/ipc/send-direct", + method: ToolMethod::Post, + params: P_AGENT_SEND, + tier: ToolTier::Write, + }, + ToolDef { + name: "agent_ask", + description: "Ask an agent and wait for reply. Args: from, to, message, optional timeout_secs.", + endpoint: "/api/ipc/ask", + method: ToolMethod::Post, + params: P_AGENT_ASK, + tier: ToolTier::Write, + }, + ToolDef { + name: "agent_start", + description: "Register agent start for a plan. Args: agent_id, plan_id.", + endpoint: "/api/plan-db/agent/start", + method: ToolMethod::Post, + params: P_AGENT_START, + tier: ToolTier::Write, + }, + ToolDef { + name: "agent_complete", + description: "Register agent completion. Args: agent_id.", + endpoint: "/api/plan-db/agent/complete", + method: ToolMethod::Post, + params: P_AGENT_COMPLETE, + tier: ToolTier::Write, + }, + ToolDef { + name: "invoke_agent", + description: "Invoke a named agent with a task. Args: from (kernel identity), to (agent name), message (task).", + endpoint: "/api/ipc/ask", + method: ToolMethod::Post, + params: P_INVOKE, + tier: ToolTier::Write, + }, + ToolDef { + name: "create_agent", + description: "Create a new agent. Args: name, optional category/description/model.", + endpoint: "/api/agents/create", + method: ToolMethod::Post, + params: P_CREATE_AGENT, + tier: ToolTier::Write, + }, + ] +} diff --git a/daemon/src/kernel/tools/infra.rs b/daemon/src/kernel/tools/infra.rs new file mode 100644 index 00000000..30568f0e --- /dev/null +++ b/daemon/src/kernel/tools/infra.rs @@ -0,0 +1,125 @@ +// Infra tools — 10 tools for mesh, node, kernel, notifications, control. + +use super::{ToolDef, ToolMethod, ToolParam, ToolTier}; + +const P_PROMPT: &[ToolParam] = &[ToolParam { + name: "question", + param_type: "string", + required: true, +}]; + +const P_MESSAGE: &[ToolParam] = &[ + ToolParam { name: "message", param_type: "string", required: true }, + ToolParam { name: "title", param_type: "string", required: false }, + ToolParam { name: "severity", param_type: "string", required: false }, +]; + +const P_ASSIGN_ROLE: &[ToolParam] = &[ + ToolParam { name: "node", param_type: "string", required: true }, + ToolParam { name: "role", param_type: "string", required: true }, +]; + +const P_INTERRUPT: &[ToolParam] = &[ + ToolParam { name: "agent_name", param_type: "string", required: true }, + ToolParam { name: "reason", param_type: "string", required: true }, +]; + +const P_RESCHEDULE: &[ToolParam] = &[ + ToolParam { name: "task_id", param_type: "integer", required: true }, + ToolParam { name: "from_node", param_type: "string", required: true }, + ToolParam { name: "to_node", param_type: "string", required: true }, + ToolParam { name: "reason", param_type: "string", required: true }, +]; + +pub fn tools() -> Vec { + vec![ + ToolDef { + name: "mesh_status", + description: "Get mesh peer status and connectivity.", + endpoint: "/api/mesh", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "agent_history", + description: "Get recent agent activity.", + endpoint: "/api/agents/history?limit=10", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "node_readiness", + description: "Get node readiness checks.", + endpoint: "/api/node/readiness", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "cost_summary", + description: "Get cost summary across all plans.", + endpoint: "/api/plan-db/list", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "kernel_status", + description: "Get kernel status: models loaded, uptime, active node.", + endpoint: "/api/kernel/status", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "kernel_ask", + description: "Ask the kernel a question (routes to local or cloud). Args: question.", + endpoint: "/api/kernel/ask", + method: ToolMethod::Post, + params: P_PROMPT, + tier: ToolTier::Read, + }, + ToolDef { + name: "notify", + description: "Send a notification. Args: message, optional title, severity.", + endpoint: "/api/notify", + method: ToolMethod::Post, + params: P_MESSAGE, + tier: ToolTier::Write, + }, + ToolDef { + name: "night_status", + description: "Get nightly run status for the node.", + endpoint: "/api/node/night-status", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "assign_role", + description: "Assign role to a mesh node. Args: node, role.", + endpoint: "/api/node/assign-role", + method: ToolMethod::Post, + params: P_ASSIGN_ROLE, + tier: ToolTier::Write, + }, + ToolDef { + name: "interrupt_agent", + description: "Interrupt a running agent. Args: agent_name, reason.", + endpoint: "/api/agent/interrupt", + method: ToolMethod::Post, + params: P_INTERRUPT, + tier: ToolTier::Write, + }, + ToolDef { + name: "reschedule_task", + description: "Reschedule task to another node. Args: task_id, from_node, to_node, reason.", + endpoint: "/api/task/reschedule", + method: ToolMethod::Post, + params: P_RESCHEDULE, + tier: ToolTier::Write, + }, + ] +} diff --git a/daemon/src/kernel/tools/mod.rs b/daemon/src/kernel/tools/mod.rs new file mode 100644 index 00000000..0f6f8e5d --- /dev/null +++ b/daemon/src/kernel/tools/mod.rs @@ -0,0 +1,211 @@ +// Copyright (c) 2026 Roberto D'Angelo. All rights reserved. +// Declarative tool catalog for the kernel — all 43+ MCP tools. +// Each tool: name, description, HTTP endpoint, method, params, tier. + +use serde_json::Value; +use std::time::Duration; +use tracing::warn; + +pub mod agent; +pub mod infra; +pub mod org; +pub mod plan; +pub mod platform; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ToolTier { + Read, + Write, +} + +#[derive(Debug, Clone, Copy)] +pub enum ToolMethod { + Get, + Post, +} + +pub struct ToolParam { + pub name: &'static str, + pub param_type: &'static str, + pub required: bool, +} + +pub struct ToolDef { + pub name: &'static str, + pub description: &'static str, + pub endpoint: &'static str, + pub method: ToolMethod, + pub params: &'static [ToolParam], + pub tier: ToolTier, +} + +pub struct ToolCatalog { + pub tools: Vec, +} + +impl ToolCatalog { + pub fn all() -> Self { + let mut tools = Vec::with_capacity(50); + tools.extend(plan::tools()); + tools.extend(org::tools()); + tools.extend(agent::tools()); + tools.extend(platform::tools()); + tools.extend(infra::tools()); + Self { tools } + } + + pub fn find(&self, name: &str) -> Option<&ToolDef> { + self.tools.iter().find(|t| t.name == name) + } + + pub fn descriptions_block(&self) -> String { + let mut block = String::from( + "Strumenti disponibili. Per usarli rispondi con:\n\ + {\"name\":\"tool_name\",\"arguments\":{...}}\n\n", + ); + for t in &self.tools { + block += &format!("- {}: {}\n", t.name, t.description); + } + block += "\nUsa gli strumenti SOLO quando servono dati che non hai.\n"; + block + } + + pub fn read_only_names(&self) -> Vec<&str> { + self.tools + .iter() + .filter(|t| t.tier == ToolTier::Read) + .map(|t| t.name) + .collect() + } + + pub fn call_tool(&self, name: &str, daemon_url: &str, args: &Value) -> Option { + let tool = self.find(name)?; + let endpoint = resolve_endpoint(tool.endpoint, args); + let base = if daemon_url.is_empty() { self::daemon_url() } else { daemon_url.to_string() }; + let url = format!("{base}{endpoint}"); + match tool.method { + ToolMethod::Get => http_get(&url), + ToolMethod::Post => http_post(&url, args), + } + } + + /// Catalog with only Read-tier tools (for local inference safety). + pub fn read_only() -> Self { + let mut cat = Self::all(); + cat.tools.retain(|t| t.tier == ToolTier::Read); + cat + } +} + +fn resolve_endpoint(endpoint: &str, args: &Value) -> String { + let mut resolved = endpoint.to_string(); + if let Some(obj) = args.as_object() { + for (key, val) in obj { + let ph = format!("{{{key}}}"); + if let Some(s) = val.as_str() { + resolved = resolved.replace(&ph, s); + } else if let Some(n) = val.as_u64() { + resolved = resolved.replace(&ph, &n.to_string()); + } else if let Some(n) = val.as_i64() { + resolved = resolved.replace(&ph, &n.to_string()); + } + } + } + // Strip unresolved optional query params (e.g. &limit={limit}) + while let Some(start) = resolved.find('{') { + if let Some(end) = resolved[start..].find('}') { + let before = &resolved[..start]; + // Remove &key={val} or ?key={val} patterns + let trim_from = before.rfind('&').or_else(|| before.rfind('?')).unwrap_or(start); + let after = &resolved[start + end + 1..]; + resolved = format!("{}{}", &resolved[..trim_from], after); + } else { + break; + } + } + resolved +} + +fn daemon_url() -> String { + std::env::var("DAEMON_URL").unwrap_or_else(|_| "http://localhost:8420".to_string()) +} + +fn make_client() -> reqwest::blocking::Client { + reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .unwrap_or_else(|_| reqwest::blocking::Client::new()) +} + +fn auth_header() -> Option { + std::env::var("CONVERGIO_AUTH_TOKEN").ok() +} + +fn http_get(url: &str) -> Option { + let mut req = make_client().get(url); + if let Some(token) = auth_header() { + req = req.header("Authorization", format!("Bearer {token}")); + } + match req.send() { + Ok(resp) => Some(resp.text().unwrap_or_default()), + Err(e) => { + warn!("kernel.tools GET {url}: {e}"); + Some(format!("{{\"error\":\"{e}\"}}")) + } + } +} + +fn http_post(url: &str, body: &Value) -> Option { + let mut req = make_client().post(url).json(body); + if let Some(token) = auth_header() { + req = req.header("Authorization", format!("Bearer {token}")); + } + match req.send() { + Ok(resp) => Some(resp.text().unwrap_or_default()), + Err(e) => { + warn!("kernel.tools POST {url}: {e}"); + Some(format!("{{\"error\":\"{e}\"}}")) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_catalog_has_43_tools() { + let cat = ToolCatalog::all(); + assert!(cat.tools.len() >= 43, "got {} tools", cat.tools.len()); + } + + #[test] + fn test_catalog_find_existing() { + assert!(ToolCatalog::all().find("get_plans").is_some()); + } + + #[test] + fn test_catalog_find_missing() { + assert!(ToolCatalog::all().find("nonexistent").is_none()); + } + + #[test] + fn test_descriptions_block_nonempty() { + assert!(ToolCatalog::all().descriptions_block().len() > 100); + } + + #[test] + fn test_resolve_endpoint_substitution() { + let args = serde_json::json!({"plan_id": 42}); + let result = resolve_endpoint("/api/plan/{plan_id}", &args); + assert_eq!(result, "/api/plan/42"); + } + + #[test] + fn test_read_only_excludes_write() { + let cat = ToolCatalog::all(); + let read = cat.read_only_names(); + assert!(!read.contains(&"update_task")); + assert!(read.contains(&"get_plans")); + } +} diff --git a/daemon/src/kernel/tools/org.rs b/daemon/src/kernel/tools/org.rs new file mode 100644 index 00000000..e5ae9413 --- /dev/null +++ b/daemon/src/kernel/tools/org.rs @@ -0,0 +1,123 @@ +// Org domain tools — 10 tools for organization CRUD and intelligence. + +use super::{ToolDef, ToolMethod, ToolParam, ToolTier}; + +const P_ORG_ID: &[ToolParam] = &[ToolParam { + name: "org_id", + param_type: "string", + required: true, +}]; + +const P_ORG_CREATE: &[ToolParam] = &[ + ToolParam { name: "name", param_type: "string", required: true }, + ToolParam { name: "mission", param_type: "string", required: true }, + ToolParam { name: "objectives", param_type: "string", required: true }, + ToolParam { name: "ceo_agent", param_type: "string", required: true }, + ToolParam { name: "budget", param_type: "number", required: false }, +]; + +const P_ORG_MEMBERS: &[ToolParam] = &[ + ToolParam { name: "org_id", param_type: "string", required: true }, + ToolParam { name: "agent", param_type: "string", required: true }, + ToolParam { name: "role", param_type: "string", required: true }, + ToolParam { name: "dept", param_type: "string", required: false }, +]; + +const P_ORG_SERVICES: &[ToolParam] = &[ + ToolParam { name: "org_id", param_type: "string", required: true }, + ToolParam { name: "name", param_type: "string", required: true }, + ToolParam { name: "endpoint", param_type: "string", required: true }, + ToolParam { name: "description", param_type: "string", required: false }, +]; + +const P_ORG_DECIDE: &[ToolParam] = &[ + ToolParam { name: "org_id", param_type: "string", required: true }, + ToolParam { name: "decision", param_type: "string", required: true }, + ToolParam { name: "rationale", param_type: "string", required: true }, + ToolParam { name: "made_by", param_type: "string", required: true }, +]; + +pub fn tools() -> Vec { + vec![ + ToolDef { + name: "org_create", + description: "Create a new organization. Args: name, mission, objectives, ceo_agent, optional budget.", + endpoint: "/api/orgs", + method: ToolMethod::Post, + params: P_ORG_CREATE, + tier: ToolTier::Write, + }, + ToolDef { + name: "org_list", + description: "List all organizations.", + endpoint: "/api/orgs", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "org_show", + description: "Show organization details. Args: org_id.", + endpoint: "/api/orgs/{org_id}", + method: ToolMethod::Get, + params: P_ORG_ID, + tier: ToolTier::Read, + }, + ToolDef { + name: "org_add_member", + description: "Add member to organization. Args: org_id, agent, role.", + endpoint: "/api/orgs/{org_id}/members", + method: ToolMethod::Post, + params: P_ORG_MEMBERS, + tier: ToolTier::Write, + }, + ToolDef { + name: "org_add_service", + description: "Register a service for organization. Args: org_id, name, endpoint.", + endpoint: "/api/orgs/{org_id}/services", + method: ToolMethod::Post, + params: P_ORG_SERVICES, + tier: ToolTier::Write, + }, + ToolDef { + name: "org_decide", + description: "Record a decision for organization. Args: org_id, decision, rationale, made_by.", + endpoint: "/api/orgs/{org_id}/decisions", + method: ToolMethod::Post, + params: P_ORG_DECIDE, + tier: ToolTier::Write, + }, + ToolDef { + name: "org_telemetry", + description: "Get organization telemetry data. Args: org_id.", + endpoint: "/api/orgs/{org_id}/telemetry", + method: ToolMethod::Get, + params: P_ORG_ID, + tier: ToolTier::Read, + }, + ToolDef { + name: "org_digest", + description: "Get organization digest summary. Args: org_id.", + endpoint: "/api/orgs/{org_id}/digest", + method: ToolMethod::Get, + params: P_ORG_ID, + tier: ToolTier::Read, + }, + ToolDef { + name: "org_digest_generate", + description: "Generate fresh org digest. Args: org_id.", + endpoint: "/api/orgs/{org_id}/digest/generate", + method: ToolMethod::Post, + params: P_ORG_ID, + tier: ToolTier::Write, + }, + ToolDef { + name: "morning_brief", + description: "Get the daily morning brief across all orgs.", + endpoint: "/api/digest/morning", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ] +} diff --git a/daemon/src/kernel/tools/plan.rs b/daemon/src/kernel/tools/plan.rs new file mode 100644 index 00000000..e916b7e6 --- /dev/null +++ b/daemon/src/kernel/tools/plan.rs @@ -0,0 +1,52 @@ +// Plan domain tools — 4 tools for plan/task/checkpoint operations. + +use super::{ToolDef, ToolMethod, ToolParam, ToolTier}; + +const P_PLAN_ID: &[ToolParam] = &[ToolParam { + name: "plan_id", + param_type: "integer", + required: true, +}]; + +const P_UPDATE_TASK: &[ToolParam] = &[ + ToolParam { name: "task_id", param_type: "integer", required: true }, + ToolParam { name: "status", param_type: "string", required: true }, + ToolParam { name: "notes", param_type: "string", required: false }, +]; + +pub fn tools() -> Vec { + vec![ + ToolDef { + name: "get_plans", + description: "List all plans with id, name, status, tasks_done, tasks_total.", + endpoint: "/api/plan-db/list", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "get_plan_detail", + description: "Get full plan JSON with tasks and waves. Args: plan_id.", + endpoint: "/api/plan-db/json/{plan_id}", + method: ToolMethod::Get, + params: P_PLAN_ID, + tier: ToolTier::Read, + }, + ToolDef { + name: "update_task", + description: "Update task status. Args: task_id, status, optional notes.", + endpoint: "/api/plan-db/task/update", + method: ToolMethod::Post, + params: P_UPDATE_TASK, + tier: ToolTier::Write, + }, + ToolDef { + name: "checkpoint_save", + description: "Save a checkpoint for a plan. Args: plan_id.", + endpoint: "/api/plan-db/checkpoint/save", + method: ToolMethod::Post, + params: P_PLAN_ID, + tier: ToolTier::Write, + }, + ] +} diff --git a/daemon/src/kernel/tools/platform.rs b/daemon/src/kernel/tools/platform.rs new file mode 100644 index 00000000..891756d0 --- /dev/null +++ b/daemon/src/kernel/tools/platform.rs @@ -0,0 +1,150 @@ +// Platform tools — 12 tools for plan lifecycle, validation, memory, budget. + +use super::{ToolDef, ToolMethod, ToolParam, ToolTier}; + +const P_CREATE_PLAN: &[ToolParam] = &[ + ToolParam { name: "name", param_type: "string", required: true }, + ToolParam { name: "project_id", param_type: "string", required: true }, +]; + +const P_PLAN_ID: &[ToolParam] = &[ToolParam { + name: "plan_id", + param_type: "integer", + required: true, +}]; + +const P_CREATE_TASK: &[ToolParam] = &[ + ToolParam { name: "plan_id", param_type: "integer", required: true }, + ToolParam { name: "wave_id_fk", param_type: "integer", required: true }, + ToolParam { name: "task_id", param_type: "string", required: true }, + ToolParam { name: "title", param_type: "string", required: true }, +]; + +const P_VALIDATION: &[ToolParam] = &[ + ToolParam { name: "task_id", param_type: "integer", required: true }, + ToolParam { name: "verdict", param_type: "string", required: true }, + ToolParam { name: "validator", param_type: "string", required: true }, +]; + +const P_WORKSPACE: &[ToolParam] = &[ToolParam { + name: "workspace_id", + param_type: "string", + required: true, +}]; + +const P_REMEMBER: &[ToolParam] = &[ + ToolParam { name: "agent_id", param_type: "string", required: true }, + ToolParam { name: "memory_type", param_type: "string", required: true }, + ToolParam { name: "content", param_type: "string", required: true }, +]; + +const P_RECALL: &[ToolParam] = &[ + ToolParam { name: "query", param_type: "string", required: true }, + ToolParam { name: "agent_id", param_type: "string", required: false }, +]; + +const P_MESSAGES: &[ToolParam] = &[ + ToolParam { name: "to_agent", param_type: "string", required: true }, + ToolParam { name: "limit", param_type: "integer", required: false }, +]; + +pub fn tools() -> Vec { + vec![ + ToolDef { + name: "create_plan", + description: "Create a new plan. Args: name, project_id.", + endpoint: "/api/plan-db/create", + method: ToolMethod::Post, + params: P_CREATE_PLAN, + tier: ToolTier::Write, + }, + ToolDef { + name: "start_plan", + description: "Start a plan. Args: plan_id.", + endpoint: "/api/plan-db/start/{plan_id}", + method: ToolMethod::Post, + params: P_PLAN_ID, + tier: ToolTier::Write, + }, + ToolDef { + name: "create_task", + description: "Create task in a plan wave. Args: plan_id, wave_id_fk, task_id, title.", + endpoint: "/api/plan-db/task/create", + method: ToolMethod::Post, + params: P_CREATE_TASK, + tier: ToolTier::Write, + }, + ToolDef { + name: "record_validation", + description: "Record validation verdict. Args: task_id, verdict, validator.", + endpoint: "/api/validation/record", + method: ToolMethod::Post, + params: P_VALIDATION, + tier: ToolTier::Write, + }, + ToolDef { + name: "quality_gate", + description: "Run quality gate on workspace. Args: workspace_id.", + endpoint: "/api/workspace/quality-gate", + method: ToolMethod::Post, + params: P_WORKSPACE, + tier: ToolTier::Write, + }, + ToolDef { + name: "health_deep", + description: "Deep health check of the platform.", + endpoint: "/api/health/deep", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "list_workspaces", + description: "List all workspaces.", + endpoint: "/api/workspace/list", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "remember", + description: "Store a memory entry. Args: agent_id, memory_type, content.", + endpoint: "/api/memory/remember", + method: ToolMethod::Post, + params: P_REMEMBER, + tier: ToolTier::Write, + }, + ToolDef { + name: "recall", + description: "Recall from agent memory. Args: query, optional agent_id.", + endpoint: "/api/memory/recall?query={query}", + method: ToolMethod::Get, + params: P_RECALL, + tier: ToolTier::Read, + }, + ToolDef { + name: "budget", + description: "Get budget status and spending.", + endpoint: "/api/budget/status", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "agent_catalog", + description: "Get the full agent catalog with capabilities.", + endpoint: "/api/agents/catalog", + method: ToolMethod::Get, + params: &[], + tier: ToolTier::Read, + }, + ToolDef { + name: "list_messages", + description: "List IPC messages for an agent. Args: to_agent, optional limit.", + endpoint: "/api/ipc/messages?to_agent={to_agent}&limit={limit}", + method: ToolMethod::Get, + params: P_MESSAGES, + tier: ToolTier::Read, + }, + ] +} diff --git a/daemon/src/kernel/tools_tests.rs b/daemon/src/kernel/tools_tests.rs deleted file mode 100644 index ac1c5af3..00000000 --- a/daemon/src/kernel/tools_tests.rs +++ /dev/null @@ -1,55 +0,0 @@ -use super::*; - -#[test] -fn tool_definitions_returns_all_tools() { - let defs = tool_definitions(); - assert_eq!(defs.len(), 12); - let names: Vec<&str> = defs.iter().map(|d| d.name).collect(); - assert!(names.contains(&"get_plans")); - assert!(names.contains(&"get_plan_detail")); - assert!(names.contains(&"get_costs")); - assert!(names.contains(&"get_node_status")); - assert!(names.contains(&"get_kernel_status")); - assert!(names.contains(&"get_agents")); - assert!(names.contains(&"restart_node")); - assert!(names.contains(&"get_health")); - assert!(names.contains(&"get_agent_history")); - assert!(names.contains(&"get_mesh_status")); - assert!(names.contains(&"create_org")); - assert!(names.contains(&"scan_repo")); -} - -#[test] -fn call_tool_unknown_returns_none() { - let result = call_tool("nonexistent", "http://localhost:1", &serde_json::json!({})); - assert!(result.is_none()); -} - -#[test] -fn call_tool_get_plan_detail_missing_arg_returns_none() { - // plan_id not provided — should return None - let result = call_tool("get_plan_detail", "http://localhost:1", &serde_json::json!({})); - assert!(result.is_none()); -} - -#[test] -fn call_tool_dispatches_get_plans() { - // Daemon unreachable → returns error JSON (not None) - let result = call_tool("get_plans", "http://localhost:1", &serde_json::json!({})); - assert!(result.is_some()); - let v: Value = serde_json::from_str(&result.unwrap()).unwrap(); - assert!(v.get("error").is_some() || v.is_array()); -} - -#[test] -fn call_tool_restart_node_uses_target_arg() { - let result = call_tool( - "restart_node", - "http://localhost:1", - &serde_json::json!({"target": "macProM1"}), - ); - assert!(result.is_some()); - // Unreachable → error JSON - let v: Value = serde_json::from_str(&result.unwrap()).unwrap(); - assert!(v.get("error").is_some() || v.get("status").is_some()); -} diff --git a/daemon/src/kernel/voice_router_helpers.rs b/daemon/src/kernel/voice_router_helpers.rs index c208ac4e..f211998f 100644 --- a/daemon/src/kernel/voice_router_helpers.rs +++ b/daemon/src/kernel/voice_router_helpers.rs @@ -151,3 +151,33 @@ mod tests { assert_eq!(count, 0); } } + +// ── Write-intent classification ────────────────────────────────────────────── + +const WRITE_KW_IT: &[&str] = &[ + "crea", "avvia", "aggiorna", "elimina", "modifica", "aggiungi", "rimuovi", + "invia", "notifica", "riavvia", "interrompi", "assegna", "genera", + "pianifica", "organizza", +]; + +const WRITE_KW_EN: &[&str] = &[ + "create", "start", "update", "delete", "modify", "add", "remove", + "send", "notify", "restart", "interrupt", "assign", "generate", + "plan", "organize", +]; + +const CLOUD_TRIGGERS: &[&str] = &["ali", "opus", "cloud", "claude"]; + +/// Check if text contains write-intent keywords (Italian/English) or cloud triggers. +pub fn is_write_intent(text: &str) -> bool { + let lower = text.to_lowercase(); + lower.split_whitespace().any(|raw| { + let token = raw.trim_matches(|c: char| !c.is_alphanumeric()); + if token.is_empty() { + return false; + } + WRITE_KW_IT.contains(&token) + || WRITE_KW_EN.contains(&token) + || CLOUD_TRIGGERS.contains(&token) + }) +} diff --git a/daemon/src/mcp_server/tool_catalog.rs b/daemon/src/mcp_server/tool_catalog.rs index c9a13016..4b489429 100644 --- a/daemon/src/mcp_server/tool_catalog.rs +++ b/daemon/src/mcp_server/tool_catalog.rs @@ -1,5 +1,5 @@ // Copyright (c) 2026 Roberto D'Angelo. All rights reserved. -// MCP tool catalogue: all 18 tool definitions with JSON Schema and ring requirements. +// MCP tool catalogue: all 43 tool definitions with JSON Schema and ring requirements. use serde_json::json; @@ -10,7 +10,7 @@ use crate::mcp_server::tools::McpTool; /// Returns the full catalogue of MCP tools (unfiltered). pub fn all_tools() -> Vec { - let mut tools = Vec::with_capacity(35); + let mut tools = Vec::with_capacity(45); tools.extend(plan_tools()); tools.extend(chat_tools()); tools.extend(agent_tools());