This repository was archived by the owner on Apr 6, 2026. It is now read-only.
Merged
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,26 @@

## [Unreleased]

## [20.9.0] - 02 April 2026

### Added
- Declarative kernel tool catalog with 43 MCP tools (plan, org, agent, platform, infra)
- Cloud escalation engine: Opus via `stream_with_fallback` with 5-round tool loop
- Automatic local/cloud inference routing (`InferenceLevel::Local` vs `Cloud`)
- Write-intent classifier (`is_write_intent`) for Italian/English keyword detection
- Cloud tool loop with `<tool_call>` dispatch (same format as local Qwen)
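
The write-intent classifier and local/cloud routing described above can be sketched as follows. This is an illustrative sketch only: the keyword list and the free-function signatures are assumptions, while the real logic lives in `voice_router_helpers::is_write_intent` and `KernelEngine::inference_level_for` (which also checks bridge/model availability).

```rust
/// Routing decision between local Qwen and cloud Opus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferenceLevel {
    Local,
    Cloud,
}

/// Hypothetical Italian/English keyword check; the actual keyword sets
/// (and substring-boundary handling) in voice_router_helpers may differ.
fn is_write_intent(question: &str) -> bool {
    let q = question.to_lowercase();
    const WRITE_KEYWORDS: &[&str] = &["crea", "scrivi", "aggiungi", "create", "add", "ali "];
    WRITE_KEYWORDS.iter().any(|k| q.contains(k))
}

/// Route to cloud when no local model is available or the request mutates state.
fn inference_level_for(local_available: bool, question: &str) -> InferenceLevel {
    if !local_available || is_write_intent(question) {
        InferenceLevel::Cloud
    } else {
        InferenceLevel::Local
    }
}
```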

### Fixed
- Telegram poll now shares engine with HTTP API via `Arc<Mutex<KernelEngine>>`
- `api_ask.rs` escalates to cloud on write intents (was always local-only)
- `telegram_voice.rs` wraps `route_intent` in `spawn_blocking` (no async deadlock)
- MCP `tool_catalog.rs` header updated from 18 to 43 tools

### Changed
- Kernel tools refactored from single `tools.rs` to `tools/` module (5 domain files)
- `engine_tool_loop.rs` uses `ToolCatalog::all()` instead of hardcoded definitions
- `engine_context.rs` uses new catalog for context gathering
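
The `tools/` refactor above can be pictured as a catalog of declarative tool definitions. The struct layout and handler signature below are hypothetical; only the method names (`all()`, `call_tool`, `descriptions_block`) appear in this diff, and the real handlers take a daemon URL plus JSON args.

```rust
// Hypothetical shape of a declarative tool catalog; not the real ToolCatalog.
struct ToolDef {
    name: &'static str,
    description: &'static str,
    // Simplified handler: JSON args string in, result string out.
    handler: fn(&str) -> String,
}

struct ToolCatalog {
    tools: Vec<ToolDef>,
}

impl ToolCatalog {
    /// Dispatch by name; None when the tool is unknown.
    fn call_tool(&self, name: &str, args: &str) -> Option<String> {
        self.tools
            .iter()
            .find(|t| t.name == name)
            .map(|t| (t.handler)(args))
    }

    /// One-line description per tool, injected into the system prompt.
    fn descriptions_block(&self) -> String {
        self.tools
            .iter()
            .map(|t| format!("- {}: {}", t.name, t.description))
            .collect::<Vec<_>>()
            .join("\n")
    }
}
```

A design note: keeping definitions declarative lets `engine_tool_loop.rs` and the cloud loop share one dispatch path instead of duplicating hardcoded match arms.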

## [20.8.2] - 01 April 2026

### Fixed
2 changes: 1 addition & 1 deletion VERSION.md
@@ -1 +1 @@
20.8.2
20.9.0
2 changes: 1 addition & 1 deletion daemon/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion daemon/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "convergio-platform-daemon"
version = "20.8.2"
version = "20.9.0"
edition = "2021"

[lib]
2 changes: 1 addition & 1 deletion daemon/src/kernel/api.rs
@@ -42,7 +42,7 @@ pub mod handlers {
let chat_id = crate::telegram_config::telegram_chat_id();
match (token, chat_id) {
(Some(token), Ok(Some(chat_id))) => {
let engine_clone = Arc::new(KernelEngine::new(config.clone()));
let engine_clone = state.engine.clone();
let daemon_url = std::env::var("DAEMON_URL")
.unwrap_or_else(|_| "http://localhost:8420".to_string());
tracing::info!("jarvis: spawning Telegram poll loop for chat_id={chat_id}");
50 changes: 28 additions & 22 deletions daemon/src/kernel/api_ask.rs
@@ -27,36 +27,42 @@ mod inner {
///
/// Routes through voice_router: classifies intent first.
/// - Simple queries (stato, costi) → handled by voice_router directly
/// - EscalateToAli → forwards to Ali (Opus) via chat API
/// - EscalateToAli (unrecognized) → forwarded to Ali (Opus)
/// - EscalateToAli / write intents → cloud escalation (Opus)
pub async fn handle_ask(
State(state): State<KernelState>,
Json(body): Json<AskRequest>,
) -> Json<AskResponse> {
let question = body.question.clone();
let engine = state.engine.clone();
let answer = tokio::task::spawn_blocking(move || {
let q = question.to_lowercase();
// Check for Ali escalation FIRST (before any LLM classification)
if q.starts_with("ali ") || q.starts_with("ali,") || q == "ali"
|| q.contains(" ali ") || q.contains("chiedi ad ali")
|| q.contains("opus") || q.contains("cloud")
{
return route_intent(
VoiceIntent::EscalateToAli { question: question.clone() },
"http://localhost:8420",
);
}
// For everything else: classify intent then route

// Check inference level — cloud for write intents or Ali escalation
let level = {
let eng = engine.lock().unwrap_or_else(|p| p.into_inner());
let intent = classify_intent(&question, &eng);
match &intent {
VoiceIntent::EscalateToAli { .. } => eng.ask(&question),
_ => route_intent(intent, "http://localhost:8420"),
eng.inference_level_for(&question)
};

let answer = if level == crate::kernel::engine::InferenceLevel::Cloud {
crate::kernel::cloud_escalation::cloud_ask_with_tools(&question, "").await
} else {
let q = question.clone();
let q2 = question.clone();
// Classify locally; if EscalateToAli, escalate to cloud
let intent = {
let eng = engine.lock().unwrap_or_else(|p| p.into_inner());
classify_intent(&q, &eng)
};
Comment on lines +49 to +53 (Copilot AI, Apr 2, 2026):

classify_intent() may perform local model inference (KernelEngine::classify -> bridge.infer), which is blocking. In this handler it's executed on the async runtime thread (inside a mutex lock) before spawn_blocking, which can stall the tokio worker under load. Consider moving the intent classification into spawn_blocking as well (or providing an async classification path).

Suggested change:
// Classify locally; if EscalateToAli, escalate to cloud
let intent = {
let eng = engine.lock().unwrap_or_else(|p| p.into_inner());
classify_intent(&q, &eng)
};
let engine_for_intent = engine.clone();
let q_for_intent = q.clone();
// Classify locally; if EscalateToAli, escalate to cloud
let intent = tokio::task::spawn_blocking(move || {
let eng = engine_for_intent
.lock()
.unwrap_or_else(|p| p.into_inner());
classify_intent(&q_for_intent, &eng)
})
.await
.expect("classify_intent join failed");

if matches!(intent, VoiceIntent::EscalateToAli { .. }) {
crate::kernel::cloud_escalation::cloud_ask_with_tools(&q2, "").await
} else {
tokio::task::spawn_blocking(move || {
let du = std::env::var("DAEMON_URL")
.unwrap_or_else(|_| "http://localhost:8420".into());
route_intent(intent, &du)
})
.await
.unwrap_or_else(|e| format!("Errore interno: {e}"))
}
})
.await
.unwrap_or_else(|e| format!("Errore interno: {e}"));
};

Json(AskResponse { answer })
}
173 changes: 173 additions & 0 deletions daemon/src/kernel/cloud_escalation.rs
@@ -0,0 +1,173 @@
// Copyright (c) 2026 Roberto D'Angelo. All rights reserved.
// Cloud escalation — async LLM via Convergio provider/fallback pipeline.
// Used when local model unavailable or for write-intent operations.

use crate::kernel::engine_context::{extract_tool_call, smart_context_gather_pub};
use crate::kernel::tools::ToolCatalog;
use crate::server::llm_client::{ChatMessage, StreamChunk};
use crate::server::provider::stream_with_fallback;
use futures_util::StreamExt;
use tracing::{info, warn};

/// Default cloud model for kernel escalation.
pub const CLOUD_MODEL: &str = "claude-opus-4-20250514";
const MAX_CLOUD_ROUNDS: usize = 5;

fn daemon_url() -> String {
std::env::var("DAEMON_URL").unwrap_or_else(|_| "http://localhost:8420".to_string())
}

/// Ask via cloud provider — no tool loop.
pub async fn cloud_ask(question: &str, history_chatml: &str) -> String {
let q = question.to_string();
let context = match tokio::task::spawn_blocking(move || {
smart_context_gather_pub(&q, &daemon_url())
})
.await
{
Ok(ctx) => ctx,
Err(e) => {
warn!(error = %e, "kernel: context gather failed");
String::from("Nessun contesto disponibile.")
}
};
let messages = build_messages(&context, question, history_chatml, false);
collect_stream(messages).await
}

/// Ask via cloud provider WITH tool calling loop (up to 5 rounds).
pub async fn cloud_ask_with_tools(question: &str, history: &str) -> String {
let q = question.to_string();
let context = match tokio::task::spawn_blocking(move || {
smart_context_gather_pub(&q, &daemon_url())
})
.await
{
Ok(ctx) => ctx,
Err(e) => {
warn!(error = %e, "kernel: context gather failed");
String::from("Nessun contesto disponibile.")
}
};

let mut messages = build_messages(&context, question, history, true);
let du = daemon_url();

(Roberdan marked this conversation as resolved.)
for round in 0..MAX_CLOUD_ROUNDS {
let response = collect_stream(messages.clone()).await;

let Some((tool_name, args_str)) = extract_tool_call(&response) else {
return strip_after_tool_call(&response);
};

info!(round, tool = %tool_name, "kernel.cloud: tool call");
let args: serde_json::Value =
serde_json::from_str(&args_str).unwrap_or(serde_json::json!({}));
let url = du.clone();
let tn = tool_name.clone();
let tool_result = tokio::task::spawn_blocking(move || {
ToolCatalog::all()
.call_tool(&tn, &url, &args)
.unwrap_or_else(|| format!("Tool '{tn}' not found."))
})
.await
.unwrap_or_else(|e| format!("Tool dispatch error: {e}"));

messages.push(ChatMessage {
role: "assistant".into(),
content: response,
});
messages.push(ChatMessage {
role: "user".into(),
content: format!("[Tool result: {tool_name}]\n{tool_result}"),
});
}
"Raggiunto il limite di chiamate strumenti cloud.".into()
}

fn build_messages(
context: &str,
question: &str,
history: &str,
with_tools: bool,
) -> Vec<ChatMessage> {
let tools_section = if with_tools {
ToolCatalog::all().descriptions_block()
} else {
String::new()
};
let system = format!(
"Sei Jarvis, l'assistente AI di Convergio. Rispondi in italiano, conciso.\n\n\
Dati sistema:\n{context}\n\n{tools_section}"
);
let mut msgs = vec![ChatMessage {
role: "system".into(),
content: system,
}];
if !history.is_empty() {
msgs.push(ChatMessage {
role: "user".into(),
content: format!("[Conversazione precedente]\n{history}"),
});
}
msgs.push(ChatMessage {
role: "user".into(),
content: question.to_string(),
});
msgs
}

async fn collect_stream(messages: Vec<ChatMessage>) -> String {
let (provider, _) = crate::server::provider::provider_for_model(CLOUD_MODEL);
info!(model = CLOUD_MODEL, "kernel: cloud inference");
let mut stream = stream_with_fallback(provider, CLOUD_MODEL, messages);
let mut text = String::new();
while let Some(chunk) = stream.next().await {
match chunk {
StreamChunk::Text(t) => text.push_str(&t),
StreamChunk::Error(e) => {
warn!(error = %e, "kernel: cloud stream error");
if text.is_empty() {
return format!("Errore cloud: {e}");
}
}
StreamChunk::Usage(_) => {}
}
}
if text.is_empty() {
"Nessuna risposta dal provider cloud.".into()
} else {
text.trim().to_string()
}
}

fn strip_after_tool_call(text: &str) -> String {
if let Some(pos) = text.find("</tool_call>") {
let after = text[pos + "</tool_call>".len()..].trim();
if !after.is_empty() {
return after.to_string();
}
}
text.to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn cloud_model_is_opus() {
assert!(CLOUD_MODEL.contains("opus"));
}

#[test]
fn cloud_model_routes_to_claude() {
let (p, _) = crate::server::provider::provider_for_model(CLOUD_MODEL);
assert_eq!(p, crate::server::llm_client::Provider::ClaudeSubscription);
}

#[test]
fn max_rounds_is_5() {
assert_eq!(MAX_CLOUD_ROUNDS, 5);
}
}
47 changes: 47 additions & 0 deletions daemon/src/kernel/cloud_escalation_tests.rs
@@ -0,0 +1,47 @@
// Tests for cloud escalation and write-intent classification.

use crate::kernel::cloud_escalation::CLOUD_MODEL;
use crate::kernel::voice_router_helpers::is_write_intent;

#[test]
fn test_cloud_model_is_opus() {
assert!(CLOUD_MODEL.contains("opus"), "must target Opus");
}

#[test]
fn test_cloud_model_routes_to_claude() {
let (p, _) = crate::server::provider::provider_for_model(CLOUD_MODEL);
assert_eq!(p, crate::server::llm_client::Provider::ClaudeSubscription);
}

#[test]
fn test_write_intent_italian() {
assert!(is_write_intent("crea un piano"));
}

#[test]
fn test_write_intent_english() {
assert!(is_write_intent("create a new org"));
}

#[test]
fn test_read_intent_italian() {
assert!(!is_write_intent("stato dei piani"));
}
Comment on lines +27 to +30 (Copilot AI, Apr 2, 2026):
These tests assume local inference can be available after load_model(), but inference_level_for() first checks bridge.is_available() (Apple Silicon + mlx_lm). In CI/non-Apple environments this will always be false, so the expected InferenceLevel::Local assertions will fail. Make the tests environment-agnostic (e.g. assert Local only when eng.is_local_available() is true, otherwise Cloud), or add a test-only hook to force local availability.

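
The environment-agnostic pattern this review suggests could look like the sketch below. `MockEngine` is a stand-in, not the real `KernelEngine`, and its routing rule elides the write-intent check; only the method names mirror the diff.

```rust
// Mock standing in for KernelEngine/InferenceLevel from the crate; names and
// behavior here are illustrative, not the real implementation.
#[derive(Debug, PartialEq, Eq)]
enum InferenceLevel {
    Local,
    Cloud,
}

struct MockEngine {
    local_available: bool,
}

impl MockEngine {
    fn is_local_available(&self) -> bool {
        self.local_available
    }

    fn inference_level_for(&self, question: &str) -> InferenceLevel {
        // Mirrors the routing rule in the diff: no local model => Cloud.
        let _ = question; // write-intent check elided in this mock
        if self.is_local_available() {
            InferenceLevel::Local
        } else {
            InferenceLevel::Cloud
        }
    }
}

/// Environment-agnostic assertion: expect Local only when local inference
/// is actually available (e.g. Apple Silicon + mlx_lm), otherwise Cloud.
fn assert_env_agnostic(eng: &MockEngine, question: &str) {
    let expected = if eng.is_local_available() {
        InferenceLevel::Local
    } else {
        InferenceLevel::Cloud
    };
    assert_eq!(eng.inference_level_for(question), expected);
}
```

This keeps the same test body passing both on developer Macs (local available) and in CI (cloud-only).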

#[test]
fn test_write_intent_ali_escalation() {
assert!(is_write_intent("parla con ali"));
}

#[test]
fn test_no_model_returns_cloud() {
use crate::kernel::engine::{InferenceLevel, KernelConfig, KernelEngine};
let eng = KernelEngine::new(KernelConfig::default());
assert_eq!(eng.inference_level_for("qualsiasi cosa"), InferenceLevel::Cloud);
}

#[test]
fn test_read_intent_cost_query() {
assert!(!is_write_intent("quanto costa"));
}
24 changes: 24 additions & 0 deletions daemon/src/kernel/engine.rs
@@ -5,11 +5,19 @@
use crate::ipc::models::apple_fm::{AppleFmBridge, InferenceRequest};
use crate::kernel::engine_context::smart_context_gather;
use crate::kernel::engine_tool_loop;
use crate::kernel::voice_router_helpers;
use serde::{Deserialize, Serialize};
use std::time::Instant;

pub use crate::kernel::engine_context::smart_context_gather_pub;

/// Inference routing: local Qwen or cloud Opus.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InferenceLevel {
Local,
Cloud,
}
Comment on lines +14 to +19 (Copilot AI, Apr 2, 2026):

This file is now 251 lines long, exceeding the repo's 250-line limit for .rs files (the pre-commit hook will block it). To keep engine.rs within limits, move InferenceLevel and/or the routing helpers into a small submodule (e.g. engine_routing.rs) or trim existing code so the file is ≤ 250 lines.

/// Classification severity from kernel inference.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
@@ -90,6 +98,22 @@ impl KernelEngine {
self.loaded_model.is_some()
}

/// True when local inference is available (bridge + model).
pub fn is_local_available(&self) -> bool {
self.bridge.is_available() && self.loaded_model.is_some()
}

/// Decide local vs cloud based on question content and model availability.
pub fn inference_level_for(&self, question: &str) -> InferenceLevel {
if !self.is_local_available() {
return InferenceLevel::Cloud;
}
if voice_router_helpers::is_write_intent(question) {
return InferenceLevel::Cloud;
}
InferenceLevel::Local
}
Comment on lines +101 to +115 (Copilot AI, Apr 2, 2026):

This file is now 251 lines (see line 251 at EOF), but the repo's FileSizeGuard blocks .rs files over 250 lines. Please trim or split to get back under the 250-line limit (often removing a trailing blank line or moving a helper into a submodule is enough).

/// Classify a situation string via MLX inference.
///
/// Falls back to heuristic keyword-based classification when the bridge is