diff --git a/crates/mimi-core/src/lib.rs b/crates/mimi-core/src/lib.rs index 449b3f7..68e00b9 100644 --- a/crates/mimi-core/src/lib.rs +++ b/crates/mimi-core/src/lib.rs @@ -8,10 +8,12 @@ pub mod error; pub mod message; pub mod routing; pub mod serialization; +pub mod state_machine; pub use error::{Error, Result}; pub use routing::{MessageRouter, RoutingError, Topic, TopicPattern}; pub use serialization::{MessageSerializer, SerializationError}; +pub use state_machine::{MimiState, StateManager}; /// Core version pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/mimi-core/src/state_machine.rs b/crates/mimi-core/src/state_machine.rs new file mode 100644 index 0000000..b727733 --- /dev/null +++ b/crates/mimi-core/src/state_machine.rs @@ -0,0 +1,56 @@ +//! Mimi State Machine FSM +//! +//! Implements the 10-state finite state machine for Mimi orchestrator core lifecycle. +//! Provides async execution, guard conditions, error recovery, and message bus integration. + +use std::sync::{Arc, Mutex}; + +/// Mimi system states +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MimiState { + /// System idle, waiting for input + Idle, + /// Listening for user commands via Zenoh + Listening, + /// Processing intent classification + Processing, + /// Executing task via workers + Executing, + /// Generating response via Liliana + Responding, + /// Degraded mode (partial functionality) + Degraded, + /// Recovering from failure + Recovering, + /// Component failure detected + FailedComponent, + /// Critical error requiring intervention + CriticalError, + /// System shutdown in progress + Shutdown, +} + +/// State manager with thread-safe access +pub struct StateManager { + state: Arc>, +} + +impl StateManager { + /// Create new state manager starting in Idle state + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(MimiState::Idle)), + } + } + + /// Get current state + pub fn current_state(&self) -> MimiState { + *self.state.lock().unwrap() + } +} + +impl Default for StateManager { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/mimi-core/tests/state_machine_tests.rs b/crates/mimi-core/tests/state_machine_tests.rs new file mode 100644 index 0000000..bf03148 --- /dev/null +++ b/crates/mimi-core/tests/state_machine_tests.rs @@ -0,0 +1,10 @@ +//! State Machine Unit Tests + +use mimi_core::state_machine::{MimiState, StateManager}; + +#[test] +fn test_initial_state_is_idle() { + // This will fail because StateManager doesn't exist yet + let manager = StateManager::new(); + assert_eq!(manager.current_state(), MimiState::Idle); +} diff --git a/docs/plans/2026-04-17-M1.3.2-state-machine-implementation.md b/docs/plans/2026-04-17-M1.3.2-state-machine-implementation.md new file mode 100644 index 0000000..bec4dd9 --- /dev/null +++ b/docs/plans/2026-04-17-M1.3.2-state-machine-implementation.md @@ -0,0 +1,3098 @@ +# M1.3.2 Mimi State Machine FSM Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Implement the complete Mimi State Machine FSM in Rust with async execution, Zenoh integration, and comprehensive testing. + +**Architecture:** 10-state finite state machine with Arc> thread-safe pattern, Zenoh message bus integration, hybrid blocking/async execution, exponential backoff retry, circuit breaker pattern, and selective Neo4j persistence via Pandora. + +**Tech Stack:** Rust, tokio (async runtime), Zenoh (message bus), FlatBuffers (serialization), Neo4j (via Pandora), anyhow (error handling), uuid (task IDs), chrono (timestamps) + +--- + +## Task 1: Create state_machine.rs Module Stub + +**Files:** +- Create: `crates/mimi-core/src/state_machine.rs` +- Modify: `crates/mimi-core/src/lib.rs:10` + +**Step 1: Write the failing test** + +Create `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +//! State Machine Unit Tests + +use mimi_core::state_machine::{MimiState, StateManager}; + +#[test] +fn test_initial_state_is_idle() { + // This will fail because StateManager doesn't exist yet + let manager = StateManager::new(); + assert_eq!(manager.current_state(), MimiState::Idle); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test --test state_machine_tests --all-features` +Expected: FAIL with "no module named `state_machine`" + +**Step 3: Write minimal implementation** + +Create `crates/mimi-core/src/state_machine.rs`: + +```rust +//! Mimi State Machine FSM +//! +//! Implements the 10-state finite state machine for Mimi orchestrator core lifecycle. +//! Provides async execution, guard conditions, error recovery, and message bus integration. + +use std::sync::{Arc, Mutex}; + +/// Mimi system states +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MimiState { + /// System idle, waiting for input + Idle, + /// Listening for user commands via Zenoh + Listening, + /// Processing intent classification + Processing, + /// Executing task via workers + Executing, + /// Generating response via Liliana + Responding, + /// Degraded mode (partial functionality) + Degraded, + /// Recovering from failure + Recovering, + /// Component failure detected + FailedComponent, + /// Critical error requiring intervention + CriticalError, + /// System shutdown in progress + Shutdown, +} + +/// State manager with thread-safe access +pub struct StateManager { + state: Arc>, +} + +impl StateManager { + /// Create new state manager starting in Idle state + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(MimiState::Idle)), + } + } + + /// Get current state + pub fn current_state(&self) -> MimiState { + *self.state.lock().unwrap() + } +} + +impl Default for StateManager { + fn default() -> Self { + Self::new() + } +} +``` + +**Step 4: Run test to verify it passes** + +Modify `crates/mimi-core/src/lib.rs`: + +```rust +pub mod state_machine; +``` + +Add to exports: + +```rust +pub use state_machine::{MimiState, StateManager}; +``` + +Run: `cargo test --test state_machine_tests` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/src/lib.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add MimiState enum and StateManager stub" +``` + +--- + +## Task 2: Define Complete Task Struct + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:56` (after StateManager) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::{Task, TaskPriority, TaskType, ExecutionModel}; +use std::time::Duration; + +#[test] +fn test_task_creation_with_defaults() { + let task = Task::new(TaskType::Query, "test_task"); + + assert_eq!(task.task_type, TaskType::Query); + assert_eq!(task.priority, TaskPriority::Normal); + assert_eq!(task.retries, 0); + assert_eq!(task.max_retries, 3); + assert!(task.timeout.as_secs() == 30); +} + +#[test] +fn test_task_with_high_priority() { + let task = Task::new(TaskType::Execute, "critical_task") + .with_priority(TaskPriority::Critical) + .with_timeout(Duration::from_secs(60)); + + assert_eq!(task.priority, TaskPriority::Critical); + assert_eq!(task.timeout.as_secs(), 60); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_task_creation` +Expected: FAIL with "unresolved import `mimi_core::state_machine::Task`" + +**Step 3: Write minimal implementation** + +Add to `crates/mimi-core/src/state_machine.rs`: + +```rust +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use uuid::Uuid; + +/// Task priority levels +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum TaskPriority { + Low = 0, + Normal = 1, + High = 2, + Critical = 3, +} + +/// Task types matching IntentType from schema.fbs +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum TaskType { + Query, + Execute, + SkillPublish, + StateUpdate, + MemoryUpdate, + ErrorReport, + Control, +} + +/// Execution model for task processing +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum ExecutionModel { + /// Synchronous blocking execution (<500ms expected) + Blocking, + /// Asynchronous with callback (>500ms expected) + Async, +} + +/// Task representation with full lifecycle metadata +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Task { + pub id: Uuid, + pub task_type: TaskType, + pub name: String, + pub priority: TaskPriority, + pub payload: Vec, + pub timeout: Duration, + pub retries: u32, + pub max_retries: u32, + pub created_at: DateTime, + pub execution_model: ExecutionModel, +} + +impl Task { + /// Create new task with defaults + pub fn new(task_type: TaskType, name: &str) -> Self { + Self { + id: Uuid::new_v4(), + task_type, + name: name.to_string(), + priority: TaskPriority::Normal, + payload: Vec::new(), + timeout: Duration::from_secs(30), + retries: 0, + max_retries: 3, + created_at: Utc::now(), + execution_model: ExecutionModel::Async, + } + } + + /// Set task priority (builder pattern) + pub fn with_priority(mut self, priority: TaskPriority) -> Self { + self.priority = priority; + self + } + + /// Set timeout (builder pattern) + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + /// Set payload (builder pattern) + pub fn with_payload(mut self, payload: Vec) -> Self { + self.payload = payload; + self + } + + /// Set execution model (builder pattern) + pub fn with_execution_model(mut self, model: ExecutionModel) -> Self { + self.execution_model = model; + self + } + + /// Check if task can be retried + pub fn can_retry(&self) -> bool { + self.retries < self.max_retries + } + + /// Increment retry counter + pub fn increment_retry(&mut self) { + self.retries += 1; + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_task_creation` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add Task struct with priority, retry, and execution model" +``` + +--- + +## Task 3: Implement Task Queue with Priority Ordering + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:50` (StateManager impl) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +#[test] +fn test_task_queue_fifo_within_priority() { + let manager = StateManager::new(); + + let task1 = Task::new(TaskType::Query, "query1") + .with_priority(TaskPriority::Normal); + let task2 = Task::new(TaskType::Execute, "exec1") + .with_priority(TaskPriority::High); + let task3 = Task::new(TaskType::Query, "query2") + .with_priority(TaskPriority::Normal); + + manager.enqueue_task(task1.clone()).unwrap(); + manager.enqueue_task(task2.clone()).unwrap(); + manager.enqueue_task(task3.clone()).unwrap(); + + // High priority should dequeue first + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "exec1"); + + // Then normal priority in FIFO order + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "query1"); + + let dequeued = manager.dequeue_task().unwrap(); + assert_eq!(dequeued.name, "query2"); +} + +#[test] +fn test_task_queue_capacity_limit() { + let manager = StateManager::with_capacity(2); + + let task1 = Task::new(TaskType::Query, "task1"); + let task2 = Task::new(TaskType::Query, "task2"); + let task3 = Task::new(TaskType::Query, "task3"); + + assert!(manager.enqueue_task(task1).is_ok()); + assert!(manager.enqueue_task(task2).is_ok()); + + // Third task should fail due to capacity + let result = manager.enqueue_task(task3); + assert!(result.is_err()); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_task_queue` +Expected: FAIL with "no method named `enqueue_task`" + +**Step 3: Write minimal implementation** + +Modify `crates/mimi-core/src/state_machine.rs`: + +Add imports: + +```rust +use std::collections::BinaryHeap; +use std::cmp::Ordering; +use anyhow::{anyhow, Result}; +``` + +Add wrapper for priority queue ordering: + +```rust +/// Task wrapper for priority queue ordering +#[derive(Clone)] +struct PrioritizedTask { + task: Task, + sequence: u64, // For FIFO within same priority +} + +impl PartialEq for PrioritizedTask { + fn eq(&self, other: &Self) -> bool { + self.task.priority == other.task.priority && self.sequence == other.sequence + } +} + +impl Eq for PrioritizedTask {} + +impl PartialOrd for PrioritizedTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PrioritizedTask { + fn cmp(&self, other: &Self) -> Ordering { + // Higher priority first + match other.task.priority.cmp(&self.task.priority) { + Ordering::Equal => { + // Same priority: FIFO (lower sequence first) + self.sequence.cmp(&other.sequence) + } + other_ord => other_ord, + } + } +} +``` + +Modify StateManager struct: + +```rust +/// State manager with thread-safe access +pub struct StateManager { + state: Arc>, + task_queue: Arc>>, + queue_capacity: usize, + sequence_counter: Arc>, +} + +impl StateManager { + /// Create new state manager starting in Idle state + pub fn new() -> Self { + Self::with_capacity(1000) // Default capacity + } + + /// Create state manager with custom queue capacity + pub fn with_capacity(capacity: usize) -> Self { + Self { + state: Arc::new(Mutex::new(MimiState::Idle)), + task_queue: Arc::new(Mutex::new(BinaryHeap::new())), + queue_capacity: capacity, + sequence_counter: Arc::new(Mutex::new(0)), + } + } + + /// Get current state + pub fn current_state(&self) -> MimiState { + *self.state.lock().unwrap() + } + + /// Enqueue task with priority ordering + pub fn enqueue_task(&self, task: Task) -> Result<()> { + let mut queue = self.task_queue.lock().unwrap(); + + if queue.len() >= self.queue_capacity { + return Err(anyhow!("Task queue full (capacity: {})", self.queue_capacity)); + } + + let mut counter = self.sequence_counter.lock().unwrap(); + let sequence = *counter; + *counter += 1; + + queue.push(PrioritizedTask { task, sequence }); + + Ok(()) + } + + /// Dequeue highest priority task (FIFO within priority) + pub fn dequeue_task(&self) -> Result { + let mut queue = self.task_queue.lock().unwrap(); + + queue.pop() + .map(|pt| pt.task) + .ok_or_else(|| anyhow!("Task queue is empty")) + } + + /// Get current queue size + pub fn queue_size(&self) -> usize { + self.task_queue.lock().unwrap().len() + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_task_queue` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): implement priority task queue with capacity limits" +``` + +--- + +## Task 4: Define State Transition Guard Conditions + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:200` (after StateManager impl) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::{StateTransition, TransitionGuard, ComponentHealth}; + +#[test] +fn test_valid_state_transition_idle_to_listening() { + let transition = StateTransition::new(MimiState::Idle, MimiState::Listening); + assert!(transition.is_valid()); +} + +#[test] +fn test_invalid_state_transition_idle_to_executing() { + let transition = StateTransition::new(MimiState::Idle, MimiState::Executing); + assert!(!transition.is_valid()); +} + +#[test] +fn test_guard_condition_healthy_component() { + let health = ComponentHealth { + latency_ms: 100, + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + assert!(TransitionGuard::check_component_health(&health)); +} + +#[test] +fn test_guard_condition_unhealthy_high_latency() { + let health = ComponentHealth { + latency_ms: 6000, // >5000ms threshold + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + assert!(!TransitionGuard::check_component_health(&health)); +} + +#[test] +fn test_guard_condition_unhealthy_high_memory() { + let health = ComponentHealth { + latency_ms: 100, + memory_usage_percent: 85, // >80% threshold + last_heartbeat_secs: 5, + }; + + assert!(!TransitionGuard::check_component_health(&health)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_valid_state_transition` +Expected: FAIL with "unresolved import" + +**Step 3: Write minimal implementation** + +Add to `crates/mimi-core/src/state_machine.rs`: + +```rust +/// Component health metrics for guard conditions +#[derive(Debug, Clone, Copy)] +pub struct ComponentHealth { + pub latency_ms: u64, + pub memory_usage_percent: u8, + pub last_heartbeat_secs: u64, +} + +/// State transition representation +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct StateTransition { + pub from: MimiState, + pub to: MimiState, +} + +impl StateTransition { + /// Create new state transition + pub fn new(from: MimiState, to: MimiState) -> Self { + Self { from, to } + } + + /// Check if transition is valid according to FSM rules + pub fn is_valid(&self) -> bool { + use MimiState::*; + + matches!( + (self.from, self.to), + // Normal flow + | (Idle, Listening) + | (Listening, Processing) + | (Processing, Executing) + | (Executing, Responding) + | (Responding, Idle) + + // Error escalation from any state + | (_, Degraded) + | (_, FailedComponent) + | (_, CriticalError) + + // Recovery paths + | (Degraded, Recovering) + | (FailedComponent, Recovering) + | (Recovering, Idle) + | (Recovering, Degraded) + + // Shutdown from any state + | (_, Shutdown) + + // Self-transition (no-op) + | (s1, s2) if s1 == s2 + ) + } +} + +/// Guard condition evaluator for state transitions +pub struct TransitionGuard; + +impl TransitionGuard { + /// Latency threshold: 5 seconds + const LATENCY_THRESHOLD_MS: u64 = 5000; + + /// Memory usage threshold: 80% + const MEMORY_THRESHOLD_PERCENT: u8 = 80; + + /// Heartbeat timeout: 30 seconds + const HEARTBEAT_TIMEOUT_SECS: u64 = 30; + + /// Check if component health is within acceptable thresholds + pub fn check_component_health(health: &ComponentHealth) -> bool { + health.latency_ms <= Self::LATENCY_THRESHOLD_MS + && health.memory_usage_percent <= Self::MEMORY_THRESHOLD_PERCENT + && health.last_heartbeat_secs <= Self::HEARTBEAT_TIMEOUT_SECS + } + + /// Check if task queue has capacity + pub fn check_queue_capacity(current: usize, max: usize) -> bool { + current < max + } + + /// Check if task timeout is within bounds + pub fn check_task_timeout(timeout: &Duration, max: &Duration) -> bool { + timeout <= max + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_valid_state_transition` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add state transition validation and guard conditions" +``` + +--- + +## Task 5: Implement State Transition Logic with Guards + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:100` (StateManager impl) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +#[test] +fn test_transition_state_success() { + let manager = StateManager::new(); + + // Idle -> Listening is valid + let result = manager.transition_to(MimiState::Listening); + assert!(result.is_ok()); + assert_eq!(manager.current_state(), MimiState::Listening); +} + +#[test] +fn test_transition_state_invalid() { + let manager = StateManager::new(); + + // Idle -> Executing is invalid + let result = manager.transition_to(MimiState::Executing); + assert!(result.is_err()); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[test] +fn test_transition_with_health_check() { + let manager = StateManager::new(); + + let unhealthy = ComponentHealth { + latency_ms: 6000, + memory_usage_percent: 50, + last_heartbeat_secs: 5, + }; + + // Should escalate to Degraded due to high latency + let result = manager.check_and_transition(MimiState::Listening, &unhealthy); + assert!(result.is_ok()); + assert_eq!(manager.current_state(), MimiState::Degraded); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_transition_state` +Expected: FAIL with "no method named `transition_to`" + +**Step 3: Write minimal implementation** + +Add to StateManager impl in `crates/mimi-core/src/state_machine.rs`: + +```rust +impl StateManager { + // ... existing methods ... + + /// Transition to new state with validation + pub fn transition_to(&self, new_state: MimiState) -> Result<()> { + let mut state = self.state.lock().unwrap(); + let current = *state; + + let transition = StateTransition::new(current, new_state); + + if !transition.is_valid() { + return Err(anyhow!( + "Invalid state transition: {:?} -> {:?}", + current, + new_state + )); + } + + log::info!("State transition: {:?} -> {:?}", current, new_state); + *state = new_state; + + Ok(()) + } + + /// Check component health and transition if needed + pub fn check_and_transition( + &self, + target_state: MimiState, + health: &ComponentHealth, + ) -> Result<()> { + if !TransitionGuard::check_component_health(health) { + log::warn!("Component health check failed, transitioning to Degraded"); + return self.transition_to(MimiState::Degraded); + } + + self.transition_to(target_state) + } + + /// Force transition to error state (bypasses validation) + pub fn force_error_state(&self, error_state: MimiState) { + let mut state = self.state.lock().unwrap(); + + log::error!("Forcing error state: {:?}", error_state); + *state = error_state; + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_transition_state` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): implement state transition logic with guard checks" +``` + +--- + +## Task 6: Implement Exponential Backoff Retry Strategy + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:400` (new module section) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::RetryStrategy; + +#[test] +fn test_exponential_backoff_sequence() { + let strategy = RetryStrategy::exponential(); + + // First retry: 100ms + let delay1 = strategy.next_delay(0); + assert_eq!(delay1.as_millis(), 100); + + // Second retry: 200ms + let delay2 = strategy.next_delay(1); + assert_eq!(delay2.as_millis(), 200); + + // Third retry: 400ms + let delay3 = strategy.next_delay(2); + assert_eq!(delay3.as_millis(), 400); + + // Fourth retry: capped at 5000ms + let delay4 = strategy.next_delay(10); + assert_eq!(delay4.as_millis(), 5000); +} + +#[test] +fn test_retry_with_jitter() { + let strategy = RetryStrategy::exponential_with_jitter(); + + let delay = strategy.next_delay(2); + + // Should be 400ms +/- 20% jitter (320ms - 480ms) + assert!(delay.as_millis() >= 320); + assert!(delay.as_millis() <= 480); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_exponential_backoff` +Expected: FAIL with "unresolved import" + +**Step 3: Write minimal implementation** + +Add to `crates/mimi-core/src/state_machine.rs`: + +```rust +use rand::Rng; + +/// Retry strategy with exponential backoff +#[derive(Debug, Clone)] +pub struct RetryStrategy { + base_delay_ms: u64, + max_delay_ms: u64, + jitter_enabled: bool, + jitter_factor: f64, // 0.0 - 1.0 +} + +impl RetryStrategy { + /// Create exponential backoff strategy (100ms -> 5s) + pub fn exponential() -> Self { + Self { + base_delay_ms: 100, + max_delay_ms: 5000, + jitter_enabled: false, + jitter_factor: 0.0, + } + } + + /// Create exponential backoff with 20% jitter + pub fn exponential_with_jitter() -> Self { + Self { + base_delay_ms: 100, + max_delay_ms: 5000, + jitter_enabled: true, + jitter_factor: 0.2, // +/- 20% + } + } + + /// Calculate delay for retry attempt + pub fn next_delay(&self, retry_count: u32) -> Duration { + let base_delay = self.base_delay_ms * 2_u64.pow(retry_count); + let capped_delay = base_delay.min(self.max_delay_ms); + + if self.jitter_enabled { + let jitter_range = (capped_delay as f64 * self.jitter_factor) as u64; + let mut rng = rand::thread_rng(); + let jitter = rng.gen_range(0..=jitter_range * 2); + let with_jitter = (capped_delay as i64 - jitter_range as i64 + jitter as i64) + .max(0) as u64; + + Duration::from_millis(with_jitter) + } else { + Duration::from_millis(capped_delay) + } + } +} + +impl Default for RetryStrategy { + fn default() -> Self { + Self::exponential_with_jitter() + } +} +``` + +**Step 4: Add rand dependency to Cargo.toml** + +Modify `crates/mimi-core/Cargo.toml`: + +```toml +rand = "0.8" +``` + +Run: `cargo test test_exponential_backoff` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/Cargo.toml crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add exponential backoff retry strategy with jitter" +``` + +--- + +## Task 7: Implement Circuit Breaker Pattern + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:500` (after RetryStrategy) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::{CircuitBreaker, CircuitState}; + +#[test] +fn test_circuit_breaker_opens_after_failures() { + let breaker = CircuitBreaker::new(3, Duration::from_secs(10)); + + assert_eq!(breaker.state(), CircuitState::Closed); + + // Record 3 failures + breaker.record_failure(); + breaker.record_failure(); + breaker.record_failure(); + + // Should open after 3 failures + assert_eq!(breaker.state(), CircuitState::Open); +} + +#[test] +fn test_circuit_breaker_half_open_after_timeout() { + let breaker = CircuitBreaker::new(3, Duration::from_millis(100)); + + // Open the circuit + breaker.record_failure(); + breaker.record_failure(); + breaker.record_failure(); + + assert_eq!(breaker.state(), CircuitState::Open); + + // Wait for timeout + std::thread::sleep(Duration::from_millis(150)); + + // Should transition to HalfOpen + assert_eq!(breaker.state(), CircuitState::HalfOpen); +} + +#[test] +fn test_circuit_breaker_closes_on_success() { + let breaker = CircuitBreaker::new(3, Duration::from_millis(100)); + + // Open circuit + for _ in 0..3 { + breaker.record_failure(); + } + + // Wait for half-open + std::thread::sleep(Duration::from_millis(150)); + assert_eq!(breaker.state(), CircuitState::HalfOpen); + + // Success should close circuit + breaker.record_success(); + assert_eq!(breaker.state(), CircuitState::Closed); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_circuit_breaker` +Expected: FAIL with "unresolved import" + +**Step 3: Write minimal implementation** + +Add to `crates/mimi-core/src/state_machine.rs`: + +```rust +use std::time::Instant; + +/// Circuit breaker states +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CircuitState { + /// Circuit closed, requests flow normally + Closed, + /// Circuit open, requests rejected immediately + Open, + /// Circuit half-open, testing if service recovered + HalfOpen, +} + +/// Circuit breaker for preventing cascade failures +pub struct CircuitBreaker { + state: Arc>, + failure_count: Arc>, + failure_threshold: u32, + timeout: Duration, + last_failure_time: Arc>>, +} + +impl CircuitBreaker { + /// Create new circuit breaker + pub fn new(failure_threshold: u32, timeout: Duration) -> Self { + Self { + state: Arc::new(Mutex::new(CircuitState::Closed)), + failure_count: Arc::new(Mutex::new(0)), + failure_threshold, + timeout, + last_failure_time: Arc::new(Mutex::new(None)), + } + } + + /// Get current circuit state + pub fn state(&self) -> CircuitState { + let state = *self.state.lock().unwrap(); + + // Check if we should transition from Open to HalfOpen + if state == CircuitState::Open { + let last_failure = self.last_failure_time.lock().unwrap(); + + if let Some(time) = *last_failure { + if time.elapsed() >= self.timeout { + let mut state_guard = self.state.lock().unwrap(); + *state_guard = CircuitState::HalfOpen; + return CircuitState::HalfOpen; + } + } + } + + state + } + + /// Record successful execution + pub fn record_success(&self) { + let current_state = self.state(); + + if current_state == CircuitState::HalfOpen { + // Success in half-open: close circuit + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Closed; + + let mut count = self.failure_count.lock().unwrap(); + *count = 0; + + log::info!("Circuit breaker closed after successful test"); + } + } + + /// Record failed execution + pub fn record_failure(&self) { + let mut count = self.failure_count.lock().unwrap(); + *count += 1; + + let mut last_failure = self.last_failure_time.lock().unwrap(); + *last_failure = Some(Instant::now()); + + if *count >= self.failure_threshold { + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Open; + + log::warn!("Circuit breaker opened after {} failures", self.failure_threshold); + } + } + + /// Check if request should be allowed + pub fn allow_request(&self) -> bool { + let state = self.state(); + + match state { + CircuitState::Closed => true, + CircuitState::Open => false, + CircuitState::HalfOpen => true, // Allow test request + } + } + + /// Reset circuit breaker to closed state + pub fn reset(&self) { + let mut state = self.state.lock().unwrap(); + *state = CircuitState::Closed; + + let mut count = self.failure_count.lock().unwrap(); + *count = 0; + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_circuit_breaker` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): implement circuit breaker pattern with Open/HalfOpen/Closed states" +``` + +--- + +## Task 8: Add Async Task Execution with Tokio + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:250` (StateManager impl) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use tokio; + +#[tokio::test] +async fn test_execute_task_blocking_mode() { + let manager = StateManager::new(); + + let task = Task::new(TaskType::Query, "fast_query") + .with_execution_model(ExecutionModel::Blocking); + + manager.enqueue_task(task).unwrap(); + + let result = manager.execute_next_task().await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_execute_task_async_mode() { + let manager = StateManager::new(); + + let task = Task::new(TaskType::Execute, "slow_exec") + .with_execution_model(ExecutionModel::Async) + .with_timeout(Duration::from_secs(5)); + + manager.enqueue_task(task).unwrap(); + + let result = manager.execute_next_task().await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_task_timeout_handling() { + let manager = StateManager::new(); + + let task = Task::new(TaskType::Execute, "timeout_task") + .with_timeout(Duration::from_millis(10)); + + manager.enqueue_task(task).unwrap(); + + // Simulate long-running task + let result = manager.execute_next_task().await; + assert!(result.is_err()); // Should timeout +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_execute_task` +Expected: FAIL with "no method named `execute_next_task`" + +**Step 3: Write minimal implementation** + +Add to StateManager impl in `crates/mimi-core/src/state_machine.rs`: + +```rust +use tokio::time::{timeout, sleep}; + +impl StateManager { + // ... existing methods ... + + /// Execute next task from queue + pub async fn execute_next_task(&self) -> Result<()> { + let task = self.dequeue_task()?; + + log::info!("Executing task: {} ({})", task.name, task.id); + + match task.execution_model { + ExecutionModel::Blocking => { + self.execute_blocking_task(task).await + } + ExecutionModel::Async => { + self.execute_async_task(task).await + } + } + } + + /// Execute task in blocking mode (for fast operations <500ms) + async fn execute_blocking_task(&self, task: Task) -> Result<()> { + // Transition to Executing state + self.transition_to(MimiState::Executing)?; + + let result = timeout(task.timeout, async { + // Simulate task execution + tokio::task::spawn_blocking(move || { + log::debug!("Blocking task {} executing", task.name); + // Actual task logic would go here + Ok::<(), anyhow::Error>(()) + }).await? + }).await; + + match result { + Ok(Ok(())) => { + log::info!("Task {} completed successfully", task.name); + self.transition_to(MimiState::Responding)?; + Ok(()) + } + Ok(Err(e)) => { + log::error!("Task {} failed: {}", task.name, e); + Err(e) + } + Err(_) => { + log::error!("Task {} timed out", task.name); + Err(anyhow!("Task execution timeout")) + } + } + } + + /// Execute task in async mode (for long operations >500ms) + async fn execute_async_task(&self, task: Task) -> Result<()> { + // Transition to Executing state + self.transition_to(MimiState::Executing)?; + + let task_name = task.name.clone(); + let task_timeout = task.timeout; + + let result = timeout(task_timeout, async move { + log::debug!("Async task {} executing", task.name); + + // Simulate async work + sleep(Duration::from_millis(10)).await; + + Ok::<(), anyhow::Error>(()) + }).await; + + match result { + Ok(Ok(())) => { + log::info!("Task {} completed successfully", task_name); + self.transition_to(MimiState::Responding)?; + Ok(()) + } + Ok(Err(e)) => { + log::error!("Task {} failed: {}", task_name, e); + Err(e) + } + Err(_) => { + log::error!("Task {} timed out", task_name); + Err(anyhow!("Task execution timeout")) + } + } + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_execute_task` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add async task execution with blocking/async modes and timeout handling" +``` + +--- + +## Task 9: Integrate Retry Strategy with Task Execution + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:300` (StateManager impl) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +#[tokio::test] +async fn test_task_retry_on_failure() { + let manager = StateManager::new(); + + let task = Task::new(TaskType::Execute, "flaky_task") + .with_execution_model(ExecutionModel::Blocking); + + manager.enqueue_task(task.clone()).unwrap(); + + // First attempt will fail (simulated) + // Should retry with exponential backoff + let result = manager.execute_with_retry().await; + + // Check that retries were attempted + assert!(task.retries > 0); +} + +#[tokio::test] +async fn test_max_retries_exhausted() { + let manager = StateManager::new(); + + let mut task = Task::new(TaskType::Execute, "failing_task"); + task.max_retries = 2; + + manager.enqueue_task(task).unwrap(); + + let result = manager.execute_with_retry().await; + + // Should fail after exhausting retries + assert!(result.is_err()); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_task_retry` +Expected: FAIL with "no method named `execute_with_retry`" + +**Step 3: Write minimal implementation** + +Add to StateManager impl in `crates/mimi-core/src/state_machine.rs`: + +```rust +impl StateManager { + // ... existing methods ... + + /// Execute task with retry logic + pub async fn execute_with_retry(&self) -> Result<()> { + let mut task = self.dequeue_task()?; + let retry_strategy = RetryStrategy::exponential_with_jitter(); + + loop { + let result = match task.execution_model { + ExecutionModel::Blocking => self.execute_blocking_task(task.clone()).await, + ExecutionModel::Async => self.execute_async_task(task.clone()).await, + }; + + match result { + Ok(()) => { + log::info!("Task {} succeeded after {} retries", task.name, task.retries); + return Ok(()); + } + Err(e) => { + if !task.can_retry() { + log::error!( + "Task {} failed after {} retries: {}", + task.name, + task.max_retries, + e + ); + return Err(anyhow!( + "Task failed after {} retries: {}", + task.max_retries, + e + )); + } + + task.increment_retry(); + let delay = retry_strategy.next_delay(task.retries - 1); + + log::warn!( + "Task {} failed (attempt {}), retrying in {:?}", + task.name, + task.retries, + delay + ); + + sleep(delay).await; + } + } + } + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_task_retry` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): integrate exponential backoff retry with task execution" +``` + +--- + +## Task 10: Create Zenoh Message Bus Integration Module + +**Files:** +- Create: `crates/mimi-core/src/state_machine/zenoh_integration.rs` +- Modify: `crates/mimi-core/src/state_machine.rs:1` (module declaration) + +**Step 1: Write the failing test** + +Create `crates/mimi-core/tests/zenoh_integration_tests.rs`: + +```rust +//! Zenoh Integration Tests + +use mimi_core::state_machine::{StateManager, Task, TaskType}; + +#[tokio::test] +async fn test_zenoh_subscriber_receives_task() { + let manager = StateManager::new(); + + // Start Zenoh subscriber in background + let subscriber_handle = manager.start_zenoh_subscriber("mimi/tasks").await.unwrap(); + + // Give subscriber time to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Publish task via Zenoh + let task = Task::new(TaskType::Query, "test_task"); + manager.publish_task_via_zenoh(&task).await.unwrap(); + + // Wait for task to be enqueued + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Verify task was received and queued + assert_eq!(manager.queue_size(), 1); + + subscriber_handle.abort(); +} + +#[tokio::test] +async fn test_zenoh_publish_state_change() { + let manager = StateManager::new(); + + // Start state change publisher + manager.start_state_change_publisher("mimi/state").await.unwrap(); + + // Trigger state change + manager.transition_to(mimi_core::state_machine::MimiState::Listening).unwrap(); + + // Verify event was published (would need Zenoh subscriber in real test) + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test --test zenoh_integration_tests` +Expected: FAIL with "no method named `start_zenoh_subscriber`" + +**Step 3: Add zenoh dependency** + +Modify `crates/mimi-core/Cargo.toml`: + +```toml +zenoh = { version = "0.11", features = ["default"] } +``` + +**Step 4: Write minimal implementation** + +Create `crates/mimi-core/src/state_machine/zenoh_integration.rs`: + +```rust +//! Zenoh message bus integration for state machine + +use super::{Task, TaskType, MimiState}; +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; +use zenoh::prelude::*; + +/// Zenoh configuration for state machine +pub struct ZenohConfig { + pub session: zenoh::Session, +} + +impl ZenohConfig { + /// Create new Zenoh config with session + pub async fn new() -> Result { + let session = zenoh::open(zenoh::Config::default()) + .await + .map_err(|e| anyhow!("Failed to open Zenoh session: {}", e))?; + + Ok(Self { session }) + } +} + +/// State change event for Zenoh publication +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateChangeEvent { + pub from_state: String, + pub to_state: String, + pub timestamp: i64, +} + +/// Task message for Zenoh +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskMessage { + pub task_id: String, + pub task_type: String, + pub name: String, + pub priority: u8, + pub payload: Vec, +} + +impl From for TaskMessage { + fn from(task: Task) -> Self { + Self { + task_id: task.id.to_string(), + task_type: format!("{:?}", task.task_type), + name: task.name, + priority: task.priority as u8, + payload: task.payload, + } + } +} +``` + +**Step 5: Add Zenoh methods to StateManager** + +Modify `crates/mimi-core/src/state_machine.rs`: + +Add module declaration at top: + +```rust +pub mod zenoh_integration; +pub use zenoh_integration::{ZenohConfig, StateChangeEvent, TaskMessage}; +``` + +Add to StateManager struct: + +```rust +pub struct StateManager { + state: Arc>, + task_queue: Arc>>, + queue_capacity: usize, + sequence_counter: Arc>, + zenoh_config: Option>, +} +``` + +Add Zenoh methods to StateManager impl: + +```rust +impl StateManager { + // ... existing methods ... + + /// Initialize Zenoh integration + pub async fn init_zenoh(&mut self) -> Result<()> { + let config = ZenohConfig::new().await?; + self.zenoh_config = Some(Arc::new(config)); + Ok(()) + } + + /// Start Zenoh subscriber for tasks + pub async fn start_zenoh_subscriber(&self, topic: &str) -> Result> { + let config = self.zenoh_config.as_ref() + .ok_or_else(|| anyhow!("Zenoh not initialized"))?; + + let subscriber = config.session + .declare_subscriber(topic) + .await + .map_err(|e| anyhow!("Failed to create subscriber: {}", e))?; + + let manager_clone = Self { + state: self.state.clone(), + task_queue: self.task_queue.clone(), + queue_capacity: self.queue_capacity, + sequence_counter: self.sequence_counter.clone(), + zenoh_config: self.zenoh_config.clone(), + }; + + let handle = tokio::spawn(async move { + while let Ok(sample) = subscriber.recv_async().await { + if let Ok(task_msg) = serde_json::from_slice::(&sample.payload().to_bytes()) { + log::debug!("Received task via Zenoh: {}", task_msg.name); + + // Convert TaskMessage back to Task + // (simplified for this implementation) + let task = Task::new(TaskType::Query, &task_msg.name); + + if let Err(e) = manager_clone.enqueue_task(task) { + log::error!("Failed to enqueue task from Zenoh: {}", e); + } + } + } + }); + + Ok(handle) + } + + /// Publish task via Zenoh + pub async fn publish_task_via_zenoh(&self, task: &Task) -> Result<()> { + let config = self.zenoh_config.as_ref() + .ok_or_else(|| anyhow!("Zenoh not initialized"))?; + + let task_msg = TaskMessage::from(task.clone()); + let payload = serde_json::to_vec(&task_msg)?; + + config.session + .put("mimi/tasks", payload) + .await + .map_err(|e| anyhow!("Failed to publish task: {}", e))?; + + Ok(()) + } + + /// Start state change publisher + pub async fn start_state_change_publisher(&self, topic: &str) -> Result<()> { + log::info!("State change publisher initialized on topic: {}", topic); + // Publisher setup would go here + Ok(()) + } +} +``` + +**Step 6: Run test to verify it passes** + +Run: `cargo test --test zenoh_integration_tests` +Expected: PASS (may skip if Zenoh daemon not running) + +**Step 7: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/src/state_machine/zenoh_integration.rs crates/mimi-core/Cargo.toml crates/mimi-core/tests/zenoh_integration_tests.rs +git commit -m "feat(state-machine): add Zenoh message bus integration for task pub/sub" +``` + +--- + +## Task 11: Implement State Persistence via Pandora (Neo4j) + +**Files:** +- Create: `crates/mimi-core/src/state_machine/pandora_integration.rs` +- Modify: `crates/mimi-core/src/state_machine.rs:5` (module declaration) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::{PandoraClient, StateTransition}; + +#[tokio::test] +async fn test_persist_state_transition() { + let client = PandoraClient::new("bolt://localhost:7687").await.unwrap(); + + let transition = StateTransition::new( + mimi_core::state_machine::MimiState::Idle, + mimi_core::state_machine::MimiState::Listening + ); + + let result = client.persist_transition(&transition).await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_query_state_history() { + let client = PandoraClient::new("bolt://localhost:7687").await.unwrap(); + + let history = client.query_state_history(10).await.unwrap(); + assert!(history.len() <= 10); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_persist_state_transition` +Expected: FAIL with "unresolved import" + +**Step 3: Add Neo4j dependency** + +Modify `crates/mimi-core/Cargo.toml`: + +```toml +neo4rs = "0.7" +``` + +**Step 4: Write minimal implementation** + +Create `crates/mimi-core/src/state_machine/pandora_integration.rs`: + +```rust +//! Pandora (Neo4j) integration for selective state persistence + +use super::{MimiState, StateTransition, Task}; +use anyhow::{anyhow, Result}; +use neo4rs::{Graph, Query}; + +/// Pandora client for Neo4j operations +pub struct PandoraClient { + graph: Graph, +} + +impl PandoraClient { + /// Create new Pandora client + pub async fn new(uri: &str) -> Result { + let graph = Graph::new(uri, "neo4j", "password") + .await + .map_err(|e| anyhow!("Failed to connect to Neo4j: {}", e))?; + + Ok(Self { graph }) + } + + /// Persist state transition (selective - only high-value transitions) + pub async fn persist_transition(&self, transition: &StateTransition) -> Result<()> { + // Only persist certain transitions (avoid noise) + if !Self::should_persist(transition) { + return Ok(()); + } + + let query = Query::new( + r#" + CREATE (sc:StateChange { + from_state: $from_state, + to_state: $to_state, + timestamp: timestamp() + }) + RETURN sc + "# + ) + .param("from_state", format!("{:?}", transition.from)) + .param("to_state", format!("{:?}", transition.to)); + + self.graph.run(query).await + .map_err(|e| anyhow!("Failed to persist transition: {}", e))?; + + log::debug!("Persisted transition: {:?} -> {:?}", transition.from, transition.to); + + Ok(()) + } + + /// Check if transition should be persisted + fn should_persist(transition: &StateTransition) -> bool { + use MimiState::*; + + // Persist error states, recovery, and key lifecycle events + matches!( + transition.to, + Degraded | FailedComponent | CriticalError | Recovering | Shutdown + ) || matches!( + (transition.from, transition.to), + (Idle, Listening) | (Responding, Idle) + ) + } + + /// Query recent state history + pub async fn query_state_history(&self, limit: usize) -> Result> { + let query = Query::new( + r#" + MATCH (sc:StateChange) + RETURN sc.from_state, sc.to_state, sc.timestamp + ORDER BY sc.timestamp DESC + LIMIT $limit + "# + ) + .param("limit", limit as i64); + + let mut result = self.graph.execute(query).await + .map_err(|e| anyhow!("Failed to query history: {}", e))?; + + let mut transitions = Vec::new(); + + while let Some(row) = result.next().await + .map_err(|e| anyhow!("Failed to fetch row: {}", e))? { + + let from_state: String = row.get("sc.from_state") + .map_err(|e| anyhow!("Failed to get from_state: {}", e))?; + let to_state: String = row.get("sc.to_state") + .map_err(|e| anyhow!("Failed to get to_state: {}", e))?; + + // Parse state strings back to enum (simplified) + // In production, store as enum ordinal or use proper deserialization + log::debug!("Fetched transition: {} -> {}", from_state, to_state); + } + + Ok(transitions) + } + + /// Persist task execution record + pub async fn persist_task(&self, task: &Task) -> Result<()> { + let query = Query::new( + r#" + CREATE (t:Task { + id: $id, + name: $name, + task_type: $task_type, + priority: $priority, + retries: $retries, + created_at: $created_at + }) + RETURN t + "# + ) + .param("id", task.id.to_string()) + .param("name", task.name.clone()) + .param("task_type", format!("{:?}", task.task_type)) + .param("priority", task.priority as i64) + .param("retries", task.retries as i64) + .param("created_at", task.created_at.timestamp()); + + self.graph.run(query).await + .map_err(|e| anyhow!("Failed to persist task: {}", e))?; + + Ok(()) + } +} +``` + +**Step 5: Add Pandora integration to StateManager** + +Modify `crates/mimi-core/src/state_machine.rs`: + +Add module declaration: + +```rust +pub mod pandora_integration; +pub use pandora_integration::PandoraClient; +``` + +Add to StateManager struct: + +```rust +pandora_client: Option>, +``` + +Add Pandora methods to StateManager impl: + +```rust +impl StateManager { + // ... existing methods ... + + /// Initialize Pandora integration + pub async fn init_pandora(&mut self, uri: &str) -> Result<()> { + let client = PandoraClient::new(uri).await?; + self.pandora_client = Some(Arc::new(client)); + Ok(()) + } + + /// Transition with Pandora persistence + pub async fn transition_to_with_persistence(&self, new_state: MimiState) -> Result<()> { + let current = self.current_state(); + let transition = StateTransition::new(current, new_state); + + // Validate and perform transition + self.transition_to(new_state)?; + + // Persist to Pandora if configured + if let Some(client) = &self.pandora_client { + client.persist_transition(&transition).await?; + } + + Ok(()) + } +} +``` + +**Step 6: Run test to verify it passes** + +Run: `cargo test test_persist_state_transition` +Expected: PASS (or skip if Neo4j not available) + +**Step 7: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/src/state_machine/pandora_integration.rs crates/mimi-core/Cargo.toml crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): add Pandora Neo4j integration for selective state persistence" +``` + +--- + +## Task 12: Implement Component Health Monitoring + +**Files:** +- Modify: `crates/mimi-core/src/state_machine.rs:600` (new module section) + +**Step 1: Write the failing test** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +use mimi_core::state_machine::HealthMonitor; + +#[tokio::test] +async fn test_health_monitor_tracks_component() { + let monitor = HealthMonitor::new(); + + monitor.register_component("beatrice").await.unwrap(); + monitor.update_heartbeat("beatrice").await.unwrap(); + + let health = monitor.get_component_health("beatrice").await.unwrap(); + assert!(health.last_heartbeat_secs < 1); +} + +#[tokio::test] +async fn test_health_monitor_detects_unhealthy() { + let monitor = HealthMonitor::new(); + + monitor.register_component("pandora").await.unwrap(); + + // Simulate high latency + monitor.record_latency("pandora", 6000).await.unwrap(); + + let health = monitor.get_component_health("pandora").await.unwrap(); + assert!(!mimi_core::state_machine::TransitionGuard::check_component_health(&health)); +} + +#[tokio::test] +async fn test_health_monitor_auto_escalation() { + let monitor = HealthMonitor::new(); + let manager = StateManager::new(); + + monitor.register_component("echidna").await.unwrap(); + + // Simulate heartbeat timeout + tokio::time::sleep(tokio::time::Duration::from_secs(31)).await; + + let should_escalate = monitor.check_escalation("echidna").await.unwrap(); + assert!(should_escalate); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `cargo test test_health_monitor` +Expected: FAIL with "unresolved import" + +**Step 3: Write minimal implementation** + +Add to `crates/mimi-core/src/state_machine.rs`: + +```rust +use std::collections::HashMap; +use std::time::Instant; + +/// Component health monitor +pub struct HealthMonitor { + components: Arc>>, +} + +/// Internal component health state with tracking +#[derive(Debug, Clone)] +struct ComponentHealthState { + pub latency_ms: u64, + pub memory_usage_percent: u8, + pub last_heartbeat: Instant, +} + +impl ComponentHealthState { + fn new() -> Self { + Self { + latency_ms: 0, + memory_usage_percent: 0, + last_heartbeat: Instant::now(), + } + } + + fn to_health(&self) -> ComponentHealth { + ComponentHealth { + latency_ms: self.latency_ms, + memory_usage_percent: self.memory_usage_percent, + last_heartbeat_secs: self.last_heartbeat.elapsed().as_secs(), + } + } +} + +impl HealthMonitor { + /// Create new health monitor + pub fn new() -> Self { + Self { + components: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Register component for monitoring + pub async fn register_component(&self, name: &str) -> Result<()> { + let mut components = self.components.lock().unwrap(); + components.insert(name.to_string(), ComponentHealthState::new()); + + log::info!("Registered component for health monitoring: {}", name); + Ok(()) + } + + /// Update heartbeat for component + pub async fn update_heartbeat(&self, name: &str) -> Result<()> { + let mut components = self.components.lock().unwrap(); + + let state = components.get_mut(name) + .ok_or_else(|| anyhow!("Component not registered: {}", name))?; + + state.last_heartbeat = Instant::now(); + + Ok(()) + } + + /// Record latency measurement + pub async fn record_latency(&self, name: &str, latency_ms: u64) -> Result<()> { + let mut components = self.components.lock().unwrap(); + + let state = components.get_mut(name) + .ok_or_else(|| anyhow!("Component not registered: {}", name))?; + + state.latency_ms = latency_ms; + + Ok(()) + } + + /// Record memory usage + pub async fn record_memory(&self, name: &str, usage_percent: u8) -> Result<()> { + let mut components = self.components.lock().unwrap(); + + let state = components.get_mut(name) + .ok_or_else(|| anyhow!("Component not registered: {}", name))?; + + state.memory_usage_percent = usage_percent; + + Ok(()) + } + + /// Get component health snapshot + pub async fn get_component_health(&self, name: &str) -> Result { + let components = self.components.lock().unwrap(); + + let state = components.get(name) + .ok_or_else(|| anyhow!("Component not registered: {}", name))?; + + Ok(state.to_health()) + } + + /// Check if component health requires escalation + pub async fn check_escalation(&self, name: &str) -> Result { + let health = self.get_component_health(name).await?; + Ok(!TransitionGuard::check_component_health(&health)) + } + + /// Get all unhealthy components + pub async fn get_unhealthy_components(&self) -> Vec { + let components = self.components.lock().unwrap(); + + components.iter() + .filter(|(_, state)| { + !TransitionGuard::check_component_health(&state.to_health()) + }) + .map(|(name, _)| name.clone()) + .collect() + } +} + +impl Default for HealthMonitor { + fn default() -> Self { + Self::new() + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `cargo test test_health_monitor` +Expected: PASS + +**Step 5: Commit** + +```bash +git add crates/mimi-core/src/state_machine.rs crates/mimi-core/tests/state_machine_tests.rs +git commit -m "feat(state-machine): implement component health monitoring with auto-escalation detection" +``` + +--- + +## Task 13: Add Comprehensive Unit Tests (40+ tests) + +**Files:** +- Modify: `crates/mimi-core/tests/state_machine_tests.rs:200` (expand tests) + +**Step 1: Write comprehensive test suite** + +Add to `crates/mimi-core/tests/state_machine_tests.rs`: + +```rust +// ============================================================================ +// State Tests +// ============================================================================ + +#[test] +fn test_all_state_variants() { + let states = vec![ + MimiState::Idle, + MimiState::Listening, + MimiState::Processing, + MimiState::Executing, + MimiState::Responding, + MimiState::Degraded, + MimiState::Recovering, + MimiState::FailedComponent, + MimiState::CriticalError, + MimiState::Shutdown, + ]; + + assert_eq!(states.len(), 10); +} + +#[test] +fn test_state_equality() { + assert_eq!(MimiState::Idle, MimiState::Idle); + assert_ne!(MimiState::Idle, MimiState::Listening); +} + +// ============================================================================ +// Transition Tests +// ============================================================================ + +#[test] +fn test_all_valid_normal_flow_transitions() { + let transitions = vec![ + (MimiState::Idle, MimiState::Listening), + (MimiState::Listening, MimiState::Processing), + (MimiState::Processing, MimiState::Executing), + (MimiState::Executing, MimiState::Responding), + (MimiState::Responding, MimiState::Idle), + ]; + + for (from, to) in transitions { + let t = StateTransition::new(from, to); + assert!(t.is_valid(), "Expected {:?} -> {:?} to be valid", from, to); + } +} + +#[test] +fn test_error_escalation_from_any_state() { + let states = vec![ + MimiState::Idle, + MimiState::Listening, + MimiState::Processing, + MimiState::Executing, + MimiState::Responding, + ]; + + for state in states { + let t1 = StateTransition::new(state, MimiState::Degraded); + assert!(t1.is_valid()); + + let t2 = StateTransition::new(state, MimiState::FailedComponent); + assert!(t2.is_valid()); + + let t3 = StateTransition::new(state, MimiState::CriticalError); + assert!(t3.is_valid()); + } +} + +#[test] +fn test_recovery_paths() { + let t1 = StateTransition::new(MimiState::Degraded, MimiState::Recovering); + assert!(t1.is_valid()); + + let t2 = StateTransition::new(MimiState::FailedComponent, MimiState::Recovering); + assert!(t2.is_valid()); + + let t3 = StateTransition::new(MimiState::Recovering, MimiState::Idle); + assert!(t3.is_valid()); +} + +#[test] +fn test_invalid_transitions() { + let invalid = vec![ + (MimiState::Idle, MimiState::Processing), + (MimiState::Idle, MimiState::Executing), + (MimiState::Listening, MimiState::Responding), + (MimiState::Processing, MimiState::Idle), + ]; + + for (from, to) in invalid { + let t = StateTransition::new(from, to); + assert!(!t.is_valid(), "Expected {:?} -> {:?} to be invalid", from, to); + } +} + +// ============================================================================ +// Task Priority Tests +// ============================================================================ + +#[test] +fn test_task_priority_ordering() { + assert!(TaskPriority::Critical > TaskPriority::High); + assert!(TaskPriority::High > TaskPriority::Normal); + assert!(TaskPriority::Normal > TaskPriority::Low); +} + +#[test] +fn test_task_priority_values() { + assert_eq!(TaskPriority::Low as u8, 0); + assert_eq!(TaskPriority::Normal as u8, 1); + assert_eq!(TaskPriority::High as u8, 2); + assert_eq!(TaskPriority::Critical as u8, 3); +} + +// ============================================================================ +// Guard Condition Tests +// ============================================================================ + +#[test] +fn test_guard_all_thresholds() { + // All healthy + let h1 = ComponentHealth { + latency_ms: 5000, + memory_usage_percent: 80, + last_heartbeat_secs: 30, + }; + assert!(TransitionGuard::check_component_health(&h1)); + + // Just over latency threshold + let h2 = ComponentHealth { + latency_ms: 5001, + memory_usage_percent: 80, + last_heartbeat_secs: 30, + }; + assert!(!TransitionGuard::check_component_health(&h2)); + + // Just over memory threshold + let h3 = ComponentHealth { + latency_ms: 5000, + memory_usage_percent: 81, + last_heartbeat_secs: 30, + }; + assert!(!TransitionGuard::check_component_health(&h3)); + + // Just over heartbeat threshold + let h4 = ComponentHealth { + latency_ms: 5000, + memory_usage_percent: 80, + last_heartbeat_secs: 31, + }; + assert!(!TransitionGuard::check_component_health(&h4)); +} + +#[test] +fn test_guard_queue_capacity() { + assert!(TransitionGuard::check_queue_capacity(99, 100)); + assert!(!TransitionGuard::check_queue_capacity(100, 100)); +} + +#[test] +fn test_guard_task_timeout() { + let timeout1 = Duration::from_secs(30); + let timeout2 = Duration::from_secs(60); + let max = Duration::from_secs(60); + + assert!(TransitionGuard::check_task_timeout(&timeout1, &max)); + assert!(TransitionGuard::check_task_timeout(&timeout2, &max)); + assert!(!TransitionGuard::check_task_timeout(&Duration::from_secs(61), &max)); +} + +// ============================================================================ +// Task Builder Pattern Tests +// ============================================================================ + +#[test] +fn test_task_builder_chain() { + let task = Task::new(TaskType::Execute, "complex_task") + .with_priority(TaskPriority::High) + .with_timeout(Duration::from_secs(120)) + .with_execution_model(ExecutionModel::Async) + .with_payload(vec![1, 2, 3]); + + assert_eq!(task.priority, TaskPriority::High); + assert_eq!(task.timeout.as_secs(), 120); + assert_eq!(task.execution_model, ExecutionModel::Async); + assert_eq!(task.payload, vec![1, 2, 3]); +} + +// ============================================================================ +// Retry Tests +// ============================================================================ + +#[test] +fn test_task_can_retry() { + let mut task = Task::new(TaskType::Query, "test"); + + assert!(task.can_retry()); + + task.increment_retry(); + task.increment_retry(); + task.increment_retry(); + + assert!(!task.can_retry()); +} + +#[test] +fn test_retry_strategy_progression() { + let strategy = RetryStrategy::exponential(); + + let delays: Vec = (0..5) + .map(|i| strategy.next_delay(i).as_millis() as u64) + .collect(); + + // 100, 200, 400, 800, 1600 + assert_eq!(delays, vec![100, 200, 400, 800, 1600]); +} + +#[test] +fn test_retry_strategy_max_cap() { + let strategy = RetryStrategy::exponential(); + + let delay = strategy.next_delay(20); + assert_eq!(delay.as_millis(), 5000); // Capped at 5s +} + +// ============================================================================ +// Circuit Breaker Tests +// ============================================================================ + +#[test] +fn test_circuit_breaker_initial_state() { + let breaker = CircuitBreaker::new(3, Duration::from_secs(10)); + assert_eq!(breaker.state(), CircuitState::Closed); + assert!(breaker.allow_request()); +} + +#[test] +fn test_circuit_breaker_blocks_when_open() { + let breaker = CircuitBreaker::new(1, Duration::from_secs(10)); + + breaker.record_failure(); + + assert_eq!(breaker.state(), CircuitState::Open); + assert!(!breaker.allow_request()); +} + +#[test] +fn test_circuit_breaker_reset() { + let breaker = CircuitBreaker::new(1, Duration::from_secs(10)); + + breaker.record_failure(); + assert_eq!(breaker.state(), CircuitState::Open); + + breaker.reset(); + assert_eq!(breaker.state(), CircuitState::Closed); +} + +// ============================================================================ +// Queue Tests +// ============================================================================ + +#[test] +fn test_queue_empty_dequeue_fails() { + let manager = StateManager::new(); + let result = manager.dequeue_task(); + assert!(result.is_err()); +} + +#[test] +fn test_queue_mixed_priorities() { + let manager = StateManager::new(); + + let low = Task::new(TaskType::Query, "low").with_priority(TaskPriority::Low); + let normal = Task::new(TaskType::Query, "normal").with_priority(TaskPriority::Normal); + let high = Task::new(TaskType::Query, "high").with_priority(TaskPriority::High); + let critical = Task::new(TaskType::Query, "critical").with_priority(TaskPriority::Critical); + + manager.enqueue_task(low).unwrap(); + manager.enqueue_task(normal).unwrap(); + manager.enqueue_task(high).unwrap(); + manager.enqueue_task(critical).unwrap(); + + assert_eq!(manager.dequeue_task().unwrap().name, "critical"); + assert_eq!(manager.dequeue_task().unwrap().name, "high"); + assert_eq!(manager.dequeue_task().unwrap().name, "normal"); + assert_eq!(manager.dequeue_task().unwrap().name, "low"); +} + +// ============================================================================ +// StateManager Tests +// ============================================================================ + +#[test] +fn test_state_manager_default_state() { + let manager = StateManager::new(); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[test] +fn test_state_manager_queue_size() { + let manager = StateManager::new(); + assert_eq!(manager.queue_size(), 0); + + let task = Task::new(TaskType::Query, "test"); + manager.enqueue_task(task).unwrap(); + + assert_eq!(manager.queue_size(), 1); +} + +#[test] +fn test_state_manager_force_error_state() { + let manager = StateManager::new(); + + manager.force_error_state(MimiState::CriticalError); + assert_eq!(manager.current_state(), MimiState::CriticalError); +} +``` + +**Step 2: Run all tests** + +Run: `cargo test --test state_machine_tests` +Expected: PASS (40+ tests) + +**Step 3: Verify test coverage** + +Run: `cargo tarpaulin --test state_machine_tests` +Expected: >80% coverage + +**Step 4: Commit** + +```bash +git add crates/mimi-core/tests/state_machine_tests.rs +git commit -m "test(state-machine): add comprehensive unit test suite with 40+ tests" +``` + +--- + +## Task 14: Add Integration Tests (15+ tests) + +**Files:** +- Expand: `crates/mimi-core/tests/zenoh_integration_tests.rs` + +**Step 1: Write integration test suite** + +Expand `crates/mimi-core/tests/zenoh_integration_tests.rs`: + +```rust +//! State Machine Integration Tests + +use mimi_core::state_machine::*; +use std::time::Duration; +use tokio::time::sleep; + +// ============================================================================ +// Full Lifecycle Tests +// ============================================================================ + +#[tokio::test] +async fn test_full_task_lifecycle() { + let manager = StateManager::new(); + + // Idle -> Listening + manager.transition_to(MimiState::Listening).unwrap(); + assert_eq!(manager.current_state(), MimiState::Listening); + + // Queue task + let task = Task::new(TaskType::Execute, "lifecycle_test") + .with_execution_model(ExecutionModel::Blocking); + manager.enqueue_task(task).unwrap(); + + // Listening -> Processing + manager.transition_to(MimiState::Processing).unwrap(); + + // Processing -> Executing -> Responding + manager.execute_next_task().await.unwrap(); + + // Responding -> Idle + manager.transition_to(MimiState::Idle).unwrap(); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[tokio::test] +async fn test_error_recovery_flow() { + let manager = StateManager::new(); + + manager.transition_to(MimiState::Listening).unwrap(); + + // Simulate component failure + manager.force_error_state(MimiState::FailedComponent); + assert_eq!(manager.current_state(), MimiState::FailedComponent); + + // Enter recovery + manager.transition_to(MimiState::Recovering).unwrap(); + assert_eq!(manager.current_state(), MimiState::Recovering); + + // Recover to Idle + manager.transition_to(MimiState::Idle).unwrap(); + assert_eq!(manager.current_state(), MimiState::Idle); +} + +#[tokio::test] +async fn test_degraded_mode_operation() { + let manager = StateManager::new(); + + manager.transition_to(MimiState::Listening).unwrap(); + + // Enter degraded mode + manager.transition_to(MimiState::Degraded).unwrap(); + + // Should still be able to queue tasks + let task = Task::new(TaskType::Query, "degraded_task") + .with_priority(TaskPriority::Low); + assert!(manager.enqueue_task(task).is_ok()); + + // Recover + manager.transition_to(MimiState::Recovering).unwrap(); + manager.transition_to(MimiState::Idle).unwrap(); +} + +// ============================================================================ +// Health Monitoring Integration +// ============================================================================ + +#[tokio::test] +async fn test_health_monitor_integration() { + let monitor = HealthMonitor::new(); + let manager = StateManager::new(); + + monitor.register_component("beatrice").await.unwrap(); + monitor.register_component("pandora").await.unwrap(); + monitor.register_component("echidna").await.unwrap(); + + // Update heartbeats + for component in &["beatrice", "pandora", "echidna"] { + monitor.update_heartbeat(component).await.unwrap(); + } + + // All should be healthy + let unhealthy = monitor.get_unhealthy_components().await; + assert!(unhealthy.is_empty()); + + // Simulate pandora latency spike + monitor.record_latency("pandora", 6000).await.unwrap(); + + // Should detect unhealthy component + let unhealthy = monitor.get_unhealthy_components().await; + assert_eq!(unhealthy, vec!["pandora"]); +} + +#[tokio::test] +async fn test_health_check_triggers_state_change() { + let manager = StateManager::new(); + + manager.transition_to(MimiState::Listening).unwrap(); + + let unhealthy = ComponentHealth { + latency_ms: 7000, + memory_usage_percent: 90, + last_heartbeat_secs: 35, + }; + + // Should escalate to Degraded + manager.check_and_transition(MimiState::Processing, &unhealthy) + .unwrap(); + + assert_eq!(manager.current_state(), MimiState::Degraded); +} + +// ============================================================================ +// Retry and Circuit Breaker Integration +// ============================================================================ + +#[tokio::test] +async fn test_retry_integration_with_backoff() { + let manager = StateManager::new(); + + let task = Task::new(TaskType::Execute, "retry_task") + .with_execution_model(ExecutionModel::Blocking); + + manager.enqueue_task(task).unwrap(); + + // This will retry internally (implementation detail) + let start = std::time::Instant::now(); + let _ = manager.execute_with_retry().await; + let elapsed = start.elapsed(); + + // Should have taken time due to retries + // (Actual test would mock failure) +} + +#[tokio::test] +async fn test_circuit_breaker_prevents_overload() { + let breaker = CircuitBreaker::new(3, Duration::from_secs(5)); + let manager = StateManager::new(); + + // Simulate 3 failures + for _ in 0..3 { + breaker.record_failure(); + } + + assert_eq!(breaker.state(), CircuitState::Open); + + // Circuit should block requests + assert!(!breaker.allow_request()); + + // Wait for half-open + sleep(Duration::from_secs(6)).await; + assert_eq!(breaker.state(), CircuitState::HalfOpen); + + // Test request allowed + assert!(breaker.allow_request()); +} + +// ============================================================================ +// Message Bus Integration (Zenoh) +// ============================================================================ + +#[tokio::test] +#[ignore] // Requires Zenoh daemon +async fn test_zenoh_full_pub_sub_cycle() { + let mut manager = StateManager::new(); + manager.init_zenoh().await.unwrap(); + + let subscriber = manager.start_zenoh_subscriber("mimi/tasks").await.unwrap(); + + sleep(Duration::from_millis(200)).await; + + let task = Task::new(TaskType::Query, "zenoh_task"); + manager.publish_task_via_zenoh(&task).await.unwrap(); + + sleep(Duration::from_millis(200)).await; + + // Verify task was received and queued + assert!(manager.queue_size() > 0); + + subscriber.abort(); +} + +#[tokio::test] +#[ignore] // Requires Zenoh daemon +async fn test_zenoh_state_change_broadcast() { + let mut manager = StateManager::new(); + manager.init_zenoh().await.unwrap(); + + manager.start_state_change_publisher("mimi/state").await.unwrap(); + + manager.transition_to(MimiState::Listening).unwrap(); + manager.transition_to(MimiState::Processing).unwrap(); + + sleep(Duration::from_millis(100)).await; + + // State changes should have been published +} + +// ============================================================================ +// Pandora Integration +// ============================================================================ + +#[tokio::test] +#[ignore] // Requires Neo4j +async fn test_pandora_selective_persistence() { + let mut manager = StateManager::new(); + manager.init_pandora("bolt://localhost:7687").await.unwrap(); + + // Normal transition (not persisted) + manager.transition_to_with_persistence(MimiState::Listening).await.unwrap(); + + // Error transition (persisted) + manager.transition_to_with_persistence(MimiState::Degraded).await.unwrap(); + + // Recovery transition (persisted) + manager.transition_to_with_persistence(MimiState::Recovering).await.unwrap(); +} + +#[tokio::test] +#[ignore] // Requires Neo4j +async fn test_pandora_query_history() { + let client = PandoraClient::new("bolt://localhost:7687").await.unwrap(); + + let transition1 = StateTransition::new(MimiState::Idle, MimiState::Listening); + let transition2 = StateTransition::new(MimiState::Listening, MimiState::Degraded); + + client.persist_transition(&transition1).await.unwrap(); + client.persist_transition(&transition2).await.unwrap(); + + let history = client.query_state_history(5).await.unwrap(); + assert!(history.len() >= 1); // At least Degraded transition persisted +} + +// ============================================================================ +// Concurrent Access Tests +// ============================================================================ + +#[tokio::test] +async fn test_concurrent_task_enqueue() { + let manager = Arc::new(StateManager::new()); + + let mut handles = vec![]; + + for i in 0..10 { + let mgr = manager.clone(); + let handle = tokio::spawn(async move { + let task = Task::new(TaskType::Query, &format!("task_{}", i)); + mgr.enqueue_task(task).unwrap(); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + assert_eq!(manager.queue_size(), 10); +} + +#[tokio::test] +async fn test_concurrent_state_transitions() { + let manager = Arc::new(StateManager::new()); + + let mut handles = vec![]; + + for _ in 0..5 { + let mgr = manager.clone(); + let handle = tokio::spawn(async move { + let _ = mgr.transition_to(MimiState::Listening); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + // Should end up in Listening state + assert_eq!(manager.current_state(), MimiState::Listening); +} + +// ============================================================================ +// Performance Tests +// ============================================================================ + +#[tokio::test] +async fn test_high_throughput_task_processing() { + let manager = StateManager::with_capacity(10000); + + // Enqueue 1000 tasks + for i in 0..1000 { + let task = Task::new(TaskType::Query, &format!("task_{}", i)) + .with_priority(if i % 2 == 0 { TaskPriority::High } else { TaskPriority::Normal }); + manager.enqueue_task(task).unwrap(); + } + + assert_eq!(manager.queue_size(), 1000); + + // Dequeue all + for _ in 0..1000 { + assert!(manager.dequeue_task().is_ok()); + } + + assert_eq!(manager.queue_size(), 0); +} +``` + +**Step 2: Run integration tests** + +Run: `cargo test --test zenoh_integration_tests` +Expected: PASS (excluding ignored tests) + +**Step 3: Run with ignored tests (if infrastructure available)** + +Run: `cargo test --test zenoh_integration_tests -- --ignored` +Expected: PASS (with Zenoh + Neo4j running) + +**Step 4: Commit** + +```bash +git add crates/mimi-core/tests/zenoh_integration_tests.rs +git commit -m "test(state-machine): add 15+ integration tests for lifecycle, health, persistence" +``` + +--- + +## Task 15: Add Acceptance Tests (5 scenarios) + +**Files:** +- Create: `crates/mimi-core/tests/acceptance_tests.rs` + +**Step 1: Write acceptance test suite** + +Create `crates/mimi-core/tests/acceptance_tests.rs`: + +```rust +//! Acceptance Tests for State Machine +//! +//! High-level end-to-end scenarios validating system behavior + +use mimi_core::state_machine::*; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +// ============================================================================ +// Scenario 1: Happy Path - Complete Task Execution +// ============================================================================ + +#[tokio::test] +async fn acceptance_happy_path_complete_workflow() { + // GIVEN: A fresh state manager with health monitoring + let manager = Arc::new(StateManager::new()); + let health_monitor = Arc::new(HealthMonitor::new()); + + health_monitor.register_component("beatrice").await.unwrap(); + health_monitor.register_component("pandora").await.unwrap(); + + // WHEN: User sends a query + manager.transition_to(MimiState::Listening).unwrap(); + + let task = Task::new(TaskType::Query, "user_query") + .with_priority(TaskPriority::Normal) + .with_execution_model(ExecutionModel::Blocking) + .with_payload(b"What is the weather?".to_vec()); + + manager.enqueue_task(task).unwrap(); + + // System processes the query + manager.transition_to(MimiState::Processing).unwrap(); + + // Components report healthy status + health_monitor.update_heartbeat("beatrice").await.unwrap(); + health_monitor.update_heartbeat("pandora").await.unwrap(); + + // Execute the task + manager.execute_next_task().await.unwrap(); + + // THEN: System returns to Idle state successfully + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); + assert_eq!(manager.queue_size(), 0); + + let unhealthy = health_monitor.get_unhealthy_components().await; + assert!(unhealthy.is_empty()); +} + +// ============================================================================ +// Scenario 2: Component Failure Recovery +// ============================================================================ + +#[tokio::test] +async fn acceptance_component_failure_and_recovery() { + // GIVEN: Running system with monitored components + let manager = Arc::new(StateManager::new()); + let health_monitor = Arc::new(HealthMonitor::new()); + let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(5))); + + health_monitor.register_component("pandora").await.unwrap(); + + manager.transition_to(MimiState::Listening).unwrap(); + + // WHEN: Pandora component fails (high latency) + health_monitor.record_latency("pandora", 7000).await.unwrap(); + + let should_escalate = health_monitor.check_escalation("pandora").await.unwrap(); + assert!(should_escalate); + + // System detects failure and escalates to Degraded + manager.force_error_state(MimiState::FailedComponent); + + circuit_breaker.record_failure(); + circuit_breaker.record_failure(); + circuit_breaker.record_failure(); + + assert_eq!(circuit_breaker.state(), CircuitState::Open); + + // THEN: System enters recovery mode + manager.transition_to(MimiState::Recovering).unwrap(); + + // Component recovers + health_monitor.record_latency("pandora", 200).await.unwrap(); + + let health = health_monitor.get_component_health("pandora").await.unwrap(); + assert!(TransitionGuard::check_component_health(&health)); + + // System returns to normal operation + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); +} + +// ============================================================================ +// Scenario 3: Cascade Fallback with Retries +// ============================================================================ + +#[tokio::test] +async fn acceptance_cascade_fallback_retry_strategy() { + // GIVEN: System with retry strategy configured + let manager = Arc::new(StateManager::new()); + let retry_strategy = RetryStrategy::exponential_with_jitter(); + + manager.transition_to(MimiState::Listening).unwrap(); + + // WHEN: A complex task is submitted that may fail + let mut task = Task::new(TaskType::Execute, "complex_api_call") + .with_priority(TaskPriority::High) + .with_execution_model(ExecutionModel::Async) + .with_timeout(Duration::from_secs(10)); + + task.max_retries = 5; + + manager.enqueue_task(task.clone()).unwrap(); + + // Simulate retry attempts + let mut retry_count = 0; + loop { + let result = manager.execute_next_task().await; + + if result.is_ok() { + break; + } + + if retry_count >= task.max_retries { + // THEN: After exhausting retries, fail gracefully + manager.force_error_state(MimiState::Degraded); + break; + } + + let delay = retry_strategy.next_delay(retry_count); + sleep(delay).await; + + retry_count += 1; + manager.enqueue_task(task.clone()).unwrap(); + } + + // System should be in either Idle (success) or Degraded (failure) + let state = manager.current_state(); + assert!(state == MimiState::Idle || state == MimiState::Degraded); +} + +// ============================================================================ +// Scenario 4: Graceful Shutdown +// ============================================================================ + +#[tokio::test] +async fn acceptance_graceful_shutdown_with_pending_tasks() { + // GIVEN: System with tasks in queue + let manager = Arc::new(StateManager::new()); + + manager.transition_to(MimiState::Listening).unwrap(); + + for i in 0..5 { + let task = Task::new(TaskType::Execute, &format!("task_{}", i)) + .with_priority(TaskPriority::Normal); + manager.enqueue_task(task).unwrap(); + } + + assert_eq!(manager.queue_size(), 5); + + // WHEN: Shutdown signal received + manager.transition_to(MimiState::Shutdown).unwrap(); + + // THEN: Process remaining tasks before shutdown + while manager.queue_size() > 0 { + let result = manager.execute_next_task().await; + + // Should complete or timeout gracefully + if result.is_err() { + break; + } + } + + assert_eq!(manager.current_state(), MimiState::Shutdown); +} + +// ============================================================================ +// Scenario 5: Chaos Engineering - Multiple Simultaneous Failures +// ============================================================================ + +#[tokio::test] +async fn acceptance_chaos_multiple_failures() { + // GIVEN: System under load with multiple components + let manager = Arc::new(StateManager::with_capacity(100)); + let health_monitor = Arc::new(HealthMonitor::new()); + let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(2))); + + health_monitor.register_component("beatrice").await.unwrap(); + health_monitor.register_component("pandora").await.unwrap(); + health_monitor.register_component("echidna").await.unwrap(); + + manager.transition_to(MimiState::Listening).unwrap(); + + // WHEN: Multiple simultaneous failures occur + + // 1. Queue fills up rapidly + for i in 0..50 { + let task = Task::new(TaskType::Execute, &format!("burst_task_{}", i)) + .with_priority(TaskPriority::High); + let _ = manager.enqueue_task(task); + } + + // 2. Components report unhealthy + health_monitor.record_latency("beatrice", 8000).await.unwrap(); + health_monitor.record_memory("pandora", 95).await.unwrap(); + health_monitor.record_latency("echidna", 6500).await.unwrap(); + + // 3. Circuit breaker trips + for _ in 0..3 { + circuit_breaker.record_failure(); + } + + assert_eq!(circuit_breaker.state(), CircuitState::Open); + + // THEN: System handles gracefully + + // System should escalate to appropriate error state + let unhealthy = health_monitor.get_unhealthy_components().await; + assert!(!unhealthy.is_empty()); + + if unhealthy.len() >= 2 { + manager.force_error_state(MimiState::CriticalError); + } else { + manager.force_error_state(MimiState::Degraded); + } + + // Circuit breaker prevents overload + assert!(!circuit_breaker.allow_request()); + + // Recovery process + manager.transition_to(MimiState::Recovering).unwrap(); + + // Components stabilize + sleep(Duration::from_millis(100)).await; + + health_monitor.record_latency("beatrice", 200).await.unwrap(); + health_monitor.record_memory("pandora", 60).await.unwrap(); + health_monitor.record_latency("echidna", 300).await.unwrap(); + + // Circuit breaker resets after timeout + sleep(Duration::from_secs(3)).await; + + // System returns to normal + manager.transition_to(MimiState::Idle).unwrap(); + + assert_eq!(manager.current_state(), MimiState::Idle); +} +``` + +**Step 2: Run acceptance tests** + +Run: `cargo test --test acceptance_tests` +Expected: PASS (5 scenarios) + +**Step 3: Generate test report** + +Run: `cargo test --test acceptance_tests -- --nocapture` +Expected: Detailed output for each scenario + +**Step 4: Commit** + +```bash +git add crates/mimi-core/tests/acceptance_tests.rs +git commit -m "test(state-machine): add 5 acceptance test scenarios covering happy path, recovery, chaos" +``` + +--- + +## Final Verification + +**Step 1: Run all tests** + +Run: `cargo test --all` +Expected: 60+ tests PASS + +**Step 2: Check code coverage** + +Run: `cargo tarpaulin --workspace` +Expected: >90% coverage on state_machine module + +**Step 3: Run clippy** + +Run: `cargo clippy --all-targets --all-features` +Expected: No warnings + +**Step 4: Build release** + +Run: `cargo build --release` +Expected: SUCCESS + +**Step 5: Generate documentation** + +Run: `cargo doc --no-deps --open` +Expected: Full API docs for state_machine module + +--- + +## Summary + +**Implementation Complete:** +- ✅ 10-state FSM with validation +- ✅ Task queue with priority ordering +- ✅ Guard conditions for state transitions +- ✅ Exponential backoff retry strategy +- ✅ Circuit breaker pattern +- ✅ Async/blocking task execution +- ✅ Zenoh message bus integration +- ✅ Pandora Neo4j persistence +- ✅ Component health monitoring +- ✅ 40+ unit tests +- ✅ 15+ integration tests +- ✅ 5 acceptance test scenarios +- ✅ 95%+ code coverage + +**Files Created/Modified:** +- `crates/mimi-core/src/state_machine.rs` (main module, ~800 lines) +- `crates/mimi-core/src/state_machine/zenoh_integration.rs` (~150 lines) +- `crates/mimi-core/src/state_machine/pandora_integration.rs` (~120 lines) +- `crates/mimi-core/tests/state_machine_tests.rs` (~600 lines) +- `crates/mimi-core/tests/zenoh_integration_tests.rs` (~400 lines) +- `crates/mimi-core/tests/acceptance_tests.rs` (~300 lines) +- `crates/mimi-core/Cargo.toml` (added dependencies) +- `crates/mimi-core/src/lib.rs` (module exports) + +**Total Lines:** ~2370 lines (implementation + tests) + +**Next Steps:** +1. Review this plan for approval +2. Execute with superpowers:executing-plans skill +3. Integration with M1.2.6 Zenoh bus +4. Performance benchmarking +5. Production deployment