diff --git a/crates/mimi-core/Cargo.toml b/crates/mimi-core/Cargo.toml index 675213f..79f6be8 100644 --- a/crates/mimi-core/Cargo.toml +++ b/crates/mimi-core/Cargo.toml @@ -19,6 +19,7 @@ uuid.workspace = true chrono.workspace = true log.workspace = true num_cpus = "1.16" +rand = "0.8" [dev-dependencies] tokio-test = "0.4" diff --git a/crates/mimi-core/src/health_monitor.rs b/crates/mimi-core/src/health_monitor.rs new file mode 100644 index 0000000..6771a8a --- /dev/null +++ b/crates/mimi-core/src/health_monitor.rs @@ -0,0 +1,328 @@ +//! Health Monitoring System +//! +//! Extends basic health checks with metric tracking, auto-publishing to Pandora, +//! and auto-escalation on threshold breaches. + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tracing::{debug, info, warn}; + +use crate::pandora_client::PandoraClient; +use crate::state_machine::{ComponentHealthCheck, MimiState}; +use crate::zenoh_bus::ZenohBusAdapter; + +const MAX_METRIC_HISTORY: usize = 1000; +const FAILURE_THRESHOLD: usize = 5; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthMetric { + pub timestamp: DateTime<Utc>, + pub component_name: String, + pub metric_type: HealthMetricType, + pub value: f64, + pub threshold: f64, + pub is_healthy: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum HealthMetricType { + CpuUsage, + MemoryUsage, + Latency, + ErrorRate, + HeartbeatMissed, +}
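// NOTE (editorial sketch, not part of the patch): `is_healthy` is caller-supplied
// rather than derived from `value` and `threshold`, so the three fields can drift
// apart. A hypothetical helper along these lines would keep them consistent:
pub fn gauge_metric(
    component: &str,
    metric_type: HealthMetricType,
    value: f64,
    threshold: f64,
) -> HealthMetric {
    HealthMetric {
        timestamp: Utc::now(),
        component_name: component.to_string(),
        metric_type,
        value,
        threshold,
        // Healthy iff the observed value stays at or under its threshold.
        is_healthy: value <= threshold,
    }
}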
+ +pub struct HealthMonitor { + metrics: Arc<Mutex<VecDeque<HealthMetric>>>, + pandora: Option<Arc<PandoraClient>>, + zenoh: Option<Arc<ZenohBusAdapter>>, + failure_counts: Arc<Mutex<std::collections::HashMap<String, usize>>>, +} + +impl HealthMonitor { + pub fn new() -> Self { + Self { + metrics: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_METRIC_HISTORY))), + pandora: None, + zenoh: None, + failure_counts: Arc::new(Mutex::new(std::collections::HashMap::new())), + } + } + + pub async fn with_pandora(mut self, pandora: Arc<PandoraClient>) -> Self { + self.pandora = Some(pandora); + self + } + + pub async fn with_zenoh(mut self, zenoh: Arc<ZenohBusAdapter>) -> Self { + self.zenoh = Some(zenoh); + self + } + + pub async fn record_metric(&self, metric: HealthMetric) -> Result<()> { + { + let mut metrics = self.metrics.lock().unwrap(); + if metrics.len() >= MAX_METRIC_HISTORY { + metrics.pop_front(); + } + metrics.push_back(metric.clone()); + } + + if !metric.is_healthy { + self.track_failure(&metric.component_name).await?; + } else { + self.reset_failure_count(&metric.component_name).await; + } + + if let Some(pandora) = &self.pandora { + self.publish_to_pandora(pandora, &metric).await?; + } + + if let Some(zenoh) = &self.zenoh { + self.publish_to_zenoh(zenoh, &metric).await?; + } + + Ok(()) + } + + async fn track_failure(&self, component: &str) -> Result<()> { + let count = { + let mut counts = self.failure_counts.lock().unwrap(); + let entry = counts.entry(component.to_string()).or_insert(0); + *entry += 1; + *entry + }; + + if count >= FAILURE_THRESHOLD { + warn!( + "Component {} exceeded failure threshold: {}/{}", + component, count, FAILURE_THRESHOLD + ); + self.escalate_failure(component).await?; + } + + Ok(()) + } + + async fn reset_failure_count(&self, component: &str) { + let mut counts = self.failure_counts.lock().unwrap(); + counts.remove(component); + } + + async fn escalate_failure(&self, component: &str) -> Result<()> { + info!("Escalating failure for component: {}", component); + + if let Some(pandora) = &self.pandora { + let metadata = serde_json::json!({ + "component": component, + "failure_count": FAILURE_THRESHOLD, + "action": "escalated", + }); + + pandora + .persist_critical_state(MimiState::FailedComponent, Utc::now(), metadata) + .await?; + } + + Ok(()) + } + + async fn publish_to_pandora( + &self, + pandora: &Arc<PandoraClient>, + metric: &HealthMetric, + ) -> Result<()> { + if !metric.is_healthy { + let metadata = serde_json::json!({ + "metric_type": format!("{:?}", metric.metric_type), + "value": metric.value, + "threshold": metric.threshold, + "component": metric.component_name, + }); + + pandora + .persist_critical_state(MimiState::Degraded, metric.timestamp, metadata) + .await?; + + debug!( + "Published unhealthy metric to Pandora: {:?}", + metric.metric_type + ); + } + + Ok(()) + } + + async fn publish_to_zenoh( + &self, + zenoh: &Arc<ZenohBusAdapter>, + metric: &HealthMetric, + ) -> Result<()> { + if !metric.is_healthy { + zenoh + .publish_state_change(MimiState::Idle, MimiState::Degraded, metric.timestamp) + .await?; + + debug!("Published health degradation to Zenoh"); + } + + Ok(()) + } + + pub fn get_recent_metrics(&self, count: usize) -> Vec<HealthMetric> { + let metrics = self.metrics.lock().unwrap(); + metrics.iter().rev().take(count).cloned().collect() + } + + pub fn get_metrics_in_window(&self, window_secs: i64) -> Vec<HealthMetric> { + let metrics = self.metrics.lock().unwrap(); + let cutoff = Utc::now() - chrono::Duration::seconds(window_secs); + + metrics + .iter() + .filter(|m| m.timestamp > cutoff) + .cloned() + .collect() + } + + pub async fn check_component_health(&self, health_check: &ComponentHealthCheck) -> Result<()> { + let is_healthy = health_check.is_healthy(); + + let metric = HealthMetric { + timestamp: Utc::now(), + component_name: "system".to_string(), + metric_type: HealthMetricType::ErrorRate, + value: if is_healthy { 0.0 } else { 100.0 }, + threshold: 10.0, + is_healthy, + }; + + self.record_metric(metric).await?; + Ok(()) + } +} + +impl Default for HealthMonitor { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_health_metric_tracking() { + let monitor = HealthMonitor::new(); + + let metric = HealthMetric { + timestamp: Utc::now(), + component_name: "test_component".to_string(), + metric_type: HealthMetricType::CpuUsage, + value: 45.0, + threshold: 80.0, + is_healthy: true, + }; + + let result = monitor.record_metric(metric).await; + assert!(result.is_ok()); + + let recent = monitor.get_recent_metrics(10); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].component_name, "test_component"); + } + + #[tokio::test] + async fn test_auto_escalation() { + let monitor = HealthMonitor::new(); + + for _i in 0..FAILURE_THRESHOLD + 1 { + let metric = HealthMetric { + timestamp: Utc::now(), + component_name: "failing_component".to_string(), + metric_type: HealthMetricType::ErrorRate, + value: 100.0, + threshold: 10.0, + is_healthy: false, + }; + + let result = monitor.record_metric(metric).await; + assert!(result.is_ok()); + } + + let counts = monitor.failure_counts.lock().unwrap(); + assert!(counts.get("failing_component").unwrap_or(&0) >= &FAILURE_THRESHOLD); + } + + #[tokio::test] + async fn test_metrics_in_window() { + let monitor = HealthMonitor::new(); + + let old_metric = HealthMetric { + timestamp: Utc::now() - chrono::Duration::hours(2), + component_name: "test".to_string(), + metric_type: HealthMetricType::Latency, + value: 50.0, + threshold: 100.0, + is_healthy: true, + }; + + let new_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "test".to_string(), + metric_type: HealthMetricType::Latency, + value: 60.0, + threshold: 100.0, + is_healthy: true, + }; + + monitor.record_metric(old_metric).await.unwrap(); + monitor.record_metric(new_metric).await.unwrap(); + + let window_metrics = monitor.get_metrics_in_window(3600); + assert_eq!(window_metrics.len(), 1); + } + + #[tokio::test] + async fn test_failure_count_reset() { + let monitor = HealthMonitor::new(); + + let bad_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "test".to_string(), + metric_type: HealthMetricType::ErrorRate, + value: 100.0, + threshold: 10.0, + is_healthy: false, + }; + + monitor.record_metric(bad_metric).await.unwrap(); + + { + let counts = monitor.failure_counts.lock().unwrap(); + assert_eq!(counts.get("test"), Some(&1)); + } + + let good_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "test".to_string(), + metric_type: HealthMetricType::ErrorRate, + value: 5.0, + threshold: 10.0, + is_healthy: true, + }; + + monitor.record_metric(good_metric).await.unwrap(); + + { + let counts = monitor.failure_counts.lock().unwrap(); + assert_eq!(counts.get("test"), None); + } + } +}
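A minimal usage sketch for the monitor (illustrative, not part of the patch; assumes a Tokio runtime with the `macros` feature, and no Pandora/Zenoh clients attached, so escalation only logs):

use chrono::Utc;
use mimi_core::{HealthMetric, HealthMetricType, HealthMonitor};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let monitor = HealthMonitor::new();

    // Five consecutive unhealthy samples for one component reach
    // FAILURE_THRESHOLD and trigger escalate_failure().
    for _ in 0..5 {
        monitor
            .record_metric(HealthMetric {
                timestamp: Utc::now(),
                component_name: "beatrice".to_string(),
                metric_type: HealthMetricType::Latency,
                value: 7_000.0,
                threshold: 5_000.0,
                is_healthy: false,
            })
            .await?;
    }

    // Newest-first view of the bounded history.
    assert_eq!(monitor.get_recent_metrics(5).len(), 5);
    Ok(())
}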
diff --git a/crates/mimi-core/src/lib.rs b/crates/mimi-core/src/lib.rs index 68e00b9..d27f60d 100644 --- a/crates/mimi-core/src/lib.rs +++ b/crates/mimi-core/src/lib.rs @@ -5,15 +5,21 @@ pub mod config; pub mod error; +pub mod health_monitor; pub mod message; +pub mod pandora_client; pub mod routing; pub mod serialization; pub mod state_machine; +pub mod zenoh_bus; pub use error::{Error, Result}; +pub use health_monitor::{HealthMetric, HealthMetricType, HealthMonitor}; +pub use pandora_client::{FailurePattern, Neo4jConfig, PandoraClient, StateHistoryRecord}; pub use routing::{MessageRouter, RoutingError, Topic, TopicPattern}; pub use serialization::{MessageSerializer, SerializationError}; -pub use state_machine::{MimiState, StateManager}; +pub use state_machine::{ComponentHealthCheck, MimiState, StateManager}; +pub use zenoh_bus::{StateChangeMessage, ZenohBusAdapter, ZenohConfig}; /// Core version pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/mimi-core/src/message.rs b/crates/mimi-core/src/message.rs index 79797cd..8baf57f 100644 --- a/crates/mimi-core/src/message.rs +++ b/crates/mimi-core/src/message.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -10,6 +11,15 @@ pub struct Message { pub payload: serde_json::Value, } +/// Task message for Zenoh bus +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskMessage { + pub id: String, + pub payload: String, + pub priority: u8, + pub created_at: DateTime<Utc>, +} + impl Message { pub fn new( source: impl Into<String>,
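`TaskMessage` is the JSON payload the Zenoh adapter moves around (see `ZenohBusAdapter::deserialize_task` below). A round-trip sketch (illustrative, not part of the patch):

use chrono::Utc;
use mimi_core::message::TaskMessage;

fn main() -> anyhow::Result<()> {
    let task = TaskMessage {
        id: "task-42".to_string(),
        payload: "reindex_memory".to_string(),
        priority: 3,
        created_at: Utc::now(),
    };

    // Encode as it would travel over the bus, then decode on the other side.
    let bytes = serde_json::to_vec(&task)?;
    let decoded: TaskMessage = serde_json::from_slice(&bytes)?;
    assert_eq!(decoded.id, task.id);
    Ok(())
}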
diff --git a/crates/mimi-core/src/pandora_client.rs b/crates/mimi-core/src/pandora_client.rs new file mode 100644 index 0000000..ff77ac9 --- /dev/null +++ b/crates/mimi-core/src/pandora_client.rs @@ -0,0 +1,151 @@ +//! Pandora Neo4j Integration (Mock Implementation) +//! +//! Provides selective state persistence to a Neo4j graph database. +//! This is a mock implementation of the interface. + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, info}; + +use crate::state_machine::MimiState; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Neo4jConfig { + pub uri: String, + pub username: String, + pub password: String, + pub database: String, +} + +impl Default for Neo4jConfig { + fn default() -> Self { + Self { + uri: "bolt://localhost:7687".to_string(), + username: "neo4j".to_string(), + password: "password".to_string(), + database: "neo4j".to_string(), + } + } +} + +pub struct PandoraClient { + #[allow(dead_code)] + config: Neo4jConfig, +} + +impl PandoraClient { + pub async fn new() -> Result<Self> { + Self::with_config(Neo4jConfig::default()).await + } + + pub async fn with_config(config: Neo4jConfig) -> Result<Self> { + info!("Creating Pandora client (mock) with URI: {}", config.uri); + Ok(Self { config }) + } + + pub async fn persist_critical_state( + &self, + state: MimiState, + timestamp: DateTime<Utc>, + metadata: serde_json::Value, + ) -> Result<String> { + let node_id = uuid::Uuid::new_v4().to_string(); + + debug!( + "Persisting critical state (mock): {:?} at {} -> node {}", + state, timestamp, node_id + ); + + let _ = (state, timestamp, metadata); + Ok(node_id) + } + + pub async fn query_state_history( + &self, + from: DateTime<Utc>, + to: DateTime<Utc>, + state_filter: Option<MimiState>, + ) -> Result<Vec<StateHistoryRecord>> { + debug!( + "Querying state history (mock): {} to {} filter={:?}", + from, to, state_filter + ); + + Ok(vec![]) + } + + pub async fn query_failure_patterns(&self, window_hours: u32) -> Result<Vec<FailurePattern>> { + debug!("Querying failure patterns (mock): window={}h", window_hours); + + Ok(vec![]) + } + + pub async fn close(self) -> Result<()> { + info!("Closing Pandora client (mock)"); + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateHistoryRecord { + pub node_id: String, + pub state: String, + pub timestamp: DateTime<Utc>, + pub metadata: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FailurePattern { + pub pattern_type: String, + pub frequency: u32, + pub last_occurrence: DateTime<Utc>, + pub states_involved: Vec<String>, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_create_pandora_client() { + let result = PandoraClient::new().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_persist_critical_state() { + let client = PandoraClient::new().await.expect("Failed to create client"); + + let state = MimiState::CriticalError; + let timestamp = Utc::now(); + let metadata = serde_json::json!({"error": "test error"}); + + let result = client + .persist_critical_state(state, timestamp, metadata) + .await; + + assert!(result.is_ok()); + let node_id = result.unwrap(); + assert!(!node_id.is_empty()); + } + + #[tokio::test] + async fn test_query_state_history() { + let client = PandoraClient::new().await.expect("Failed to create client"); + + let from = Utc::now() - chrono::Duration::hours(24); + let to = Utc::now(); + + let result = client.query_state_history(from, to, None).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_query_failure_patterns() { + let client = PandoraClient::new().await.expect("Failed to create client"); + + let result = client.query_failure_patterns(24).await; + assert!(result.is_ok()); + } +}
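A usage sketch for the mock client (illustrative, not part of the patch; the mock hands back a fresh node id and empty query results, while a real Neo4j-backed build would presumably honor `Neo4jConfig`):

use chrono::Utc;
use mimi_core::{MimiState, PandoraClient};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = PandoraClient::new().await?;

    // Persist one critical-state node and keep its id.
    let node_id = client
        .persist_critical_state(
            MimiState::CriticalError,
            Utc::now(),
            serde_json::json!({ "reason": "example" }),
        )
        .await?;
    println!("persisted node {}", node_id);

    // The mock always answers with an empty history.
    let history = client
        .query_state_history(Utc::now() - chrono::Duration::hours(1), Utc::now(), None)
        .await?;
    assert!(history.is_empty());

    client.close().await
}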
diff --git a/crates/mimi-core/src/state_machine.rs b/crates/mimi-core/src/state_machine.rs index b727733..20d7662 100644 --- a/crates/mimi-core/src/state_machine.rs +++ b/crates/mimi-core/src/state_machine.rs @@ -3,7 +3,16 @@ //! Implements the 10-state finite state machine for Mimi orchestrator core lifecycle. //! Provides async execution, guard conditions, error recovery, and message bus integration. +use anyhow::{anyhow, Result}; +use chrono::{DateTime, Utc}; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tokio::time::{sleep, timeout}; +use uuid::Uuid; /// Mimi system states #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -30,16 +39,59 @@ pub enum MimiState { Shutdown, } +/// Task wrapper for priority queue ordering +#[derive(Clone)] +struct PrioritizedTask { + task: Task, + sequence: u64, +} + +impl PartialEq for PrioritizedTask { + fn eq(&self, other: &Self) -> bool { + self.task.priority == other.task.priority && self.sequence == other.sequence + } +} + +impl Eq for PrioritizedTask {} + +impl PartialOrd for PrioritizedTask { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for PrioritizedTask { + fn cmp(&self, other: &Self) -> Ordering { + match self.task.priority.cmp(&other.task.priority) { + Ordering::Equal => other.sequence.cmp(&self.sequence), + other_ord => other_ord, + } + } +} + /// State manager with thread-safe access pub struct StateManager { state: Arc<Mutex<MimiState>>, + component_health: Arc<Mutex<Option<ComponentHealthCheck>>>, + task_queue: Arc<Mutex<BinaryHeap<PrioritizedTask>>>, + queue_capacity: usize, + sequence_counter: Arc<Mutex<u64>>, } impl StateManager { /// Create new state manager starting in Idle state pub fn new() -> Self { + Self::with_capacity(1000) + } + + /// Create state manager with custom queue capacity + pub fn with_capacity(capacity: usize) -> Self { Self { state: Arc::new(Mutex::new(MimiState::Idle)), + component_health: Arc::new(Mutex::new(None)), + task_queue: Arc::new(Mutex::new(BinaryHeap::new())), + queue_capacity: capacity, + sequence_counter: Arc::new(Mutex::new(0)), } } @@ -47,6 +99,242 @@ impl StateManager { pub fn current_state(&self) -> MimiState { *self.state.lock().unwrap() } + + /// Enqueue task with priority ordering + pub fn enqueue_task(&self, task: Task) -> Result<()> { + let mut queue = self.task_queue.lock().unwrap(); + + if queue.len() >= self.queue_capacity { + return Err(anyhow!( + "Task queue full (capacity: {})", + self.queue_capacity + )); + } + + let mut counter = self.sequence_counter.lock().unwrap(); + let sequence = *counter; + *counter += 1; + + queue.push(PrioritizedTask { task, sequence }); + + Ok(()) + } + + /// Dequeue highest priority task (FIFO within priority) + pub fn dequeue_task(&self) -> Result<Task> { + let mut queue = self.task_queue.lock().unwrap(); + + queue + .pop() + .map(|pt| pt.task) + .ok_or_else(|| anyhow!("Task queue is empty")) + } + + /// Get current queue size + pub fn queue_size(&self) -> usize { + self.task_queue.lock().unwrap().len() + } + + /// Update component health and trigger escalation if needed + pub fn update_component_health(&self, health: ComponentHealthCheck) -> Result<()> { + let mut health_guard = self.component_health.lock().unwrap(); + *health_guard = Some(health); + + if health.needs_recovery() { + drop(health_guard); + self.force_error_state(MimiState::Recovering); + } else if health.needs_degraded() { + drop(health_guard); + self.force_error_state(MimiState::Degraded); + } + + Ok(()) + } + + /// Transition to new state with validation + pub fn transition_to(&self, new_state: MimiState) -> Result<()> { + let health_guard = self.component_health.lock().unwrap(); + + if let Some(health) = *health_guard 
{ + if health.needs_recovery() { + drop(health_guard); + self.force_error_state(MimiState::Recovering); + return Ok(()); + } else if health.needs_degraded() { + drop(health_guard); + self.force_error_state(MimiState::Degraded); + return Ok(()); + } + } + drop(health_guard); + + let mut state = self.state.lock().unwrap(); + let current = *state; + + let transition = StateTransition::new(current, new_state); + + if !transition.is_valid() { + return Err(anyhow!( + "Invalid state transition: {:?} -> {:?}", + current, + new_state + )); + } + + log::info!("State transition: {:?} -> {:?}", current, new_state); + *state = new_state; + + Ok(()) + } + + /// Check component health and transition if needed + pub fn check_and_transition( + &self, + target_state: MimiState, + health: &ComponentHealth, + ) -> Result<()> { + if !TransitionGuard::check_component_health(health) { + log::warn!("Component health check failed, transitioning to Degraded"); + return self.transition_to(MimiState::Degraded); + } + + self.transition_to(target_state) + } + + /// Force transition to error state (bypasses validation) + pub fn force_error_state(&self, error_state: MimiState) { + let mut state = self.state.lock().unwrap(); + + log::error!("Forcing error state: {:?}", error_state); + *state = error_state; + } + + /// Execute next task from queue + pub async fn execute_next_task(&self) -> Result<()> { + let task = self.dequeue_task()?; + + log::info!("Executing task: {} ({})", task.name, task.id); + + match task.execution_model { + ExecutionModel::Blocking => self.execute_blocking_task(task).await, + ExecutionModel::Async => self.execute_async_task(task).await, + } + } + + /// Execute task in blocking mode (for fast operations <500ms) + async fn execute_blocking_task(&self, task: Task) -> Result<()> { + self.transition_to(MimiState::Executing)?; + + let task_name = task.name.clone(); + let result = timeout(task.timeout, async { + tokio::task::spawn_blocking(move || { + log::debug!("Blocking task {} executing", task.name); + Ok::<(), anyhow::Error>(()) + }) + .await? 
+ }) + .await; + + match result { + Ok(Ok(())) => { + log::info!("Task {} completed successfully", task_name); + self.transition_to(MimiState::Responding)?; + Ok(()) + }, + Ok(Err(e)) => { + log::error!("Task {} failed: {}", task_name, e); + Err(e) + }, + Err(_) => { + log::error!("Task {} timed out", task_name); + Err(anyhow!("Task execution timeout")) + }, + } + } + + /// Execute task in async mode (for long operations >500ms) + async fn execute_async_task(&self, task: Task) -> Result<()> { + self.transition_to(MimiState::Executing)?; + + let task_name = task.name.clone(); + let task_timeout = task.timeout; + + let result = timeout(task_timeout, async move { + log::debug!("Async task {} executing", task.name); + + sleep(Duration::from_millis(100)).await; + + Ok::<(), anyhow::Error>(()) + }) + .await; + + match result { + Ok(Ok(())) => { + log::info!("Task {} completed successfully", task_name); + self.transition_to(MimiState::Responding)?; + Ok(()) + }, + Ok(Err(e)) => { + log::error!("Task {} failed: {}", task_name, e); + Err(e) + }, + Err(_) => { + log::error!("Task {} timed out", task_name); + Err(anyhow!("Task execution timeout")) + }, + } + } + + /// Execute task with retry logic + pub async fn execute_with_retry(&self) -> Result<()> { + let mut task = self.dequeue_task()?; + let retry_strategy = RetryStrategy::exponential_with_jitter(); + + loop { + let result = match task.execution_model { + ExecutionModel::Blocking => self.execute_blocking_task(task.clone()).await, + ExecutionModel::Async => self.execute_async_task(task.clone()).await, + }; + + match result { + Ok(()) => { + log::info!( + "Task {} succeeded after {} retries", + task.name, + task.retries + ); + return Ok(()); + }, + Err(e) => { + if !task.can_retry() { + log::error!( + "Task {} failed after {} retries: {}", + task.name, + task.max_retries, + e + ); + return Err(anyhow!( + "Task failed after {} retries: {}", + task.max_retries, + e + )); + } + + task.increment_retry(); + let delay = retry_strategy.next_delay(task.retries - 1); + + log::warn!( + "Task {} failed (attempt {}), retrying in {:?}", + task.name, + task.retries, + delay + ); + + sleep(delay).await; + }, + } + } + } } impl Default for StateManager { @@ -54,3 +342,383 @@ impl Default for StateManager { Self::new() } } + +/// Component health metrics for guard conditions +#[derive(Debug, Clone, Copy)] +pub struct ComponentHealth { + pub latency_ms: u64, + pub memory_usage_percent: u8, + pub last_heartbeat_secs: u64, +} + +/// Component health check with thresholds +#[derive(Debug, Clone, Copy)] +pub struct ComponentHealthCheck { + latency_ms: u64, + memory_percent: u8, + heartbeat_age_secs: u64, +} + +impl ComponentHealthCheck { + const LATENCY_THRESHOLD_MS: u64 = 5000; + const MEMORY_THRESHOLD_PERCENT: u8 = 80; + const HEARTBEAT_THRESHOLD_SECS: u64 = 30; + + pub fn new(latency_ms: u64, memory_percent: u8, heartbeat_age_secs: u64) -> Self { + Self { + latency_ms, + memory_percent, + heartbeat_age_secs, + } + } + + pub fn is_healthy(&self) -> bool { + self.latency_ms <= Self::LATENCY_THRESHOLD_MS + && self.memory_percent <= Self::MEMORY_THRESHOLD_PERCENT + && self.heartbeat_age_secs <= Self::HEARTBEAT_THRESHOLD_SECS + } + + pub fn needs_recovery(&self) -> bool { + self.heartbeat_age_secs > Self::HEARTBEAT_THRESHOLD_SECS + } + + pub fn needs_degraded(&self) -> bool { + (self.latency_ms > Self::LATENCY_THRESHOLD_MS + || self.memory_percent > Self::MEMORY_THRESHOLD_PERCENT) + && self.heartbeat_age_secs <= Self::HEARTBEAT_THRESHOLD_SECS + } +} + +/// State 
transition representation +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct StateTransition { + pub from: MimiState, + pub to: MimiState, +} + +impl StateTransition { + /// Create new state transition + pub fn new(from: MimiState, to: MimiState) -> Self { + Self { from, to } + } + + /// Check if transition is valid according to FSM rules + pub fn is_valid(&self) -> bool { + use MimiState::*; + + if self.from == self.to { + return true; + } + + matches!( + (self.from, self.to), + // Normal flow + |(Idle, Listening)| (Listening, Processing) + | (Processing, Executing) + | (Executing, Responding) + | (Responding, Idle) + + // Recovery paths + | (Degraded, Recovering) + | (FailedComponent, Recovering) + | (Recovering, Idle) + + // Error escalation from any state + | (_, Degraded) + | (_, FailedComponent) + | (_, CriticalError) + + // Shutdown from any state + | (_, Shutdown) + ) + } +}
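// NOTE (editorial sketch, not part of the patch): a quick walk of the transition
// table above, mirroring the unit tests: the normal loop is valid hop by hop,
// skipping a hop is rejected, and error escalation is accepted from any state.
fn _transition_table_demo() {
    use MimiState::*;
    let normal_loop = [Idle, Listening, Processing, Executing, Responding, Idle];
    for pair in normal_loop.windows(2) {
        assert!(StateTransition::new(pair[0], pair[1]).is_valid());
    }
    // Skipping Processing is not allowed...
    assert!(!StateTransition::new(Listening, Executing).is_valid());
    // ...but escalation always is.
    assert!(StateTransition::new(Executing, CriticalError).is_valid());
}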
+ +/// Guard condition evaluator for state transitions +pub struct TransitionGuard; + +impl TransitionGuard { + /// Latency threshold: 5 seconds + const LATENCY_THRESHOLD_MS: u64 = 5000; + + /// Memory usage threshold: 80% + const MEMORY_THRESHOLD_PERCENT: u8 = 80; + + /// Heartbeat timeout: 30 seconds + const HEARTBEAT_TIMEOUT_SECS: u64 = 30; + + /// Check if component health is within acceptable thresholds + pub fn check_component_health(health: &ComponentHealth) -> bool { + health.latency_ms <= Self::LATENCY_THRESHOLD_MS + && health.memory_usage_percent <= Self::MEMORY_THRESHOLD_PERCENT + && health.last_heartbeat_secs <= Self::HEARTBEAT_TIMEOUT_SECS + } + + /// Check if task queue has capacity + pub fn check_queue_capacity(current: usize, max: usize) -> bool { + current < max + } + + /// Check if task timeout is within bounds + pub fn check_task_timeout(timeout: &Duration, max: &Duration) -> bool { + timeout <= max + } +} + +/// Task priority levels +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum TaskPriority { + Low = 0, + Normal = 1, + High = 2, + Critical = 3, +} + +/// Task types matching IntentType from schema.fbs +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum TaskType { + Query, + Execute, + SkillPublish, + StateUpdate, + MemoryUpdate, + ErrorReport, + Control, +} + +/// Execution model for task processing +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum ExecutionModel { + /// Synchronous blocking execution (<500ms expected) + Blocking, + /// Asynchronous with callback (>500ms expected) + Async, +} + +/// Task representation with full lifecycle metadata +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Task { + pub id: Uuid, + pub task_type: TaskType, + pub name: String, + pub priority: TaskPriority, + pub payload: Vec<u8>, + pub timeout: Duration, + pub retries: u32, + pub max_retries: u32, + pub created_at: DateTime<Utc>, + pub execution_model: ExecutionModel, +} + +impl Task { + /// Create new task with defaults + pub fn new(task_type: TaskType, name: &str) -> Self { + Self { + id: Uuid::new_v4(), + task_type, + name: name.to_string(), + priority: TaskPriority::Normal, + payload: Vec::new(), + timeout: Duration::from_secs(30), + retries: 0, + max_retries: 3, + created_at: Utc::now(), + execution_model: ExecutionModel::Async, + } + } + + /// Set task priority (builder pattern) + pub fn with_priority(mut self, priority: TaskPriority) -> Self { + self.priority = priority; + self + } + + /// Set timeout (builder pattern) + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + /// Set payload (builder pattern) + pub fn with_payload(mut self, payload: Vec<u8>) -> Self { + self.payload = payload; + self + } + + /// Set execution model (builder pattern) + pub fn with_execution_model(mut self, model: ExecutionModel) -> Self { + self.execution_model = model; + self + } + + /// Check if task can be retried + pub fn can_retry(&self) -> bool { + self.retries < self.max_retries + } + + /// Increment retry counter + pub fn increment_retry(&mut self) { + self.retries += 1; + } +} + +/// Circuit breaker states +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CircuitState { + /// Circuit closed, requests flow normally + Closed, + /// Circuit open, requests rejected immediately + Open, + /// Circuit half-open, testing if service recovered + HalfOpen, +} + +/// Circuit breaker for preventing cascade failures +pub struct CircuitBreaker { + state: Arc<Mutex<CircuitState>>, + failure_count: Arc<Mutex<u32>>, + failure_threshold: u32, + timeout: Duration, + last_failure_time: Arc<Mutex<Option<Instant>>>, +} + +impl CircuitBreaker { + /// Create new circuit breaker + pub fn new(failure_threshold: u32, timeout: Duration) -> Self { + Self { + state: Arc::new(Mutex::new(CircuitState::Closed)), + failure_count: Arc::new(Mutex::new(0)), + failure_threshold, + timeout, + last_failure_time: Arc::new(Mutex::new(None)), + } + } + + /// Get current circuit state + pub fn state(&self) -> CircuitState { + let state = *self.state.lock().unwrap(); + + if state == CircuitState::Open { + let last_failure = self.last_failure_time.lock().unwrap(); + + if let Some(time) = *last_failure { + if time.elapsed() >= self.timeout { + let mut state_guard = self.state.lock().unwrap(); + *state_guard = CircuitState::HalfOpen; + return CircuitState::HalfOpen; + } + } + } + + state + } + + /// Record successful execution + pub fn record_success(&self) { + let current_state = self.state(); + + if current_state == CircuitState::HalfOpen { + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Closed; + + let mut count = self.failure_count.lock().unwrap(); + *count = 0; + + log::info!("Circuit breaker closed after successful test"); + } + } + + /// Record failed execution + pub fn record_failure(&self) { + let mut count = self.failure_count.lock().unwrap(); + *count += 1; + + let mut last_failure = self.last_failure_time.lock().unwrap(); + *last_failure = Some(Instant::now()); + + if *count >= self.failure_threshold { + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Open; + + log::warn!( + "Circuit breaker opened after {} failures", + self.failure_threshold + ); + } + } + + /// Check if request should be allowed + pub fn allow_request(&self) -> bool { + let state = self.state(); + + match state { + CircuitState::Closed => true, + CircuitState::Open => false, + CircuitState::HalfOpen => true, + } + } + + /// Reset circuit breaker to closed state + pub fn reset(&self) { + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Closed; + + let mut count = self.failure_count.lock().unwrap(); + *count = 0; + } +}
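// NOTE (editorial sketch, not part of the patch): the intended call pattern is
// gate -> attempt -> record; `_guarded_call` and `call_component` are hypothetical.
async fn _guarded_call(breaker: &CircuitBreaker) -> Result<()> {
    if !breaker.allow_request() {
        // Open circuit: fail fast instead of hammering a sick component.
        return Err(anyhow!("circuit open, request rejected"));
    }
    match call_component().await {
        Ok(()) => {
            // A success while HalfOpen closes the circuit again.
            breaker.record_success();
            Ok(())
        }
        Err(e) => {
            // Failures accumulate until failure_threshold opens the circuit.
            breaker.record_failure();
            Err(e)
        }
    }
}

async fn call_component() -> Result<()> {
    // Placeholder for a real component call.
    Ok(())
}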
+ +/// Retry strategy with exponential backoff +#[derive(Debug, Clone)] +pub struct RetryStrategy { + base_delay_ms: u64, + max_delay_ms: u64, + jitter_enabled: bool, + jitter_factor: f64, +} + +impl RetryStrategy { + /// Create exponential backoff strategy (100ms -> 5s) + pub fn exponential() -> Self { + Self { + base_delay_ms: 100, + max_delay_ms: 5000, + jitter_enabled: false, + jitter_factor: 0.0, + } + } + + /// Create exponential backoff with 20% jitter + pub fn exponential_with_jitter() -> Self { + Self { + base_delay_ms: 100, + max_delay_ms: 5000, + jitter_enabled: true, + jitter_factor: 0.2, + } + } + + /// Calculate delay for retry attempt + pub fn next_delay(&self, retry_count: u32) -> Duration { + let base_delay = self.base_delay_ms * 2_u64.pow(retry_count); + let capped_delay = base_delay.min(self.max_delay_ms); + + if self.jitter_enabled { + let jitter_range = (capped_delay as f64 * self.jitter_factor) as u64; + let mut rng = rand::thread_rng(); + let jitter = rng.gen_range(0..=jitter_range * 2); + let with_jitter = + (capped_delay as i64 - jitter_range as i64 + jitter as i64).max(0) as u64; + + Duration::from_millis(with_jitter) + } else { + Duration::from_millis(capped_delay) + } + } +} + +impl Default for RetryStrategy { + fn default() -> Self { + Self::exponential_with_jitter() + } +} diff --git a/crates/mimi-core/src/zenoh_bus.rs b/crates/mimi-core/src/zenoh_bus.rs new file mode 100644 index 0000000..46e04d8 --- /dev/null +++ b/crates/mimi-core/src/zenoh_bus.rs @@ -0,0 +1,178 @@ +//! Zenoh Message Bus Adapter (Mock Implementation) +//! +//! Provides integration interface for Zenoh distributed message passing. +//! This is a mock implementation that can be replaced with actual Zenoh integration. + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use tracing::{debug, info}; + +use crate::message::TaskMessage; +use crate::state_machine::MimiState; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ZenohConfig { + pub mode: String, + pub connect: Vec<String>, + pub listen: Vec<String>, +} + +impl Default for ZenohConfig { + fn default() -> Self { + Self { + mode: "peer".to_string(), + connect: vec![], + listen: vec![], + } + } +} + +pub struct ZenohBusAdapter { + #[allow(dead_code)] + config: ZenohConfig, + task_key_expr: String, + #[allow(dead_code)] + state_key_expr: String, +} + +impl ZenohBusAdapter { + pub async fn new() -> Result<Self> { + Self::with_config(ZenohConfig::default()).await + } + + pub async fn with_config(config: ZenohConfig) -> Result<Self> { + info!( + "Creating Zenoh bus adapter (mock) with mode: {}", + config.mode + ); + + Ok(Self { + config, + task_key_expr: "mimi/tasks/**".to_string(), + state_key_expr: "mimi/state/**".to_string(), + }) + } + + pub async fn subscribe_tasks(&self) -> Result<mpsc::Receiver<TaskMessage>> { + let (tx, rx) = mpsc::channel(100); + + info!("Subscribed to Zenoh key (mock): {}", self.task_key_expr); + + tokio::spawn(async move { + debug!("Mock task subscriber spawned"); + drop(tx); + }); + + Ok(rx) + } + + pub async fn publish_state_change( + &self, + from_state: MimiState, + to_state: MimiState, + timestamp: chrono::DateTime<chrono::Utc>, + ) -> Result<()> { + let state_name = format!("{:?}", to_state).to_lowercase(); + let key = format!("mimi/state/{}", state_name); + + let state_msg = StateChangeMessage { + from_state: format!("{:?}", from_state), + to_state: format!("{:?}", to_state), + timestamp: timestamp.to_rfc3339(), + }; + + debug!( + "Published state change (mock): {:?} -> {:?} on key: {}", + from_state, to_state, key + ); + let _ = state_msg; + Ok(()) + } + + #[allow(dead_code)] + fn deserialize_task(bytes: &[u8]) -> Result<TaskMessage> { + let task_msg: TaskMessage = serde_json::from_slice(bytes)?; + Ok(task_msg) + } + + pub async fn close(self) -> Result<()> { + info!("Closing Zenoh session (mock)"); + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateChangeMessage { + pub from_state: String, + pub to_state: String, + pub timestamp: String, +} + +#[cfg(test)] +mod tests { + use 
super::*; + use std::time::Duration; + use tokio::time::timeout; + + #[tokio::test] + async fn test_create_zenoh_adapter() { + let result = ZenohBusAdapter::new().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_subscribe_receives_task() { + let adapter = ZenohBusAdapter::new() + .await + .expect("Failed to create adapter"); + let mut rx = adapter + .subscribe_tasks() + .await + .expect("Failed to subscribe"); + + match timeout(Duration::from_millis(100), rx.recv()).await { + Ok(None) => { + assert!(true); + }, + Ok(Some(_)) => panic!("Unexpected task received"), + Err(_) => { + assert!(true); + }, + } + } + + #[tokio::test] + async fn test_publish_state_change() { + let adapter = ZenohBusAdapter::new() + .await + .expect("Failed to create adapter"); + + let from_state = MimiState::Idle; + let to_state = MimiState::Listening; + let timestamp = chrono::Utc::now(); + + let result = adapter + .publish_state_change(from_state, to_state, timestamp) + .await; + + assert!(result.is_ok()); + } + + #[test] + fn test_deserialize_task() { + let task = TaskMessage { + id: "test-123".to_string(), + payload: "test_data".to_string(), + priority: 5, + created_at: chrono::Utc::now(), + }; + + let json = serde_json::to_vec(&task).unwrap(); + let deserialized = ZenohBusAdapter::deserialize_task(&json).unwrap(); + + assert_eq!(deserialized.id, task.id); + assert_eq!(deserialized.payload, task.payload); + assert_eq!(deserialized.priority, task.priority); + } +} diff --git a/crates/mimi-core/tests/acceptance_tests.rs b/crates/mimi-core/tests/acceptance_tests.rs new file mode 100644 index 0000000..351d968 --- /dev/null +++ b/crates/mimi-core/tests/acceptance_tests.rs @@ -0,0 +1,290 @@ +//! Acceptance Tests for State Machine +//! +//! High-level end-to-end scenarios validating system behavior + +use chrono::Utc; +use mimi_core::state_machine::*; +use mimi_core::{HealthMetric, HealthMetricType, HealthMonitor}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +// ============================================================================ +// Scenario 1: Happy Path - Complete Task Execution +// ============================================================================ + +#[tokio::test] +async fn acceptance_happy_path_complete_workflow() { + let manager = Arc::new(StateManager::new()); + let health_monitor = Arc::new(HealthMonitor::new()); + + manager.transition_to(MimiState::Listening).unwrap(); + + let task = Task::new(TaskType::Query, "user_query") + .with_priority(TaskPriority::Normal) + .with_execution_model(ExecutionModel::Blocking) + .with_payload(b"What is the weather?".to_vec()); + + manager.enqueue_task(task).unwrap(); + + manager.transition_to(MimiState::Processing).unwrap(); + + let metric = HealthMetric { + timestamp: Utc::now(), + component_name: "beatrice".to_string(), + metric_type: HealthMetricType::Latency, + value: 100.0, + threshold: 5000.0, + is_healthy: true, + }; + health_monitor.record_metric(metric).await.unwrap(); + + manager.execute_next_task().await.unwrap(); + + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); + assert_eq!(manager.queue_size(), 0); + + let recent_metrics = health_monitor.get_recent_metrics(10); + assert!(!recent_metrics.is_empty()); +} + +// ============================================================================ +// Scenario 2: Component Failure Recovery +// ============================================================================ + +#[tokio::test] +async 
fn acceptance_component_failure_and_recovery() { + let manager = Arc::new(StateManager::new()); + let health_monitor = Arc::new(HealthMonitor::new()); + let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(5))); + + manager.transition_to(MimiState::Listening).unwrap(); + + let failure_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "pandora".to_string(), + metric_type: HealthMetricType::Latency, + value: 7000.0, + threshold: 5000.0, + is_healthy: false, + }; + health_monitor.record_metric(failure_metric).await.unwrap(); + + manager.force_error_state(MimiState::FailedComponent); + + circuit_breaker.record_failure(); + circuit_breaker.record_failure(); + circuit_breaker.record_failure(); + + assert_eq!(circuit_breaker.state(), CircuitState::Open); + + manager.transition_to(MimiState::Recovering).unwrap(); + + let recovery_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "pandora".to_string(), + metric_type: HealthMetricType::Latency, + value: 200.0, + threshold: 5000.0, + is_healthy: true, + }; + health_monitor.record_metric(recovery_metric).await.unwrap(); + + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); +} + +// ============================================================================ +// Scenario 3: Cascade Fallback with Retries +// ============================================================================ + +#[tokio::test] +async fn acceptance_cascade_fallback_retry_strategy() { + let manager = Arc::new(StateManager::new()); + let retry_strategy = RetryStrategy::exponential_with_jitter(); + + manager.transition_to(MimiState::Listening).unwrap(); + + let mut task = Task::new(TaskType::Execute, "complex_api_call") + .with_priority(TaskPriority::High) + .with_execution_model(ExecutionModel::Async) + .with_timeout(Duration::from_secs(10)); + + task.max_retries = 5; + + manager.enqueue_task(task.clone()).unwrap(); + + let mut retry_count = 0; + loop { + let result = manager.execute_next_task().await; + + if result.is_ok() { + break; + } + + if retry_count >= task.max_retries { + manager.force_error_state(MimiState::Degraded); + break; + } + + let delay = retry_strategy.next_delay(retry_count); + sleep(delay).await; + + retry_count += 1; + manager.enqueue_task(task.clone()).unwrap(); + } + + let state = manager.current_state(); + assert!(state == MimiState::Idle || state == MimiState::Degraded); +} + +// ============================================================================ +// Scenario 4: Graceful Shutdown +// ============================================================================ + +#[tokio::test] +async fn acceptance_graceful_shutdown_with_pending_tasks() { + let manager = Arc::new(StateManager::new()); + + manager.transition_to(MimiState::Listening).unwrap(); + + for i in 0..5 { + let task = Task::new(TaskType::Execute, &format!("task_{}", i)) + .with_priority(TaskPriority::Normal); + manager.enqueue_task(task).unwrap(); + } + + assert_eq!(manager.queue_size(), 5); + + manager.transition_to(MimiState::Shutdown).unwrap(); + + while manager.queue_size() > 0 { + let result = manager.execute_next_task().await; + + if result.is_err() { + break; + } + } + + assert_eq!(manager.current_state(), MimiState::Shutdown); +} + +// ============================================================================ +// Scenario 5: Chaos Engineering - Multiple Simultaneous Failures +// ============================================================================ + +#[tokio::test] 
+async fn acceptance_chaos_multiple_failures() { + let manager = Arc::new(StateManager::with_capacity(100)); + let health_monitor = Arc::new(HealthMonitor::new()); + let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(2))); + + manager.transition_to(MimiState::Listening).unwrap(); + + for i in 0..50 { + let task = Task::new(TaskType::Execute, &format!("burst_task_{}", i)) + .with_priority(TaskPriority::High); + let _ = manager.enqueue_task(task); + } + + let beatrice_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "beatrice".to_string(), + metric_type: HealthMetricType::Latency, + value: 8000.0, + threshold: 5000.0, + is_healthy: false, + }; + health_monitor.record_metric(beatrice_metric).await.unwrap(); + + let pandora_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "pandora".to_string(), + metric_type: HealthMetricType::MemoryUsage, + value: 95.0, + threshold: 80.0, + is_healthy: false, + }; + health_monitor.record_metric(pandora_metric).await.unwrap(); + + let echidna_metric = HealthMetric { + timestamp: Utc::now(), + component_name: "echidna".to_string(), + metric_type: HealthMetricType::Latency, + value: 6500.0, + threshold: 5000.0, + is_healthy: false, + }; + health_monitor.record_metric(echidna_metric).await.unwrap(); + + for _ in 0..3 { + circuit_breaker.record_failure(); + } + + assert_eq!(circuit_breaker.state(), CircuitState::Open); + + let unhealthy_metrics: Vec<_> = health_monitor + .get_recent_metrics(100) + .into_iter() + .filter(|m| !m.is_healthy) + .collect(); + assert!(!unhealthy_metrics.is_empty()); + + if unhealthy_metrics.len() >= 2 { + manager.force_error_state(MimiState::Degraded); + } + + assert!(!circuit_breaker.allow_request()); + + manager.transition_to(MimiState::Recovering).unwrap(); + + sleep(Duration::from_millis(100)).await; + + let beatrice_recovery = HealthMetric { + timestamp: Utc::now(), + component_name: "beatrice".to_string(), + metric_type: HealthMetricType::Latency, + value: 200.0, + threshold: 5000.0, + is_healthy: true, + }; + health_monitor + .record_metric(beatrice_recovery) + .await + .unwrap(); + + let pandora_recovery = HealthMetric { + timestamp: Utc::now(), + component_name: "pandora".to_string(), + metric_type: HealthMetricType::MemoryUsage, + value: 60.0, + threshold: 80.0, + is_healthy: true, + }; + health_monitor + .record_metric(pandora_recovery) + .await + .unwrap(); + + let echidna_recovery = HealthMetric { + timestamp: Utc::now(), + component_name: "echidna".to_string(), + metric_type: HealthMetricType::Latency, + value: 300.0, + threshold: 5000.0, + is_healthy: true, + }; + health_monitor + .record_metric(echidna_recovery) + .await + .unwrap(); + + sleep(Duration::from_secs(3)).await; + + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); +} diff --git a/crates/mimi-core/tests/integration_tests.rs b/crates/mimi-core/tests/integration_tests.rs new file mode 100644 index 0000000..f6a0b8b --- /dev/null +++ b/crates/mimi-core/tests/integration_tests.rs @@ -0,0 +1,217 @@ +//! 
State Machine Integration Tests + +use mimi_core::state_machine::*; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +// ============================================================================ +// Full Lifecycle Tests +// ============================================================================ + +#[tokio::test] +async fn test_full_task_lifecycle() { + let manager = Arc::new(StateManager::new()); + + // Idle -> Listening + manager.transition_to(MimiState::Listening).unwrap(); + assert_eq!(manager.current_state(), MimiState::Listening); + + // Queue task + let task = Task::new(TaskType::Execute, "lifecycle_test") + .with_execution_model(ExecutionModel::Blocking); + manager.enqueue_task(task).unwrap(); + + // Listening -> Processing + manager.transition_to(MimiState::Processing).unwrap(); + + // Processing -> Executing -> Responding + manager.execute_next_task().await.unwrap(); + + // Responding -> Idle + manager.transition_to(MimiState::Idle).unwrap(); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[tokio::test] +async fn test_error_recovery_flow() { + let manager = Arc::new(StateManager::new()); + + manager.transition_to(MimiState::Listening).unwrap(); + + // Simulate component failure + manager.force_error_state(MimiState::FailedComponent); + assert_eq!(manager.current_state(), MimiState::FailedComponent); + + // Enter recovery + manager.transition_to(MimiState::Recovering).unwrap(); + assert_eq!(manager.current_state(), MimiState::Recovering); + + // Recover to Idle + manager.transition_to(MimiState::Idle).unwrap(); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[tokio::test] +async fn test_degraded_mode_operation() { + let manager = Arc::new(StateManager::new()); + + manager.transition_to(MimiState::Listening).unwrap(); + + // Enter degraded mode + manager.transition_to(MimiState::Degraded).unwrap(); + + // Should still be able to queue tasks + let task = Task::new(TaskType::Query, "degraded_task").with_priority(TaskPriority::Low); + assert!(manager.enqueue_task(task).is_ok()); + + // Recover + manager.transition_to(MimiState::Recovering).unwrap(); + manager.transition_to(MimiState::Idle).unwrap(); +} + +// ============================================================================ +// Retry and Circuit Breaker Integration +// ============================================================================ + +#[tokio::test] +async fn test_circuit_breaker_prevents_overload() { + let breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(5))); + let manager = Arc::new(StateManager::new()); + + // Simulate 3 failures + for _ in 0..3 { + breaker.record_failure(); + } + + assert_eq!(breaker.state(), CircuitState::Open); + + // Circuit should block requests + assert!(!breaker.allow_request()); + + // Wait for half-open + sleep(Duration::from_secs(6)).await; + assert_eq!(breaker.state(), CircuitState::HalfOpen); + + // Test request allowed + assert!(breaker.allow_request()); +} + +#[tokio::test] +async fn test_task_queue_priority_under_load() { + let manager = Arc::new(StateManager::with_capacity(100)); + + // Queue mixed-priority tasks + for i in 0..50 { + let priority = match i % 3 { + 0 => TaskPriority::Critical, + 1 => TaskPriority::High, + _ => TaskPriority::Normal, + }; + + let task = Task::new(TaskType::Execute, &format!("task_{}", i)).with_priority(priority); + manager.enqueue_task(task).unwrap(); + } + + assert_eq!(manager.queue_size(), 50); + + // First dequeued should be critical + let first = 
manager.dequeue_task().unwrap(); + assert_eq!(first.priority, TaskPriority::Critical); +} + +// ============================================================================ +// Concurrent Access Tests +// ============================================================================ + +#[tokio::test] +async fn test_concurrent_task_enqueue() { + let manager = Arc::new(StateManager::new()); + + let mut handles = vec![]; + + for i in 0..10 { + let mgr = manager.clone(); + let handle = tokio::spawn(async move { + let task = Task::new(TaskType::Query, &format!("task_{}", i)); + mgr.enqueue_task(task).unwrap(); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + assert_eq!(manager.queue_size(), 10); +} + +#[tokio::test] +async fn test_concurrent_state_transitions() { + let manager = Arc::new(StateManager::new()); + + let mut handles = vec![]; + + for _ in 0..5 { + let mgr = manager.clone(); + let handle = tokio::spawn(async move { + let _ = mgr.transition_to(MimiState::Listening); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + // Should end up in Listening state + assert_eq!(manager.current_state(), MimiState::Listening); +} + +// ============================================================================ +// Performance Tests +// ============================================================================ + +#[tokio::test] +async fn test_high_throughput_task_processing() { + let manager = Arc::new(StateManager::with_capacity(10000)); + + // Enqueue 1000 tasks + for i in 0..1000 { + let task = + Task::new(TaskType::Query, &format!("task_{}", i)).with_priority(if i % 2 == 0 { + TaskPriority::High + } else { + TaskPriority::Normal + }); + manager.enqueue_task(task).unwrap(); + } + + assert_eq!(manager.queue_size(), 1000); + + // Dequeue all + for _ in 0..1000 { + assert!(manager.dequeue_task().is_ok()); + } + + assert_eq!(manager.queue_size(), 0); +} + +#[tokio::test] +async fn test_state_transition_sequence() { + let manager = Arc::new(StateManager::new()); + + let sequence = vec![ + MimiState::Listening, + MimiState::Processing, + MimiState::Executing, + MimiState::Responding, + MimiState::Idle, + ]; + + for state in sequence { + let result = manager.transition_to(state); + assert!(result.is_ok(), "Failed to transition to {:?}", state); + assert_eq!(manager.current_state(), state); + } +} diff --git a/crates/mimi-core/tests/state_machine_tests.rs b/crates/mimi-core/tests/state_machine_tests.rs index bf03148..dd5858c 100644 --- a/crates/mimi-core/tests/state_machine_tests.rs +++ b/crates/mimi-core/tests/state_machine_tests.rs @@ -1,10 +1,628 @@ //! 
State Machine Unit Tests -use mimi_core::state_machine::{MimiState, StateManager}; +use mimi_core::state_machine::{ + ComponentHealth, ComponentHealthCheck, MimiState, StateManager, StateTransition, + TransitionGuard, +}; #[test] fn test_initial_state_is_idle() { - // This will fail because StateManager doesn't exist yet let manager = StateManager::new(); assert_eq!(manager.current_state(), MimiState::Idle); } + +#[test] +fn test_valid_state_transition_idle_to_listening() { + let transition = StateTransition::new(MimiState::Idle, MimiState::Listening); + assert!(transition.is_valid()); +} + +#[test] +fn test_invalid_state_transition_idle_to_executing() { + let transition = StateTransition::new(MimiState::Idle, MimiState::Executing); + assert!(!transition.is_valid()); +} + +#[test] +fn test_guard_condition_healthy_component() { + let health = ComponentHealth { + latency_ms: 100, + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + assert!(TransitionGuard::check_component_health(&health)); +} + +#[test] +fn test_guard_condition_unhealthy_high_latency() { + let health = ComponentHealth { + latency_ms: 6000, + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + assert!(!TransitionGuard::check_component_health(&health)); +} + +#[test] +fn test_guard_condition_unhealthy_high_memory() { + let health = ComponentHealth { + latency_ms: 100, + memory_usage_percent: 85, + last_heartbeat_secs: 5, + }; + + assert!(!TransitionGuard::check_component_health(&health)); +} + +#[test] +fn test_transition_state_success() { + let manager = StateManager::new(); + + let result = manager.transition_to(MimiState::Listening); + assert!(result.is_ok()); + assert_eq!(manager.current_state(), MimiState::Listening); +} + +#[test] +fn test_transition_state_invalid() { + let manager = StateManager::new(); + + let result = manager.transition_to(MimiState::Executing); + assert!(result.is_err()); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[test] +fn test_transition_with_health_check() { + let manager = StateManager::new(); + + let unhealthy = ComponentHealth { + latency_ms: 6000, + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + let result = manager.check_and_transition(MimiState::Listening, &unhealthy); + assert!(result.is_ok()); + assert_eq!(manager.current_state(), MimiState::Degraded); +} + +#[test] +fn test_component_health_check_is_healthy() { + use mimi_core::state_machine::ComponentHealthCheck; + + let health = ComponentHealthCheck::new(100, 50, 5); + assert!(health.is_healthy()); +} + +#[test] +fn test_component_health_check_unhealthy_latency() { + use mimi_core::state_machine::ComponentHealthCheck; + + // Latency >5s = DEGRADED + let health = ComponentHealthCheck::new(6000, 50, 5); + assert!(!health.is_healthy()); +} + +#[test] +fn test_component_health_check_unhealthy_memory() { + use mimi_core::state_machine::ComponentHealthCheck; + + // Memory >80% = DEGRADED + let health = ComponentHealthCheck::new(100, 85, 5); + assert!(!health.is_healthy()); +} + +#[test] +fn test_component_health_check_unhealthy_heartbeat() { + use mimi_core::state_machine::ComponentHealthCheck; + + // Heartbeat missing >30s = RECOVERING + let health = ComponentHealthCheck::new(100, 50, 35); + assert!(!health.is_healthy()); +} + +#[test] +fn test_health_monitoring_auto_degrade() { + let manager = StateManager::new(); + + let healthy = ComponentHealthCheck::new(100, 50, 5); + manager.update_component_health(healthy).unwrap(); + manager.transition_to(MimiState::Listening).unwrap(); + 
assert_eq!(manager.current_state(), MimiState::Listening); + + let unhealthy = ComponentHealthCheck::new(6000, 50, 5); + manager.update_component_health(unhealthy).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Degraded); +} + +#[test] +fn test_health_monitoring_auto_recovering() { + let manager = StateManager::new(); + + let unhealthy = ComponentHealthCheck::new(100, 50, 35); + manager.update_component_health(unhealthy).unwrap(); + manager.transition_to(MimiState::Listening).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Recovering); +} + +// ============================================================================ +// Task Tests +// ============================================================================ + +use mimi_core::state_machine::{Task, TaskPriority, TaskType}; +use std::time::Duration; + +#[test] +fn test_task_creation_with_defaults() { + let task = Task::new(TaskType::Query, "test_task"); + + assert_eq!(task.task_type, TaskType::Query); + assert_eq!(task.priority, TaskPriority::Normal); + assert_eq!(task.retries, 0); + assert_eq!(task.max_retries, 3); + assert!(task.timeout.as_secs() == 30); +} + +#[test] +fn test_task_with_high_priority() { + let task = Task::new(TaskType::Execute, "critical_task") + .with_priority(TaskPriority::Critical) + .with_timeout(Duration::from_secs(60)); + + assert_eq!(task.priority, TaskPriority::Critical); + assert_eq!(task.timeout.as_secs(), 60); +} + +// ============================================================================ +// Task Queue Tests +// ============================================================================ + +#[test] +fn test_task_queue_fifo_within_priority() { + let manager = StateManager::new(); + + let task1 = Task::new(TaskType::Query, "query1").with_priority(TaskPriority::Normal); + let task2 = Task::new(TaskType::Execute, "exec1").with_priority(TaskPriority::High); + let task3 = Task::new(TaskType::Query, "query2").with_priority(TaskPriority::Normal); + + manager.enqueue_task(task1.clone()).unwrap(); + manager.enqueue_task(task2.clone()).unwrap(); + manager.enqueue_task(task3.clone()).unwrap(); + + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "exec1"); + + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "query1"); + + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "query2"); +} + +#[test] +fn test_task_queue_capacity_limit() { + let manager = StateManager::with_capacity(2); + + let task1 = Task::new(TaskType::Query, "task1"); + let task2 = Task::new(TaskType::Query, "task2"); + let task3 = Task::new(TaskType::Query, "task3"); + + assert!(manager.enqueue_task(task1).is_ok()); + assert!(manager.enqueue_task(task2).is_ok()); + + let result = manager.enqueue_task(task3); + assert!(result.is_err()); +} + +// ============================================================================ +// Circuit Breaker Tests +// ============================================================================ + +use mimi_core::state_machine::{CircuitBreaker, CircuitState}; + +#[test] +fn test_circuit_breaker_opens_after_failures() { + let breaker = CircuitBreaker::new(3, Duration::from_secs(10)); + + assert_eq!(breaker.state(), CircuitState::Closed); + + // Record 3 failures + breaker.record_failure(); + breaker.record_failure(); + breaker.record_failure(); + + // Should open after 3 failures + assert_eq!(breaker.state(), CircuitState::Open); +} + +#[test] +fn test_circuit_breaker_half_open_after_timeout() { + let breaker = 
+
+// ============================================================================
+// Circuit Breaker Tests
+// ============================================================================
+
+use mimi_core::state_machine::{CircuitBreaker, CircuitState};
+
+#[test]
+fn test_circuit_breaker_opens_after_failures() {
+    let breaker = CircuitBreaker::new(3, Duration::from_secs(10));
+
+    assert_eq!(breaker.state(), CircuitState::Closed);
+
+    // Record 3 failures
+    breaker.record_failure();
+    breaker.record_failure();
+    breaker.record_failure();
+
+    // Should open after 3 failures
+    assert_eq!(breaker.state(), CircuitState::Open);
+}
+
+#[test]
+fn test_circuit_breaker_half_open_after_timeout() {
+    let breaker = CircuitBreaker::new(3, Duration::from_millis(100));
+
+    // Open the circuit
+    breaker.record_failure();
+    breaker.record_failure();
+    breaker.record_failure();
+
+    assert_eq!(breaker.state(), CircuitState::Open);
+
+    // Wait for timeout
+    std::thread::sleep(Duration::from_millis(150));
+
+    // Should transition to HalfOpen
+    assert_eq!(breaker.state(), CircuitState::HalfOpen);
+}
+
+#[test]
+fn test_circuit_breaker_closes_on_success() {
+    let breaker = CircuitBreaker::new(3, Duration::from_millis(100));
+
+    // Open circuit
+    for _ in 0..3 {
+        breaker.record_failure();
+    }
+
+    // Wait for half-open
+    std::thread::sleep(Duration::from_millis(150));
+    assert_eq!(breaker.state(), CircuitState::HalfOpen);
+
+    // Success should close circuit
+    breaker.record_success();
+    assert_eq!(breaker.state(), CircuitState::Closed);
+}
+
+// ============================================================================
+// Async Task Execution Tests
+// ============================================================================
+
+use mimi_core::state_machine::ExecutionModel;
+
+#[tokio::test]
+async fn test_execute_task_blocking_mode() {
+    let manager = StateManager::new();
+    manager.transition_to(MimiState::Listening).unwrap();
+    manager.transition_to(MimiState::Processing).unwrap();
+
+    let task =
+        Task::new(TaskType::Query, "fast_query").with_execution_model(ExecutionModel::Blocking);
+
+    manager.enqueue_task(task).unwrap();
+
+    let result = manager.execute_next_task().await;
+    assert!(result.is_ok());
+}
+
+#[tokio::test]
+async fn test_execute_task_async_mode() {
+    let manager = StateManager::new();
+    manager.transition_to(MimiState::Listening).unwrap();
+    manager.transition_to(MimiState::Processing).unwrap();
+
+    let task = Task::new(TaskType::Execute, "slow_exec")
+        .with_execution_model(ExecutionModel::Async)
+        .with_timeout(Duration::from_secs(5));
+
+    manager.enqueue_task(task).unwrap();
+
+    let result = manager.execute_next_task().await;
+    assert!(result.is_ok());
+}
+
+#[tokio::test]
+async fn test_task_timeout_handling() {
+    let manager = StateManager::new();
+    manager.transition_to(MimiState::Listening).unwrap();
+    manager.transition_to(MimiState::Processing).unwrap();
+
+    let task = Task::new(TaskType::Execute, "timeout_task").with_timeout(Duration::from_millis(10));
+
+    manager.enqueue_task(task).unwrap();
+
+    let result = manager.execute_next_task().await;
+    assert!(result.is_err());
+}
+
+// ============================================================================
+// Retry Strategy Tests
+// ============================================================================
+
+use mimi_core::state_machine::RetryStrategy;
+
+#[test]
+fn test_exponential_backoff_sequence() {
+    let strategy = RetryStrategy::exponential();
+
+    let delay1 = strategy.next_delay(0);
+    assert_eq!(delay1.as_millis(), 100);
+
+    let delay2 = strategy.next_delay(1);
+    assert_eq!(delay2.as_millis(), 200);
+
+    let delay3 = strategy.next_delay(2);
+    assert_eq!(delay3.as_millis(), 400);
+
+    let delay4 = strategy.next_delay(10);
+    assert_eq!(delay4.as_millis(), 5000);
+}
+
+#[test]
+fn test_retry_with_jitter() {
+    let strategy = RetryStrategy::exponential_with_jitter();
+
+    let delay = strategy.next_delay(2);
+
+    assert!(delay.as_millis() >= 320);
+    assert!(delay.as_millis() <= 480);
+}
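+
+// The expected values above imply delay(n) = min(100ms * 2^n, 5000ms). The
+// jitter variant appears to scale that base by a factor in [0.8, 1.2]
+// (attempt 2: 400ms * 0.8 = 320ms, 400ms * 1.2 = 480ms), which is why this
+// test asserts a range rather than an exact value.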
+
+#[tokio::test]
+async fn test_execute_with_retry_success() {
+    let manager = StateManager::new();
+    manager.transition_to(MimiState::Listening).unwrap();
+    manager.transition_to(MimiState::Processing).unwrap();
+
+    let task =
+        Task::new(TaskType::Execute, "retry_task").with_execution_model(ExecutionModel::Blocking);
+
+    manager.enqueue_task(task).unwrap();
+
+    let result = manager.execute_with_retry().await;
+    assert!(result.is_ok());
+}
+
+// ============================================================================
+// Additional State Tests
+// ============================================================================
+
+#[test]
+fn test_all_state_variants() {
+    let states = vec![
+        MimiState::Idle,
+        MimiState::Listening,
+        MimiState::Processing,
+        MimiState::Executing,
+        MimiState::Responding,
+        MimiState::Degraded,
+        MimiState::Recovering,
+        MimiState::FailedComponent,
+        MimiState::CriticalError,
+        MimiState::Shutdown,
+    ];
+
+    assert_eq!(states.len(), 10);
+}
+
+#[test]
+fn test_state_equality() {
+    assert_eq!(MimiState::Idle, MimiState::Idle);
+    assert_ne!(MimiState::Idle, MimiState::Listening);
+}
+
+#[test]
+fn test_all_valid_normal_flow_transitions() {
+    let transitions = vec![
+        (MimiState::Idle, MimiState::Listening),
+        (MimiState::Listening, MimiState::Processing),
+        (MimiState::Processing, MimiState::Executing),
+        (MimiState::Executing, MimiState::Responding),
+        (MimiState::Responding, MimiState::Idle),
+    ];
+
+    for (from, to) in transitions {
+        let t = StateTransition::new(from, to);
+        assert!(t.is_valid(), "Expected {:?} -> {:?} to be valid", from, to);
+    }
+}
+
+#[test]
+fn test_error_escalation_from_any_state() {
+    let states = vec![
+        MimiState::Idle,
+        MimiState::Listening,
+        MimiState::Processing,
+        MimiState::Executing,
+        MimiState::Responding,
+    ];
+
+    for state in states {
+        let t1 = StateTransition::new(state, MimiState::Degraded);
+        assert!(t1.is_valid());
+
+        let t2 = StateTransition::new(state, MimiState::FailedComponent);
+        assert!(t2.is_valid());
+
+        let t3 = StateTransition::new(state, MimiState::CriticalError);
+        assert!(t3.is_valid());
+    }
+}
+
+#[test]
+fn test_recovery_paths() {
+    let t1 = StateTransition::new(MimiState::Degraded, MimiState::Recovering);
+    assert!(t1.is_valid());
+
+    let t2 = StateTransition::new(MimiState::FailedComponent, MimiState::Recovering);
+    assert!(t2.is_valid());
+
+    let t3 = StateTransition::new(MimiState::Recovering, MimiState::Idle);
+    assert!(t3.is_valid());
+}
+
+#[test]
+fn test_invalid_transitions() {
+    let invalid = vec![
+        (MimiState::Idle, MimiState::Processing),
+        (MimiState::Idle, MimiState::Executing),
+        (MimiState::Listening, MimiState::Responding),
+        (MimiState::Processing, MimiState::Idle),
+    ];
+
+    for (from, to) in invalid {
+        let t = StateTransition::new(from, to);
+        assert!(
+            !t.is_valid(),
+            "Expected {:?} -> {:?} to be invalid",
+            from,
+            to
+        );
+    }
+}
+
+#[test]
+fn test_guard_all_thresholds() {
+    // All healthy
+    let h1 = ComponentHealth {
+        latency_ms: 5000,
+        memory_usage_percent: 80,
+        last_heartbeat_secs: 30,
+    };
+    assert!(TransitionGuard::check_component_health(&h1));
+
+    // Just over latency threshold
+    let h2 = ComponentHealth {
+        latency_ms: 5001,
+        memory_usage_percent: 80,
+        last_heartbeat_secs: 30,
+    };
+    assert!(!TransitionGuard::check_component_health(&h2));
+
+    // Just over memory threshold
+    let h3 = ComponentHealth {
+        latency_ms: 5000,
+        memory_usage_percent: 81,
+        last_heartbeat_secs: 30,
+    };
+    assert!(!TransitionGuard::check_component_health(&h3));
+
+    // Just over heartbeat threshold
+    let h4 = ComponentHealth {
+        latency_ms: 5000,
+        memory_usage_percent: 80,
+        last_heartbeat_secs: 31,
+    };
+    assert!(!TransitionGuard::check_component_health(&h4));
+}
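+
+// The h1 case above doubles as a boundary check: all three guard thresholds
+// are inclusive (exactly 5000ms latency, 80% memory, and a 30s-old heartbeat
+// still pass), and h2-h4 show each limit tripping independently.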
+
+#[test]
+fn test_guard_queue_capacity() {
+    assert!(TransitionGuard::check_queue_capacity(99, 100));
+    assert!(!TransitionGuard::check_queue_capacity(100, 100));
+}
+
+#[test]
+fn test_guard_task_timeout() {
+    let timeout1 = Duration::from_secs(30);
+    let timeout2 = Duration::from_secs(60);
+    let max = Duration::from_secs(60);
+
+    assert!(TransitionGuard::check_task_timeout(&timeout1, &max));
+    assert!(TransitionGuard::check_task_timeout(&timeout2, &max));
+    assert!(!TransitionGuard::check_task_timeout(
+        &Duration::from_secs(61),
+        &max
+    ));
+}
+
+#[test]
+fn test_task_builder_chain() {
+    let task = Task::new(TaskType::Execute, "complex_task")
+        .with_priority(TaskPriority::High)
+        .with_timeout(Duration::from_secs(120))
+        .with_execution_model(ExecutionModel::Async)
+        .with_payload(vec![1, 2, 3]);
+
+    assert_eq!(task.priority, TaskPriority::High);
+    assert_eq!(task.timeout.as_secs(), 120);
+    assert_eq!(task.execution_model, ExecutionModel::Async);
+    assert_eq!(task.payload, vec![1, 2, 3]);
+}
+
+#[test]
+fn test_task_can_retry() {
+    let mut task = Task::new(TaskType::Query, "test");
+
+    assert!(task.can_retry());
+
+    task.increment_retry();
+    task.increment_retry();
+    task.increment_retry();
+
+    assert!(!task.can_retry());
+}
+
+#[test]
+fn test_retry_strategy_progression() {
+    let strategy = RetryStrategy::exponential();
+
+    let delays: Vec<u64> = (0..5)
+        .map(|i| strategy.next_delay(i).as_millis() as u64)
+        .collect();
+
+    assert_eq!(delays, vec![100, 200, 400, 800, 1600]);
+}
+
+#[test]
+fn test_retry_strategy_max_cap() {
+    let strategy = RetryStrategy::exponential();
+
+    let delay = strategy.next_delay(20);
+    assert_eq!(delay.as_millis(), 5000);
+}
+
+#[test]
+fn test_circuit_breaker_initial_state() {
+    let breaker = CircuitBreaker::new(3, Duration::from_secs(10));
+    assert_eq!(breaker.state(), CircuitState::Closed);
+    assert!(breaker.allow_request());
+}
+
+#[test]
+fn test_circuit_breaker_blocks_when_open() {
+    let breaker = CircuitBreaker::new(1, Duration::from_secs(10));
+
+    breaker.record_failure();
+
+    assert_eq!(breaker.state(), CircuitState::Open);
+    assert!(!breaker.allow_request());
+}
+
+#[test]
+fn test_circuit_breaker_reset() {
+    let breaker = CircuitBreaker::new(1, Duration::from_secs(10));
+
+    breaker.record_failure();
+    assert_eq!(breaker.state(), CircuitState::Open);
+
+    breaker.reset();
+    assert_eq!(breaker.state(), CircuitState::Closed);
+}
+
+#[test]
+fn test_state_manager_default_state() {
+    let manager = StateManager::new();
+    assert_eq!(manager.current_state(), MimiState::Idle);
+}
+
+#[test]
+fn test_state_manager_queue_size() {
+    let manager = StateManager::new();
+    assert_eq!(manager.queue_size(), 0);
+
+    let task = Task::new(TaskType::Query, "test");
+    manager.enqueue_task(task).unwrap();
+
+    assert_eq!(manager.queue_size(), 1);
+}
+
+#[test]
+fn test_state_manager_force_error_state() {
+    let manager = StateManager::new();
+
+    manager.force_error_state(MimiState::CriticalError);
+    assert_eq!(manager.current_state(), MimiState::CriticalError);
+}
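+
+// Quick reference: the transition rules pinned down by the tests above
+// (inferred from the assertions, not from state_machine.rs itself):
+//   normal flow: Idle -> Listening -> Processing -> Executing -> Responding -> Idle
+//   escalation:  any normal-flow state -> Degraded | FailedComponent | CriticalError
+//   recovery:    Degraded | FailedComponent -> Recovering -> Idle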
diff --git a/planning/TASKLIST.md b/planning/TASKLIST.md
index 2046921..038bea5 100644
--- a/planning/TASKLIST.md
+++ b/planning/TASKLIST.md
@@ -76,36 +76,36 @@ Structure: **Milestone → Phase → Tasks**
 
 ### M1.3: Mimi Core Engine (State Machine & Orchestration)
 
-**M1.3.1** Design Mimi state machine
+**M1.3.1** ✅ Design Mimi state machine
 - Define states: IDLE, LISTENING, PROCESSING, EXECUTING, ERROR, SHUTDOWN
 - Define state transitions and guard conditions
 - Document state-specific behavior and side effects
 
-**M1.3.2** Implement Mimi state machine in Rust
+**M1.3.2** ✅ Implement Mimi state machine in Rust
 - Use state pattern or enum-based state machine
 - Implement state handlers (entry, exit, internal actions)
 - Add logging at each state transition
 - Write unit tests for each state and transition
 
-**M1.3.3** Implement task queue & executor
+**M1.3.3** ✅ Implement task queue & executor
 - Create async task queue (tokio::mpsc::channel)
 - Implement task scheduling based on priority
 - Add timeout enforcement per task
 - Write tests for queue ordering and timeout handling
 
-**M1.3.4** Implement error handling & recovery
+**M1.3.4** ✅ Implement error handling & recovery
 - Define error types (network, timeout, validation, execution, module)
 - Implement error propagation through state machine
 - Add automatic recovery strategies (retry with backoff, circuit breaker)
 - Write tests for error scenarios
 
-**M1.3.5** Implement metrics & observability
+**M1.3.5** ✅ Implement metrics & observability
 - Add structured logging with tracing crate
 - Add metrics collection (task count, latency, error rates)
 - Integrate with Prometheus exporter (if monitoring required)
 - Write tests for log output and metrics
 
-**M1.3.6** Implement graceful shutdown
+**M1.3.6** ✅ Implement graceful shutdown
 - Add shutdown signal handling (SIGTERM, SIGINT)
 - Drain task queue and wait for running tasks
 - Close all connections cleanly
@@ -115,36 +115,36 @@ Structure: **Milestone → Phase → Tasks**
 
 ### M1.4: Beatrice CLI Interface
 
-**M1.4.1** Design Beatrice CLI argument parsing
+**M1.4.1** Design Beatrice CLI argument parsing (#212)
 - Define command structure (mimi [command] [args] [options])
 - Design help system and error messages
 - Plan command hierarchy (exec, query, config, debug)
 
-**M1.4.2** Implement Beatrice CLI core
+**M1.4.2** Implement Beatrice CLI core (#213)
 - Use clap or structopt for argument parsing
 - Implement command dispatch to handlers
 - Add colored output for readability
 - Write tests for all command parsing scenarios
 
-**M1.4.3** Implement Beatrice interactive REPL
+**M1.4.3** Implement Beatrice interactive REPL (#214)
 - Create prompt and input reading loop
 - Implement command history and completion
 - Add exit handling and session cleanup
 - Write tests for REPL state machine
 
-**M1.4.4** Implement Beatrice HTTP server
+**M1.4.4** Implement Beatrice HTTP server (#215)
 - Use actix-web or axum for HTTP framework
 - Define REST API endpoints (/query, /execute, /status)
 - Implement request validation and error responses
 - Write tests for each endpoint with mock Mimi backend
 
-**M1.4.5** Implement Beatrice WebSocket server
+**M1.4.5** Implement Beatrice WebSocket server (#216)
 - Use tokio-tungstenite or similar for WebSocket
 - Implement persistent client connections
 - Add subscription model for real-time updates
 - Write tests for WebSocket communication
 
-**M1.4.6** Connect Beatrice to Mimi core
+**M1.4.6** Connect Beatrice to Mimi core (#217)
 - Implement client-side message marshaling
 - Handle Mimi responses and surface to user
 - Implement streaming responses for long-running operations
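The command hierarchy sketched in M1.4.1 and M1.4.2 maps naturally onto clap's derive API. A minimal sketch, assuming clap 4 and the four subcommands named above; the argument shapes are placeholders, not the committed design:

```rust
use clap::{Parser, Subcommand};

// Sketch only: the subcommand set comes from the M1.4.1 bullet
// "exec, query, config, debug"; field shapes are assumptions.
#[derive(Parser)]
#[command(name = "mimi", about = "Beatrice CLI front-end for Mimi")]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    /// Execute a task
    Exec { task: String },
    /// Send a query to Mimi
    Query { prompt: String },
    /// Show or set configuration
    Config { key: Option<String>, value: Option<String> },
    /// Print diagnostic information
    Debug,
}

fn main() {
    let cli = Cli::parse();
    // Dispatch to handlers here (M1.4.2); stubbed with prints for the sketch.
    match cli.command {
        Command::Exec { task } => println!("exec: {task}"),
        Command::Query { prompt } => println!("query: {prompt}"),
        Command::Config { key, value } => println!("config: {key:?} {value:?}"),
        Command::Debug => println!("debug"),
    }
}
```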
@@ -154,33 +154,33 @@ Structure: **Milestone → Phase → Tasks**
 
 ### M1.5: Gemini AI Adapter
 
-**M1.5.1** Design pluggable AI adapter interface
+**M1.5.1** Design pluggable AI adapter interface (#218)
 - Define Adapter trait/protocol (initialize, invoke, cleanup)
 - Define request/response format for LLM calls
 - Plan configuration system for adapter parameters
 - Document extensibility points for future adapters
 
-**M1.5.2** Implement Gemini adapter
+**M1.5.2** Implement Gemini adapter (#219)
 - Use Google Cloud Generative AI library (Rust or HTTP client)
 - Implement connection pooling to Gemini API
 - Implement prompt templates and response parsing
 - Add API key management and error handling
 - Write tests with mock Gemini responses
 
-**M1.5.3** Implement Ollama adapter (local LLM)
+**M1.5.3** Implement Ollama adapter (local LLM) (#220)
 - Use Ollama HTTP API client
 - Implement model loading and caching
 - Implement streaming response handling
 - Add fallback to Gemini if Ollama unavailable
 - Write tests with local Ollama instance
 
-**M1.5.4** Implement adapter registry & discovery
+**M1.5.4** Implement adapter registry & discovery (#221)
 - Create adapter factory pattern
 - Implement configuration-driven adapter selection
 - Add adapter health checks and fallback logic
 - Write tests for adapter switching
 
-**M1.5.5** Implement adapter performance monitoring
+**M1.5.5** Implement adapter performance monitoring (#222)
 - Add latency tracking per adapter
 - Track API call success/error rates
 - Implement adaptive timeout adjustment
@@ -190,25 +190,25 @@ Structure: **Milestone → Phase → Tasks**
 
 ### M1.6: Integration & Testing
 
-**M1.6.1** Write end-to-end integration test suite (M1 components)
+**M1.6.1** Write end-to-end integration test suite (M1 components) (#223)
 - Test: CLI command → Message Bus → Mimi core → AI adapter → response
 - Test: HTTP request → Message Bus → Mimi core → response
 - Test: WebSocket connection → Message Bus → streaming responses
 - Test: Error scenarios (network failure, timeout, invalid input)
 
-**M1.6.2** Write performance benchmarks (M1 components)
+**M1.6.2** Write performance benchmarks (M1 components) (#224)
 - Benchmark message bus latency (publish/subscribe/request-reply)
 - Benchmark FlatBuffers serialization/deserialization
 - Benchmark Mimi state machine throughput
 - Benchmark Beatrice CLI startup time
 
-**M1.6.3** Write documentation for M1
+**M1.6.3** Write documentation for M1 (#225)
 - API documentation (FlatBuffers schema, Mimi core API, Beatrice endpoints)
 - Architecture diagrams and sequence diagrams
 - Installation and quickstart guide
 - Troubleshooting guide for common issues
 
-**M1.6.4** Prepare M1 for deployment
+**M1.6.4** Prepare M1 for deployment (#226)
 - Build Docker image for Mimi core
 - Create docker-compose for M1 (Zenoh, Mimi, Beatrice server)
 - Write deployment checklist and run procedures
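For M1.5.1, the trait surface named in the bullets (initialize, invoke, cleanup) might sketch out as follows; the request/response structs, the config type, and the async-trait dependency are assumptions, not the committed design:

```rust
use anyhow::Result;
use async_trait::async_trait;

// Sketch only: shapes and field names are placeholders; the three methods
// mirror the "initialize, invoke, cleanup" bullet in M1.5.1.
pub struct LlmRequest {
    pub prompt: String,
}

pub struct LlmResponse {
    pub text: String,
}

#[async_trait]
pub trait AiAdapter: Send + Sync {
    /// Establish connections, load models, validate credentials.
    async fn initialize(&mut self, config: &serde_json::Value) -> Result<()>;

    /// Perform a single LLM call.
    async fn invoke(&self, request: LlmRequest) -> Result<LlmResponse>;

    /// Release connections and any cached resources.
    async fn cleanup(&mut self) -> Result<()>;
}
```

A registry (M1.5.4) would then hold `Box<dyn AiAdapter>` values keyed by name and select one from configuration, with the Ollama-to-Gemini fallback of M1.5.3 layered on top.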