diff --git a/crates/pulsing-actor/src/cluster/backends/gossip.rs b/crates/pulsing-actor/src/cluster/backends/gossip.rs new file mode 100644 index 000000000..07531a81a --- /dev/null +++ b/crates/pulsing-actor/src/cluster/backends/gossip.rs @@ -0,0 +1,165 @@ +//! Gossip backend implementation +//! +//! Wraps the existing GossipCluster to implement the NamingBackend trait. + +use crate::actor::{ActorId, ActorPath, NodeId, StopReason}; +use crate::cluster::NamingBackend; +use crate::cluster::{ + member::{MemberInfo, NamedActorInfo, NamedActorInstance}, + GossipCluster, GossipConfig, +}; +use crate::transport::http2::Http2Transport; +use async_trait::async_trait; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; + +/// Gossip-based naming backend +/// +/// This wraps the existing GossipCluster implementation to provide +/// the NamingBackend trait interface. +pub struct GossipBackend { + cluster: Arc, +} + +impl GossipBackend { + /// Create a new GossipBackend + pub fn new( + local_node: NodeId, + local_addr: SocketAddr, + transport: Arc, + config: GossipConfig, + ) -> Self { + let cluster = GossipCluster::new(local_node, local_addr, transport, config); + Self { + cluster: Arc::new(cluster), + } + } + + /// Get a reference to the inner GossipCluster + /// + /// This is needed for SystemMessageHandler to access handle_gossip method. + pub fn inner(&self) -> &GossipCluster { + &self.cluster + } +} + +#[async_trait] +impl NamingBackend for GossipBackend { + // ======================================================================== + // Node Management + // ======================================================================== + + async fn join(&self, seeds: Vec) -> anyhow::Result<()> { + self.cluster.join(seeds).await + } + + async fn leave(&self) -> anyhow::Result<()> { + self.cluster.leave().await + } + + async fn all_members(&self) -> Vec { + self.cluster.all_members().await + } + + async fn alive_members(&self) -> Vec { + self.cluster.alive_members().await + } + + async fn get_member(&self, node_id: &NodeId) -> Option { + self.cluster.get_member(node_id).await + } + + // ======================================================================== + // Named Actor Registration + // ======================================================================== + + async fn register_named_actor(&self, path: ActorPath) { + self.cluster.register_named_actor(path).await + } + + async fn register_named_actor_full( + &self, + path: ActorPath, + actor_id: ActorId, + metadata: HashMap, + ) { + self.cluster + .register_named_actor_full(path, actor_id, metadata) + .await + } + + async fn unregister_named_actor(&self, path: &ActorPath) { + self.cluster.unregister_named_actor(path).await + } + + async fn broadcast_named_actor_failed(&self, path: &ActorPath, reason: &StopReason) { + self.cluster + .broadcast_named_actor_failed(path, reason) + .await + } + + // ======================================================================== + // Named Actor Queries + // ======================================================================== + + async fn lookup_named_actor(&self, path: &ActorPath) -> Option { + self.cluster.lookup_named_actor(path).await + } + + async fn select_named_actor_instance(&self, path: &ActorPath) -> Option { + self.cluster.select_named_actor_instance(path).await + } + + async fn get_named_actor_instances(&self, path: &ActorPath) -> Vec { + self.cluster.get_named_actor_instances(path).await + } + + async fn get_named_actor_instances_detailed( + &self, + path: &ActorPath, + ) -> Vec<(MemberInfo, Option)> { + self.cluster.get_named_actor_instances_detailed(path).await + } + + async fn all_named_actors(&self) -> Vec { + self.cluster.all_named_actors().await + } + + // ======================================================================== + // Actor Registration + // ======================================================================== + + async fn register_actor(&self, actor_id: ActorId) { + self.cluster.register_actor(actor_id).await + } + + async fn unregister_actor(&self, actor_id: &ActorId) { + self.cluster.unregister_actor(actor_id).await + } + + async fn lookup_actor(&self, actor_id: &ActorId) -> Option { + self.cluster.lookup_actor(actor_id).await + } + + // ======================================================================== + // Lifecycle Management + // ======================================================================== + + fn start(&self, cancel: CancellationToken) { + self.cluster.start(cancel) + } + + fn local_node(&self) -> &NodeId { + self.cluster.local_node() + } + + fn local_addr(&self) -> SocketAddr { + self.cluster.local_addr() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/crates/pulsing-actor/src/cluster/backends/head.rs b/crates/pulsing-actor/src/cluster/backends/head.rs new file mode 100644 index 000000000..92597d31b --- /dev/null +++ b/crates/pulsing-actor/src/cluster/backends/head.rs @@ -0,0 +1,1283 @@ +//! Head node backend implementation +//! +//! Implements a centralized naming backend where: +//! - Head node: Maintains global registry of nodes and named actors +//! - Worker nodes: Sync with head node via HTTP/2 + +use crate::actor::{ActorId, ActorPath, NodeId, StopReason}; +use crate::cluster::{ + member::{MemberInfo, MemberStatus, NamedActorInfo, NamedActorInstance}, + NamingBackend, +}; +use crate::transport::http2::{Http2Client, Http2Config}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; + +// ============================================================================ +// Configuration +// ============================================================================ + +/// Configuration for head node backend +#[derive(Clone, Debug)] +pub struct HeadNodeConfig { + /// Sync interval for worker nodes (default: 5s) + pub sync_interval: Duration, + /// Heartbeat interval for worker nodes (default: 10s) + pub heartbeat_interval: Duration, + /// Heartbeat timeout for head node (default: 30s) + pub heartbeat_timeout: Duration, +} + +impl Default for HeadNodeConfig { + fn default() -> Self { + Self { + sync_interval: Duration::from_secs(5), + heartbeat_interval: Duration::from_secs(10), + heartbeat_timeout: Duration::from_secs(30), + } + } +} + +// ============================================================================ +// Node Mode +// ============================================================================ + +#[derive(Clone, Debug)] +enum NodeMode { + /// Head node mode + Head, + /// Worker node mode with head node address + Worker { head_addr: SocketAddr }, +} + +// ============================================================================ +// Head Node State +// ============================================================================ + +/// Node registration information +#[derive(Clone, Debug, Serialize, Deserialize)] +struct NodeRegistration { + node_id: NodeId, + addr: SocketAddr, + last_heartbeat: u64, // milliseconds since epoch +} + +/// Head node state (only used in head mode) +struct HeadNodeState { + /// Registered nodes + nodes: HashMap, + /// Named actors registry + named_actors: HashMap, + /// Actor registry + actors: HashMap, +} + +impl HeadNodeState { + fn new() -> Self { + Self { + nodes: HashMap::new(), + named_actors: HashMap::new(), + actors: HashMap::new(), + } + } + + /// Register or update a node + fn register_node(&mut self, node_id: NodeId, addr: SocketAddr) { + let now = HeadNodeBackend::now_millis(); + self.nodes.insert( + node_id, + NodeRegistration { + node_id, + addr, + last_heartbeat: now, + }, + ); + } + + /// Update node heartbeat + fn update_heartbeat(&mut self, node_id: &NodeId) -> bool { + if let Some(reg) = self.nodes.get_mut(node_id) { + reg.last_heartbeat = HeadNodeBackend::now_millis(); + true + } else { + false + } + } + + /// Remove stale nodes (heartbeat timeout) + fn remove_stale_nodes(&mut self, timeout_ms: u64) -> Vec { + let now = HeadNodeBackend::now_millis(); + let mut removed = Vec::new(); + + self.nodes.retain(|node_id, reg| { + if now.saturating_sub(reg.last_heartbeat) > timeout_ms { + removed.push(*node_id); + false + } else { + true + } + }); + + // Clean up named actors and actors for removed nodes + for node_id in &removed { + self.named_actors.values_mut().for_each(|info| { + info.remove_instance(node_id); + }); + self.named_actors.retain(|_, info| !info.is_empty()); + self.actors.retain(|_, nid| nid != node_id); + } + + removed + } + + /// Register a named actor + fn register_named_actor( + &mut self, + path: ActorPath, + node_id: NodeId, + actor_id: Option, + metadata: HashMap, + ) { + let key = path.as_str(); + if let Some(aid) = actor_id { + let instance = NamedActorInstance::with_metadata(node_id, aid, metadata); + self.named_actors + .entry(key.clone()) + .and_modify(|info| info.add_full_instance(instance.clone())) + .or_insert_with(|| NamedActorInfo::with_full_instance(path, instance)); + } else { + self.named_actors + .entry(key.clone()) + .and_modify(|info| info.add_instance(node_id)) + .or_insert_with(|| NamedActorInfo::with_instance(path, node_id)); + } + } + + /// Unregister a named actor + fn unregister_named_actor(&mut self, path: &ActorPath, node_id: &NodeId) { + let key = path.as_str().to_string(); + if let Some(info) = self.named_actors.get_mut(&key) { + info.remove_instance(node_id); + if info.is_empty() { + self.named_actors.remove(&key); + } + } + } + + /// Register an actor + fn register_actor(&mut self, actor_id: ActorId, node_id: NodeId) { + self.actors.insert(actor_id, node_id); + } + + /// Unregister an actor + fn unregister_actor(&mut self, actor_id: &ActorId) { + self.actors.remove(actor_id); + } + + /// Get all members as MemberInfo + fn all_members(&self) -> Vec { + self.nodes + .values() + .map(|reg| MemberInfo { + node_id: reg.node_id, + addr: reg.addr, + gossip_addr: reg.addr, + status: MemberStatus::Alive, + incarnation: 0, + last_update: None, + }) + .collect() + } + + /// Get all named actors + fn all_named_actors(&self) -> Vec { + self.named_actors.values().cloned().collect() + } +} + +// ============================================================================ +// Worker Node State +// ============================================================================ + +/// Worker node state (only used in worker mode) +struct WorkerNodeState { + /// Cached members from head node + members: HashMap, + /// Cached named actors from head node + named_actors: HashMap, + /// Cached actors from head node + actors: HashMap, + /// Pending sync operations + pending_sync: Vec, +} + +#[derive(Clone, Debug)] +enum SyncOperation { + RegisterNamed { + path: ActorPath, + actor_id: Option, + metadata: HashMap, + }, + UnregisterNamed { + path: ActorPath, + }, + Register { + actor_id: ActorId, + }, + Unregister { + actor_id: ActorId, + }, +} + +impl WorkerNodeState { + fn new() -> Self { + Self { + members: HashMap::new(), + named_actors: HashMap::new(), + actors: HashMap::new(), + pending_sync: Vec::new(), + } + } + + /// Update state from sync response + fn update_from_sync(&mut self, sync: SyncResponse) { + // Update members + self.members.clear(); + for member in sync.members { + self.members.insert(member.node_id, member); + } + + // Update named actors + self.named_actors.clear(); + for info in sync.named_actors { + self.named_actors + .insert(info.path.as_str().to_string(), info); + } + + // Update actors + self.actors.clear(); + for (actor_id, node_id) in sync.actors { + self.actors.insert(actor_id, node_id); + } + } + + /// Add pending sync operation + fn add_pending_sync(&mut self, op: SyncOperation) { + self.pending_sync.push(op); + } + + /// Get and clear pending sync operations + fn take_pending_sync(&mut self) -> Vec { + std::mem::take(&mut self.pending_sync) + } +} + +// ============================================================================ +// Shared State +// ============================================================================ + +enum BackendState { + Head(HeadNodeState), + Worker(WorkerNodeState), +} + +// ============================================================================ +// Head Node Backend +// ============================================================================ + +/// Head node based naming backend +#[derive(Clone)] +pub struct HeadNodeBackend { + mode: NodeMode, + local_node: NodeId, + local_addr: SocketAddr, + state: Arc>, + http_client: Option>, + config: HeadNodeConfig, +} + +impl HeadNodeBackend { + /// Create a new head node backend + pub fn new( + local_node: NodeId, + local_addr: SocketAddr, + is_head_node: bool, + head_addr: Option, + http_config: Http2Config, + ) -> Self { + Self::with_config( + local_node, + local_addr, + is_head_node, + head_addr, + http_config, + HeadNodeConfig::default(), + ) + } + + pub fn with_config( + local_node: NodeId, + local_addr: SocketAddr, + _is_head_node: bool, + head_addr: Option, + http_config: Http2Config, + config: HeadNodeConfig, + ) -> Self { + let mode = if let Some(addr) = head_addr { + NodeMode::Worker { head_addr: addr } + } else { + NodeMode::Head + }; + + let state = match &mode { + NodeMode::Head => BackendState::Head(HeadNodeState::new()), + NodeMode::Worker { .. } => BackendState::Worker(WorkerNodeState::new()), + }; + + let http_client = match &mode { + NodeMode::Head => None, + NodeMode::Worker { .. } => Some(Arc::new(Http2Client::new(http_config))), + }; + + if let Some(client) = &http_client { + client.start_background_tasks(); + } + + Self { + mode, + local_node, + local_addr, + state: Arc::new(RwLock::new(state)), + http_client, + config, + } + } + + /// Get current timestamp in milliseconds + fn now_millis() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 + } + + /// Check if this is a head node + pub fn is_head(&self) -> bool { + matches!(self.mode, NodeMode::Head) + } + + /// Get head node address (if worker mode) + pub fn head_addr(&self) -> Option { + match &self.mode { + NodeMode::Head => None, + NodeMode::Worker { head_addr } => Some(*head_addr), + } + } + + /// Handle node registration (head node only) + pub async fn handle_register_node( + &self, + node_id: NodeId, + addr: SocketAddr, + ) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_node(node_id, addr); + tracing::info!(node_id = %node_id, addr = %addr, "Node registered"); + } + Ok(()) + } + + /// Handle heartbeat (head node only) + pub async fn handle_heartbeat(&self, node_id: &NodeId) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + if head_state.update_heartbeat(node_id) { + Ok(()) + } else { + Err(anyhow::anyhow!("Node not found: {}", node_id)) + } + } else { + Err(anyhow::anyhow!("Invalid state")) + } + } + + /// Handle register named actor (head node only) + pub async fn handle_register_named_actor( + &self, + path: ActorPath, + node_id: NodeId, + actor_id: Option, + metadata: HashMap, + ) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_named_actor(path.clone(), node_id, actor_id, metadata); + tracing::debug!(path = %path, node_id = %node_id, "Named actor registered on head"); + } + Ok(()) + } + + /// Handle unregister named actor (head node only) + pub async fn handle_unregister_named_actor( + &self, + path: &ActorPath, + node_id: &NodeId, + ) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.unregister_named_actor(path, node_id); + tracing::debug!(path = %path, node_id = %node_id, "Named actor unregistered from head"); + } + Ok(()) + } + + /// Handle register actor (head node only) + pub async fn handle_register_actor( + &self, + actor_id: ActorId, + node_id: NodeId, + ) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_actor(actor_id, node_id); + tracing::debug!(actor_id = %actor_id, node_id = %node_id, "Actor registered on head"); + } + Ok(()) + } + + /// Handle unregister actor (head node only) + pub async fn handle_unregister_actor( + &self, + actor_id: &ActorId, + node_id: &NodeId, + ) -> anyhow::Result<()> { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + // Only unregister if the actor belongs to this node + if head_state.actors.get(actor_id) == Some(node_id) { + head_state.unregister_actor(actor_id); + tracing::debug!(actor_id = %actor_id, node_id = %node_id, "Actor unregistered from head"); + } + } + Ok(()) + } + + /// Handle sync request (head node only) - returns current state + pub async fn handle_sync(&self) -> anyhow::Result { + if !self.is_head() { + return Err(anyhow::anyhow!("Not a head node")); + } + + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + let members = head_state.all_members(); + let named_actors = head_state.all_named_actors(); + let actors: Vec<_> = head_state.actors.iter().map(|(k, v)| (*k, *v)).collect(); + + Ok(SyncResponse { + members, + named_actors, + actors, + }) + } else { + Err(anyhow::anyhow!("Invalid state")) + } + } + + /// Make HTTP request to head node (worker mode only) + async fn request_head Deserialize<'de>>( + &self, + path: &str, + msg_type: &str, + payload: Vec, + ) -> anyhow::Result { + let head_addr = self.head_addr().ok_or_else(|| { + anyhow::anyhow!("Cannot make request to head node: this is a head node") + })?; + let client = self + .http_client + .as_ref() + .ok_or_else(|| anyhow::anyhow!("HTTP client not available"))?; + + let response_bytes = client.ask(head_addr, path, msg_type, payload).await?; + let result: T = bincode::deserialize(&response_bytes)?; + Ok(result) + } + + /// Send request to head node without response (worker mode only) + async fn tell_head(&self, path: &str, msg_type: &str, payload: Vec) -> anyhow::Result<()> { + let head_addr = self + .head_addr() + .ok_or_else(|| anyhow::anyhow!("Cannot send to head node: this is a head node"))?; + let client = self + .http_client + .as_ref() + .ok_or_else(|| anyhow::anyhow!("HTTP client not available"))?; + + tracing::debug!( + head_addr = %head_addr, + path = %path, + msg_type = %msg_type, + "Sending tell to head node" + ); + + client.tell(head_addr, path, msg_type, payload).await + } + + /// Sync with head node (worker mode only) + async fn sync_from_head(&self) -> anyhow::Result<()> { + if self.is_head() { + return Err(anyhow::anyhow!("Cannot sync: this is a head node")); + } + + let sync: SyncResponse = self + .request_head("/cluster/head/sync", "sync", Vec::new()) + .await?; + + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.update_from_sync(sync); + } + + Ok(()) + } + + /// Register with head node (worker mode only) + async fn register_with_head(&self) -> anyhow::Result<()> { + if self.is_head() { + return Err(anyhow::anyhow!("Cannot register: this is a head node")); + } + + let req = RegisterNodeRequest { + node_id: self.local_node, + addr: self.local_addr, + }; + let payload = bincode::serialize(&req)?; + + self.tell_head("/cluster/head/register", "register_node", payload) + .await?; + + tracing::info!( + node_id = %self.local_node, + head_addr = ?self.head_addr(), + "Registered with head node" + ); + + Ok(()) + } + + /// Send heartbeat to head node (worker mode only) + async fn send_heartbeat(&self) -> anyhow::Result<()> { + if self.is_head() { + return Err(anyhow::anyhow!( + "Cannot send heartbeat: this is a head node" + )); + } + + let req = HeartbeatRequest { + node_id: self.local_node, + }; + let payload = bincode::serialize(&req)?; + + self.tell_head("/cluster/head/heartbeat", "heartbeat", payload) + .await?; + + Ok(()) + } + + /// Process pending sync operations (worker mode only) + async fn process_pending_sync(&self) -> anyhow::Result<()> { + if self.is_head() { + return Ok(()); // No pending sync for head node + } + + let pending = { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.take_pending_sync() + } else { + return Ok(()); + } + }; + + for op in pending { + match op { + SyncOperation::RegisterNamed { + path, + actor_id, + metadata, + } => { + let req = RegisterNamedActorRequest { + path: path.clone(), + node_id: self.local_node, + actor_id, + metadata: metadata.clone(), + }; + let payload = bincode::serialize(&req)?; + if let Err(e) = self + .tell_head( + "/cluster/head/named_actor/register", + "register_named_actor", + payload, + ) + .await + { + tracing::warn!(path = %path, error = %e, "Failed to sync named actor registration"); + // Re-add to pending queue + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::RegisterNamed { + path, + actor_id, + metadata, + }); + } + } + } + SyncOperation::UnregisterNamed { path } => { + let req = UnregisterNamedActorRequest { + path: path.clone(), + node_id: self.local_node, + }; + let payload = bincode::serialize(&req)?; + if let Err(e) = self + .tell_head( + "/cluster/head/named_actor/unregister", + "unregister_named_actor", + payload, + ) + .await + { + tracing::warn!(path = %path, error = %e, "Failed to sync named actor unregistration"); + // Re-add to pending queue + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::UnregisterNamed { path }); + } + } + } + SyncOperation::Register { actor_id } => { + let req = RegisterActorRequest { + actor_id, + node_id: self.local_node, + }; + let payload = bincode::serialize(&req)?; + if let Err(e) = self + .tell_head("/cluster/head/actor/register", "register_actor", payload) + .await + { + tracing::warn!(actor_id = %actor_id, error = %e, "Failed to sync actor registration"); + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::Register { actor_id }); + } + } + } + SyncOperation::Unregister { actor_id } => { + let req = UnregisterActorRequest { + actor_id, + node_id: self.local_node, + }; + let payload = bincode::serialize(&req)?; + if let Err(e) = self + .tell_head( + "/cluster/head/actor/unregister", + "unregister_actor", + payload, + ) + .await + { + tracing::warn!(actor_id = %actor_id, error = %e, "Failed to sync actor unregistration"); + // Re-add to pending queue + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::Unregister { actor_id }); + } + } + } + } + } + + Ok(()) + } +} + +// ============================================================================ +// NamingBackend Trait Implementation +// ============================================================================ + +#[async_trait] +impl NamingBackend for HeadNodeBackend { + // ======================================================================== + // Node Management + // ======================================================================== + + async fn join(&self, _seeds: Vec) -> anyhow::Result<()> { + if self.is_head() { + // Head node: register itself + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_node(self.local_node, self.local_addr); + } + tracing::info!("Head node started and registered itself"); + Ok(()) + } else { + // Worker node: register with head node + self.register_with_head().await?; + // Initial sync + self.sync_from_head().await?; + Ok(()) + } + } + + async fn leave(&self) -> anyhow::Result<()> { + if self.is_head() { + // Head node: clear all registrations + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.nodes.clear(); + head_state.named_actors.clear(); + head_state.actors.clear(); + } + tracing::info!("Head node leaving"); + } else { + // Worker node: unregister from head (best effort) + // Note: We don't have an unregister endpoint, so we just stop syncing + tracing::info!(node_id = %self.local_node, "Worker node leaving"); + } + Ok(()) + } + + async fn all_members(&self) -> Vec { + if self.is_head() { + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + head_state.all_members() + } else { + Vec::new() + } + } else { + // Worker: return cached members + let state = self.state.read().await; + if let BackendState::Worker(ref worker_state) = *state { + worker_state.members.values().cloned().collect() + } else { + Vec::new() + } + } + } + + async fn alive_members(&self) -> Vec { + let all = self.all_members().await; + all.into_iter() + .filter(|m| m.status == MemberStatus::Alive && m.node_id != self.local_node) + .collect() + } + + async fn get_member(&self, node_id: &NodeId) -> Option { + if self.is_head() { + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + head_state.nodes.get(node_id).map(|reg| MemberInfo { + node_id: reg.node_id, + addr: reg.addr, + gossip_addr: reg.addr, + status: MemberStatus::Alive, + incarnation: 0, + last_update: None, + }) + } else { + None + } + } else { + // Worker: return from cache + let state = self.state.read().await; + if let BackendState::Worker(ref worker_state) = *state { + worker_state.members.get(node_id).cloned() + } else { + None + } + } + } + + // ======================================================================== + // Named Actor Registration + // ======================================================================== + + async fn register_named_actor(&self, path: ActorPath) { + if self.is_head() { + // Head node: register directly + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_named_actor( + path.clone(), + self.local_node, + None, + HashMap::new(), + ); + } + } else { + // Worker: add to pending sync and update local cache + { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::RegisterNamed { + path: path.clone(), + actor_id: None, + metadata: HashMap::new(), + }); + // Update local cache immediately + worker_state + .named_actors + .entry(path.as_str().to_string()) + .and_modify(|info| info.add_instance(self.local_node)) + .or_insert_with(|| { + NamedActorInfo::with_instance(path.clone(), self.local_node) + }); + } + } + // Try to sync immediately + let _ = self.process_pending_sync().await; + } + } + + async fn register_named_actor_full( + &self, + path: ActorPath, + actor_id: ActorId, + metadata: HashMap, + ) { + if self.is_head() { + // Head node: register directly + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_named_actor( + path.clone(), + self.local_node, + Some(actor_id), + metadata.clone(), + ); + } + } else { + // Worker: add to pending sync and update local cache + { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::RegisterNamed { + path: path.clone(), + actor_id: Some(actor_id), + metadata: metadata.clone(), + }); + // Update local cache immediately + let instance = + NamedActorInstance::with_metadata(self.local_node, actor_id, metadata); + worker_state + .named_actors + .entry(path.as_str().to_string()) + .and_modify(|info| info.add_full_instance(instance.clone())) + .or_insert_with(|| { + NamedActorInfo::with_full_instance(path.clone(), instance) + }); + } + } + // Try to sync immediately + let _ = self.process_pending_sync().await; + } + } + + async fn unregister_named_actor(&self, path: &ActorPath) { + if self.is_head() { + // Head node: unregister directly + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.unregister_named_actor(path, &self.local_node); + } + } else { + // Worker: add to pending sync and update local cache + { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state + .add_pending_sync(SyncOperation::UnregisterNamed { path: path.clone() }); + // Update local cache immediately + if let Some(info) = worker_state + .named_actors + .get_mut(&path.as_str().to_string()) + { + info.remove_instance(&self.local_node); + if info.is_empty() { + worker_state.named_actors.remove(&path.as_str().to_string()); + } + } + } + } + // Try to sync immediately + let _ = self.process_pending_sync().await; + } + } + + async fn broadcast_named_actor_failed(&self, path: &ActorPath, _reason: &StopReason) { + // Same as unregister + self.unregister_named_actor(path).await; + } + + // ======================================================================== + // Named Actor Queries + // ======================================================================== + + async fn lookup_named_actor(&self, path: &ActorPath) -> Option { + if self.is_head() { + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + head_state + .named_actors + .get(&path.as_str().to_string()) + .cloned() + } else { + None + } + } else { + // Worker: return from cache + let state = self.state.read().await; + if let BackendState::Worker(ref worker_state) = *state { + worker_state + .named_actors + .get(&path.as_str().to_string()) + .cloned() + } else { + None + } + } + } + + async fn select_named_actor_instance(&self, path: &ActorPath) -> Option { + let info = self.lookup_named_actor(path).await?; + let node_id = info.select_instance()?; + self.get_member(&node_id).await + } + + async fn get_named_actor_instances(&self, path: &ActorPath) -> Vec { + let info = match self.lookup_named_actor(path).await { + Some(info) => info, + None => return Vec::new(), + }; + + let mut members = Vec::new(); + for node_id in &info.instance_nodes { + if let Some(member) = self.get_member(node_id).await { + members.push(member); + } + } + members + } + + async fn get_named_actor_instances_detailed( + &self, + path: &ActorPath, + ) -> Vec<(MemberInfo, Option)> { + let info = match self.lookup_named_actor(path).await { + Some(info) => info, + None => return Vec::new(), + }; + + let mut result = Vec::new(); + for node_id in &info.instance_nodes { + if let Some(member) = self.get_member(node_id).await { + let instance = info.get_instance(node_id).cloned(); + result.push((member, instance)); + } + } + result + } + + async fn all_named_actors(&self) -> Vec { + if self.is_head() { + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + head_state.all_named_actors() + } else { + Vec::new() + } + } else { + // Worker: return from cache + let state = self.state.read().await; + if let BackendState::Worker(ref worker_state) = *state { + worker_state.named_actors.values().cloned().collect() + } else { + Vec::new() + } + } + } + + // ======================================================================== + // Actor Registration + // ======================================================================== + + async fn register_actor(&self, actor_id: ActorId) { + if self.is_head() { + // Head node: register directly + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.register_actor(actor_id, self.local_node); + } + } else { + // Worker: add to pending sync and update local cache + { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::Register { actor_id }); + worker_state.actors.insert(actor_id, self.local_node); + } + } + // Try to sync immediately + let _ = self.process_pending_sync().await; + } + } + + async fn unregister_actor(&self, actor_id: &ActorId) { + if self.is_head() { + // Head node: unregister directly + let mut state = self.state.write().await; + if let BackendState::Head(ref mut head_state) = *state { + head_state.unregister_actor(actor_id); + } + } else { + // Worker: add to pending sync and update local cache + { + let mut state = self.state.write().await; + if let BackendState::Worker(ref mut worker_state) = *state { + worker_state.add_pending_sync(SyncOperation::Unregister { + actor_id: *actor_id, + }); + worker_state.actors.remove(actor_id); + } + } + // Try to sync immediately + let _ = self.process_pending_sync().await; + } + } + + async fn lookup_actor(&self, actor_id: &ActorId) -> Option { + let node_id = if self.is_head() { + let state = self.state.read().await; + if let BackendState::Head(ref head_state) = *state { + head_state.actors.get(actor_id).copied()? + } else { + return None; + } + } else { + // Worker: return from cache + let state = self.state.read().await; + if let BackendState::Worker(ref worker_state) = *state { + worker_state.actors.get(actor_id).copied()? + } else { + return None; + } + }; + + self.get_member(&node_id).await + } + + // ======================================================================== + // Lifecycle Management + // ======================================================================== + + fn start(&self, cancel: CancellationToken) { + if self.is_head() { + // Head node: start heartbeat timeout detection + let state = self.state.clone(); + let config = self.config.clone(); + tokio::spawn(async move { + heartbeat_timeout_loop(state, config, cancel).await; + }); + } else { + // Worker node: start sync and heartbeat loops + let sync_backend = Arc::new(self.clone()); + let heartbeat_backend = sync_backend.clone(); + let cancel_sync = cancel.clone(); + + tokio::spawn(async move { + worker_sync_loop(sync_backend, cancel_sync).await; + }); + tokio::spawn(async move { + worker_heartbeat_loop(heartbeat_backend, cancel).await; + }); + } + } + + fn local_node(&self) -> &NodeId { + &self.local_node + } + + fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +// ============================================================================ +// Background Tasks +// ============================================================================ + +/// Head node: heartbeat timeout detection loop +async fn heartbeat_timeout_loop( + state: Arc>, + config: HeadNodeConfig, + cancel: CancellationToken, +) { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + + loop { + tokio::select! { + _ = interval.tick() => { + let timeout_ms = config.heartbeat_timeout.as_millis() as u64; + let mut state_guard = state.write().await; + if let BackendState::Head(ref mut head_state) = *state_guard { + let removed = head_state.remove_stale_nodes(timeout_ms); + if !removed.is_empty() { + tracing::info!(removed_count = removed.len(), "Removed stale nodes"); + } + } + } + _ = cancel.cancelled() => { + tracing::info!("Heartbeat timeout loop shutting down"); + break; + } + } + } +} + +/// Worker node: sync loop +async fn worker_sync_loop(backend: Arc, cancel: CancellationToken) { + let mut interval = tokio::time::interval(backend.config.sync_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + // Sync from head + if let Err(e) = backend.sync_from_head().await { + tracing::error!(error = %e, "Failed to sync from head node"); + // Fast fail: if we can't sync, we should stop + cancel.cancel(); + break; + } + // Process pending sync operations + if let Err(e) = backend.process_pending_sync().await { + tracing::warn!(error = %e, "Failed to process pending sync"); + } + } + _ = cancel.cancelled() => { + tracing::info!("Sync loop shutting down"); + break; + } + } + } +} + +/// Worker node: heartbeat loop +async fn worker_heartbeat_loop(backend: Arc, cancel: CancellationToken) { + let mut interval = tokio::time::interval(backend.config.heartbeat_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + if let Err(e) = backend.send_heartbeat().await { + tracing::error!(error = %e, "Failed to send heartbeat to head node"); + // Fast fail: if we can't heartbeat, we should stop + cancel.cancel(); + break; + } + } + _ = cancel.cancelled() => { + tracing::info!("Heartbeat loop shutting down"); + break; + } + } + } +} + +// ============================================================================ +// HTTP API Messages +// ============================================================================ + +#[derive(Serialize, Deserialize)] +struct RegisterNodeRequest { + node_id: NodeId, + addr: SocketAddr, +} + +#[derive(Serialize, Deserialize)] +struct HeartbeatRequest { + node_id: NodeId, +} + +#[derive(Serialize, Deserialize)] +struct RegisterNamedActorRequest { + path: ActorPath, + node_id: NodeId, + actor_id: Option, + metadata: HashMap, +} + +#[derive(Serialize, Deserialize)] +struct UnregisterNamedActorRequest { + path: ActorPath, + node_id: NodeId, +} + +#[derive(Serialize, Deserialize)] +pub struct RegisterActorRequest { + pub actor_id: ActorId, + pub node_id: NodeId, +} + +#[derive(Serialize, Deserialize)] +pub struct UnregisterActorRequest { + pub actor_id: ActorId, + pub node_id: NodeId, +} + +/// Response for sync request from head node +#[derive(Serialize, Deserialize)] +pub struct SyncResponse { + /// All registered members + pub members: Vec, + /// All named actors + pub named_actors: Vec, + /// Actor to node mappings + pub actors: Vec<(ActorId, NodeId)>, +} diff --git a/crates/pulsing-actor/src/cluster/backends/mod.rs b/crates/pulsing-actor/src/cluster/backends/mod.rs new file mode 100644 index 000000000..40fbb31f9 --- /dev/null +++ b/crates/pulsing-actor/src/cluster/backends/mod.rs @@ -0,0 +1,7 @@ +//! Naming backend implementations + +mod gossip; +mod head; + +pub use gossip::GossipBackend; +pub use head::{HeadNodeBackend, HeadNodeConfig, RegisterActorRequest, UnregisterActorRequest}; diff --git a/crates/pulsing-actor/src/cluster/mod.rs b/crates/pulsing-actor/src/cluster/mod.rs index 367b9af61..9ea0f10c5 100644 --- a/crates/pulsing-actor/src/cluster/mod.rs +++ b/crates/pulsing-actor/src/cluster/mod.rs @@ -7,11 +7,20 @@ mod gossip; mod member; +mod naming; pub mod swim; +pub mod backends; + pub use gossip::{GossipCluster, GossipConfig, GossipMessage}; pub use member::{ ActorLocation, ClusterNode, FailureInfo, MemberInfo, MemberStatus, NamedActorInfo, NamedActorInstance, NodeStatus, }; +pub use naming::NamingBackend; pub use swim::{SwimConfig, SwimDetector, SwimMessage}; + +// Re-export backends for convenience +pub use backends::GossipBackend; +// Re-export head node backend types (via backends module's re-exports) +pub use backends::{HeadNodeBackend, HeadNodeConfig}; diff --git a/crates/pulsing-actor/src/cluster/naming.rs b/crates/pulsing-actor/src/cluster/naming.rs new file mode 100644 index 000000000..dcb937000 --- /dev/null +++ b/crates/pulsing-actor/src/cluster/naming.rs @@ -0,0 +1,108 @@ +//! Naming backend trait for abstracting different naming service implementations +//! +//! This trait provides a unified interface for: +//! - Node discovery and membership management +//! - Named actor registration and discovery +//! - Actor location queries + +use crate::actor::{ActorId, ActorPath, NodeId, StopReason}; +use crate::cluster::member::{MemberInfo, NamedActorInfo, NamedActorInstance}; +use async_trait::async_trait; +use std::collections::HashMap; +use std::net::SocketAddr; +use tokio_util::sync::CancellationToken; + +/// Trait for naming backends that provide cluster membership and actor discovery +#[async_trait] +pub trait NamingBackend: Send + Sync { + // ======================================================================== + // Node Management + // ======================================================================== + + /// Join the cluster via seed nodes + async fn join(&self, seeds: Vec) -> anyhow::Result<()>; + + /// Leave the cluster gracefully + async fn leave(&self) -> anyhow::Result<()>; + + /// Get all cluster members + async fn all_members(&self) -> Vec; + + /// Get only alive cluster members + async fn alive_members(&self) -> Vec; + + /// Get member information for a specific node + async fn get_member(&self, node_id: &NodeId) -> Option; + + // ======================================================================== + // Named Actor Registration + // ======================================================================== + + /// Register a named actor (legacy, without actor_id) + async fn register_named_actor(&self, path: ActorPath); + + /// Register a named actor with full details (actor_id and metadata) + async fn register_named_actor_full( + &self, + path: ActorPath, + actor_id: ActorId, + metadata: HashMap, + ); + + /// Unregister a named actor + async fn unregister_named_actor(&self, path: &ActorPath); + + /// Broadcast that a named actor has failed + async fn broadcast_named_actor_failed(&self, path: &ActorPath, reason: &StopReason); + + // ======================================================================== + // Named Actor Queries + // ======================================================================== + + /// Lookup named actor information + async fn lookup_named_actor(&self, path: &ActorPath) -> Option; + + /// Select a named actor instance (for load balancing) + async fn select_named_actor_instance(&self, path: &ActorPath) -> Option; + + /// Get all instances of a named actor + async fn get_named_actor_instances(&self, path: &ActorPath) -> Vec; + + /// Get detailed instance information for a named actor + async fn get_named_actor_instances_detailed( + &self, + path: &ActorPath, + ) -> Vec<(MemberInfo, Option)>; + + /// Get all named actors in the cluster + async fn all_named_actors(&self) -> Vec; + + // ======================================================================== + // Actor Registration (optional, some backends may not support) + // ======================================================================== + + /// Register an actor (for non-named actors) + async fn register_actor(&self, actor_id: ActorId); + + /// Unregister an actor + async fn unregister_actor(&self, actor_id: &ActorId); + + /// Lookup actor location + async fn lookup_actor(&self, actor_id: &ActorId) -> Option; + + // ======================================================================== + // Lifecycle Management + // ======================================================================== + + /// Start the backend (e.g., start background tasks) + fn start(&self, cancel: CancellationToken); + + /// Get the local node ID + fn local_node(&self) -> &NodeId; + + /// Get the local node address + fn local_addr(&self) -> SocketAddr; + + /// Get as Any for downcasting + fn as_any(&self) -> &dyn std::any::Any; +} diff --git a/crates/pulsing-actor/src/system/config.rs b/crates/pulsing-actor/src/system/config.rs index bab3858a6..026fbea05 100644 --- a/crates/pulsing-actor/src/system/config.rs +++ b/crates/pulsing-actor/src/system/config.rs @@ -1,7 +1,7 @@ //! Configuration types for the Actor System use crate::actor::{NodeId, DEFAULT_MAILBOX_SIZE}; -use crate::cluster::GossipConfig; +use crate::cluster::{GossipConfig, HeadNodeConfig}; use crate::policies::LoadBalancingPolicy; use crate::supervision::SupervisionSpec; use crate::transport::Http2Config; @@ -26,6 +26,15 @@ pub struct SystemConfig { /// Default mailbox capacity for all actors pub default_mailbox_capacity: usize, + + /// Head node mode: if true, this node acts as head node + pub is_head_node: bool, + + /// Head node address: if Some, this node is a worker connecting to the head + pub head_addr: Option, + + /// Head node configuration (only used when head node mode is enabled) + pub head_node_config: Option, } /// Default bind address for standalone mode (any interface, OS-assigned port) @@ -40,6 +49,9 @@ impl Default for SystemConfig { gossip_config: GossipConfig::default(), http2_config: Http2Config::default(), default_mailbox_capacity: DEFAULT_MAILBOX_SIZE, + is_head_node: false, + head_addr: None, + head_node_config: None, } } } @@ -85,6 +97,26 @@ impl SystemConfig { pub fn is_tls_enabled(&self) -> bool { self.http2_config.is_tls_enabled() } + + /// Enable head node mode + pub fn with_head_node(mut self) -> Self { + self.is_head_node = true; + self.head_addr = None; // Clear head_addr if set + self + } + + /// Set head node address (makes this a worker node) + pub fn with_head_addr(mut self, addr: SocketAddr) -> Self { + self.head_addr = Some(addr); + self.is_head_node = false; // Clear is_head_node if set + self + } + + /// Set head node configuration + pub fn with_head_node_config(mut self, config: HeadNodeConfig) -> Self { + self.head_node_config = Some(config); + self + } } // ============================================================================ @@ -125,6 +157,12 @@ pub struct ActorSystemBuilder { gossip_config: Option, /// HTTP/2 configuration http2_config: Option, + /// Head node mode + is_head_node: bool, + /// Head node address (if set, makes this a worker) + head_addr: Option>, + /// Head node configuration + head_node_config: Option, } impl ActorSystemBuilder { @@ -175,6 +213,29 @@ impl ActorSystemBuilder { self } + /// Enable head node mode + pub fn head_node(mut self) -> Self { + self.is_head_node = true; + self.head_addr = None; // Clear head_addr if set + self + } + + /// Set head node address (makes this a worker node) + /// + /// Accepts `&str`, `String`, or `SocketAddr`. + /// Address parsing errors are deferred to `build()`. + pub fn head_addr(mut self, addr: impl Into) -> Self { + self.head_addr = Some(addr.into().0); + self.is_head_node = false; // Clear is_head_node if set + self + } + + /// Set head node configuration + pub fn head_node_config(mut self, config: HeadNodeConfig) -> Self { + self.head_node_config = Some(config); + self + } + /// Build the ActorSystem /// /// Returns an error if any address parsing failed. @@ -203,12 +264,24 @@ impl ActorSystemBuilder { } } + // Parse head node address if specified + let head_addr = match self.head_addr { + Some(Ok(addr)) => Some(addr), + Some(Err(invalid)) => { + return Err(anyhow::anyhow!("Invalid head node address: {}", invalid)); + } + None => None, + }; + let config = SystemConfig { addr, seed_nodes, gossip_config: self.gossip_config.unwrap_or_default(), http2_config: self.http2_config.unwrap_or_default(), default_mailbox_capacity: self.mailbox_capacity.unwrap_or(DEFAULT_MAILBOX_SIZE), + is_head_node: self.is_head_node, + head_addr, + head_node_config: self.head_node_config, }; crate::system::ActorSystem::new(config).await diff --git a/crates/pulsing-actor/src/system/handler.rs b/crates/pulsing-actor/src/system/handler.rs index 71039b52a..c4ec31376 100644 --- a/crates/pulsing-actor/src/system/handler.rs +++ b/crates/pulsing-actor/src/system/handler.rs @@ -1,11 +1,14 @@ //! HTTP/2 message handler for the actor system use super::handle::LocalActorHandle; -use crate::actor::{Envelope, Message, NodeId}; -use crate::cluster::{GossipCluster, GossipMessage}; +use crate::actor::{ActorId, ActorPath, Envelope, Message, NodeId}; +use crate::cluster::backends::{RegisterActorRequest, UnregisterActorRequest}; +use crate::cluster::{GossipBackend, GossipMessage, HeadNodeBackend, NamingBackend}; use crate::metrics::{metrics, SystemMetrics as PrometheusMetrics}; use crate::transport::Http2ServerHandler; use dashmap::DashMap; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -16,7 +19,7 @@ pub(crate) struct SystemMessageHandler { node_id: NodeId, local_actors: Arc>, named_actor_paths: Arc>, - cluster: Arc>>>, + cluster: Arc>>>, } impl SystemMessageHandler { @@ -24,7 +27,7 @@ impl SystemMessageHandler { node_id: NodeId, local_actors: Arc>, named_actor_paths: Arc>, - cluster: Arc>>>, + cluster: Arc>>>, ) -> Self { Self { node_id, @@ -159,12 +162,18 @@ impl Http2ServerHandler for SystemMessageHandler { peer_addr: SocketAddr, ) -> anyhow::Result>> { let cluster_guard = self.cluster.read().await; - if let Some(cluster) = cluster_guard.as_ref() { - let msg: GossipMessage = bincode::deserialize(&payload)?; - let response = cluster.handle_gossip(msg, peer_addr).await?; - if let Some(resp) = response { - Ok(Some(bincode::serialize(&resp)?)) + if let Some(backend) = cluster_guard.as_ref() { + // Try to downcast to GossipBackend to access handle_gossip + if let Some(gossip_backend) = backend.as_any().downcast_ref::() { + let msg: GossipMessage = bincode::deserialize(&payload)?; + let response = gossip_backend.inner().handle_gossip(msg, peer_addr).await?; + if let Some(resp) = response { + Ok(Some(bincode::serialize(&resp)?)) + } else { + Ok(None) + } } else { + // Not a gossip backend, can't handle gossip messages Ok(None) } } else { @@ -344,4 +353,137 @@ impl Http2ServerHandler for SystemMessageHandler { serde_json::json!(actors) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + async fn handle_head_api( + &self, + path: &str, + method: &str, + body: Vec, + ) -> anyhow::Result>> { + // Call the implementation method + SystemMessageHandler::handle_head_api_impl(self, path, method, body).await + } +} + +impl SystemMessageHandler { + // ======================================================================== + // Head Node API Handlers + // ======================================================================== + + /// Handle head node API requests (implementation) + async fn handle_head_api_impl( + &self, + path: &str, + method: &str, + body: Vec, + ) -> anyhow::Result>> { + let cluster_guard = self.cluster.read().await; + let backend = cluster_guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Cluster backend not available"))?; + + // Try to downcast to HeadNodeBackend + let head_backend = match backend.as_any().downcast_ref::() { + Some(b) if b.is_head() => b, + _ => return Ok(None), + }; + + match (method, path) { + ("POST", "/cluster/head/register") => { + let req: RegisterNodeRequest = bincode::deserialize(&body)?; + head_backend + .handle_register_node(req.node_id, req.addr) + .await?; + Ok(Some(Vec::new())) + } + ("POST", "/cluster/head/heartbeat") => { + let req: HeartbeatRequest = bincode::deserialize(&body)?; + head_backend.handle_heartbeat(&req.node_id).await?; + Ok(Some(Vec::new())) // Return empty body for success + } + ("POST", "/cluster/head/named_actor/register") => { + let req: RegisterNamedActorRequest = bincode::deserialize(&body)?; + head_backend + .handle_register_named_actor(req.path, req.node_id, req.actor_id, req.metadata) + .await?; + Ok(Some(Vec::new())) // Return empty body for success + } + ("POST", "/cluster/head/named_actor/unregister") => { + let req: UnregisterNamedActorRequest = bincode::deserialize(&body)?; + head_backend + .handle_unregister_named_actor(&req.path, &req.node_id) + .await?; + Ok(Some(Vec::new())) // Return empty body for success + } + ("GET", "/cluster/head/members") | ("POST", "/cluster/head/members") => { + // Support both GET and POST (POST is used by Http2Client) + let members = head_backend.all_members().await; + let body = bincode::serialize(&members)?; + Ok(Some(body)) + } + ("GET", "/cluster/head/named_actors") | ("POST", "/cluster/head/named_actors") => { + // Support both GET and POST (POST is used by Http2Client) + let named_actors = head_backend.all_named_actors().await; + let body = bincode::serialize(&named_actors)?; + Ok(Some(body)) + } + ("GET", "/cluster/head/sync") | ("POST", "/cluster/head/sync") => { + let sync = head_backend.handle_sync().await?; + let body = bincode::serialize(&sync)?; + Ok(Some(body)) + } + ("POST", "/cluster/head/actor/register") => { + let req: RegisterActorRequest = bincode::deserialize(&body)?; + head_backend + .handle_register_actor(req.actor_id, req.node_id) + .await?; + Ok(Some(Vec::new())) + } + ("POST", "/cluster/head/actor/unregister") => { + let req: UnregisterActorRequest = bincode::deserialize(&body)?; + head_backend + .handle_unregister_actor(&req.actor_id, &req.node_id) + .await?; + Ok(Some(Vec::new())) + } + _ => Err(anyhow::anyhow!( + "Unknown head API endpoint: {} {}", + method, + path + )), + } + } +} + +// ============================================================================ +// Head Node API Request Types +// ============================================================================ + +#[derive(Serialize, Deserialize)] +struct RegisterNodeRequest { + node_id: NodeId, + addr: SocketAddr, +} + +#[derive(Serialize, Deserialize)] +struct HeartbeatRequest { + node_id: NodeId, +} + +#[derive(Serialize, Deserialize)] +struct RegisterNamedActorRequest { + path: ActorPath, + node_id: NodeId, + actor_id: Option, + metadata: HashMap, +} + +#[derive(Serialize, Deserialize)] +struct UnregisterNamedActorRequest { + path: ActorPath, + node_id: NodeId, } diff --git a/crates/pulsing-actor/src/system/mod.rs b/crates/pulsing-actor/src/system/mod.rs index 200924078..708e6acb8 100644 --- a/crates/pulsing-actor/src/system/mod.rs +++ b/crates/pulsing-actor/src/system/mod.rs @@ -18,7 +18,9 @@ use crate::actor::{ Actor, ActorAddress, ActorContext, ActorId, ActorPath, ActorRef, ActorResolver, ActorSystemRef, IntoActorPath, Mailbox, NodeId, StopReason, }; -use crate::cluster::{GossipCluster, MemberInfo, MemberStatus, NamedActorInfo}; +use crate::cluster::{ + GossipBackend, HeadNodeBackend, MemberInfo, MemberStatus, NamedActorInfo, NamingBackend, +}; use crate::policies::{LoadBalancingPolicy, RoundRobinPolicy, Worker}; use crate::system_actor::{ BoxedActorFactory, SystemActor, SystemRef, SYSTEM_ACTOR_LOCAL_NAME, SYSTEM_ACTOR_PATH, @@ -145,8 +147,8 @@ pub struct ActorSystem { /// Named actor path to local actor name mapping (path_string -> actor_name) named_actor_paths: Arc>, - /// Gossip cluster (for discovery) - cluster: Arc>>>, + /// Naming backend (for discovery) + cluster: Arc>>>, /// HTTP/2 transport transport: Arc, @@ -185,7 +187,8 @@ impl ActorSystem { let node_id = NodeId::generate(); let local_actors: Arc> = Arc::new(DashMap::new()); let named_actor_paths: Arc> = Arc::new(DashMap::new()); - let cluster_holder: Arc>>> = Arc::new(RwLock::new(None)); + let cluster_holder: Arc>>> = + Arc::new(RwLock::new(None)); let lifecycle = Arc::new(ActorLifecycle::new()); // Create message handler (needs cluster reference for gossip) @@ -196,6 +199,9 @@ impl ActorSystem { cluster_holder.clone(), ); + // Clone http2_config before moving it to transport + let http2_config_for_backend = config.http2_config.clone(); + // Create HTTP/2 transport let (transport, actual_addr) = Http2Transport::new( config.addr, @@ -205,26 +211,44 @@ impl ActorSystem { ) .await?; - // Create gossip cluster - let cluster = GossipCluster::new( - node_id, - actual_addr, - transport.clone(), - config.gossip_config, - ); + // Create naming backend based on configuration + let backend: Arc = if config.head_addr.is_some() || config.is_head_node { + // Head node mode: create HeadNodeBackend + let head_config = config.head_node_config.unwrap_or_default(); + let backend = HeadNodeBackend::with_config( + node_id, + actual_addr, + config.is_head_node, + config.head_addr, + http2_config_for_backend, + head_config, + ); + Arc::new(backend) + } else { + // Gossip mode: create GossipBackend + let backend = GossipBackend::new( + node_id, + actual_addr, + transport.clone(), + config.gossip_config, + ); + Arc::new(backend) + }; - let cluster = Arc::new(cluster); { let mut holder = cluster_holder.write().await; - *holder = Some(cluster.clone()); + *holder = Some(backend.clone()); } - // Start cluster gossip - cluster.start(cancel_token.clone()); + // Start backend + backend.start(cancel_token.clone()); - // Join cluster if seed nodes provided - if !config.seed_nodes.is_empty() { - cluster.join(config.seed_nodes).await?; + // Join cluster if seed nodes provided (only for gossip mode) + if !config.seed_nodes.is_empty() && config.head_addr.is_none() && !config.is_head_node { + backend.join(config.seed_nodes).await?; + } else if config.head_addr.is_some() || config.is_head_node { + // For head node mode, join is handled internally + backend.join(Vec::new()).await?; } let system = Arc::new(Self { diff --git a/crates/pulsing-actor/src/transport/http2/client.rs b/crates/pulsing-actor/src/transport/http2/client.rs index 65efb221f..c189b8282 100644 --- a/crates/pulsing-actor/src/transport/http2/client.rs +++ b/crates/pulsing-actor/src/transport/http2/client.rs @@ -180,8 +180,9 @@ impl Http2Client { let body = response.collect().await?.to_bytes(); let error_msg = String::from_utf8_lossy(&body); return Err(anyhow::anyhow!( - "Tell failed with status {}: {}", + "Tell failed with status {} to {}: {}", status, + addr, error_msg )); } diff --git a/crates/pulsing-actor/src/transport/http2/server.rs b/crates/pulsing-actor/src/transport/http2/server.rs index 69952ab4b..e4ef18970 100644 --- a/crates/pulsing-actor/src/transport/http2/server.rs +++ b/crates/pulsing-actor/src/transport/http2/server.rs @@ -97,6 +97,19 @@ pub trait Http2ServerHandler: Send + Sync + 'static { let _ = include_internal; serde_json::json!([]) } + + /// Get as Any for downcasting + fn as_any(&self) -> &dyn std::any::Any; + + /// Handle head node API requests (optional, returns None if not supported) + async fn handle_head_api( + &self, + _path: &str, + _method: &str, + _body: Vec, + ) -> anyhow::Result>> { + Ok(None) + } } /// HTTP/2 Server @@ -323,6 +336,43 @@ impl Http2Server { return Ok(json_response(StatusCode::OK, body)); } + // Head node API endpoints + if path.starts_with("/cluster/head/") { + let body_bytes = match req.collect().await { + Ok(collected) => collected.to_bytes().to_vec(), + Err(e) => { + return Ok(error_response( + StatusCode::BAD_REQUEST, + format!("Failed to read body: {}", e).into_bytes(), + )); + } + }; + + let method_str = method.as_str(); + let path_str = path.as_str(); + match handler + .handle_head_api(path_str, method_str, body_bytes) + .await + { + Ok(Some(response_body)) => { + return Ok(json_response(StatusCode::OK, response_body)); + } + Ok(None) => { + return Ok(error_response( + StatusCode::NOT_IMPLEMENTED, + b"Head node API not supported by this handler".to_vec(), + )); + } + Err(e) => { + tracing::warn!(path = %path, method = %method, error = %e, "Head API error"); + return Ok(error_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{}", e).into_bytes(), + )); + } + } + } + // Only POST for actor messages if method != Method::POST { return Ok(error_response( @@ -691,6 +741,10 @@ mod tests { ) -> anyhow::Result>> { Ok(None) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[tokio::test] diff --git a/crates/pulsing-actor/src/watch.rs b/crates/pulsing-actor/src/watch.rs index 034ee8423..62f546c89 100644 --- a/crates/pulsing-actor/src/watch.rs +++ b/crates/pulsing-actor/src/watch.rs @@ -6,7 +6,7 @@ //! - Cluster broadcast for named actor failures use crate::actor::{ActorId, ActorPath, Envelope, Message, StopReason}; -use crate::cluster::GossipCluster; +use crate::cluster::NamingBackend; use dashmap::DashMap; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -93,7 +93,7 @@ impl ActorLifecycle { named_path: Option, reason: StopReason, named_actor_paths: &DashMap, - cluster: &RwLock>>, + cluster: &RwLock>>, get_sender: F, ) where F: Fn(&str) -> Option>, @@ -146,7 +146,7 @@ impl ActorLifecycle { named_path: Option<&ActorPath>, reason: &StopReason, named_actor_paths: &DashMap, - cluster: &RwLock>>, + cluster: &RwLock>>, ) { if let Some(path) = named_path { // Remove from routing table diff --git a/crates/pulsing-actor/tests/cluster/mod.rs b/crates/pulsing-actor/tests/cluster/mod.rs index 70c258719..8a2627ba0 100644 --- a/crates/pulsing-actor/tests/cluster/mod.rs +++ b/crates/pulsing-actor/tests/cluster/mod.rs @@ -2,7 +2,9 @@ //! //! This module contains tests for cluster-related functionality: //! - Member management (member_tests) +//! - Naming backend (naming_tests) //! - Gossip protocol (gossip_tests) //! - SWIM protocol (swim_tests) mod member_tests; +mod naming_tests; diff --git a/crates/pulsing-actor/tests/cluster/naming_tests.rs b/crates/pulsing-actor/tests/cluster/naming_tests.rs new file mode 100644 index 000000000..2a7627545 --- /dev/null +++ b/crates/pulsing-actor/tests/cluster/naming_tests.rs @@ -0,0 +1,637 @@ +//! Naming backend tests +//! +//! Tests for the NamingBackend trait and GossipBackend implementation. +//! +//! These tests verify that GossipBackend correctly implements the NamingBackend +//! trait by testing through the ActorSystem API, which is the primary interface +//! for users. + +use pulsing_actor::actor::ActorPath; +use pulsing_actor::prelude::*; +use std::collections::HashMap; +use std::time::Duration; + +// ============================================================================ +// NamingBackend Trait Tests via ActorSystem +// ============================================================================ + +#[tokio::test] +async fn test_naming_backend_node_management() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + // Test all_members - should include local node + let members = system.members().await; + assert!(!members.is_empty()); + assert!(members.iter().any(|m| m.node_id == *system.node_id())); + + system.shutdown().await.unwrap(); +} + +// Simple test actor that implements the new Actor trait +struct TestActor; + +#[async_trait::async_trait] +impl Actor for TestActor { + async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result { + // Echo back the message + Ok(msg) + } +} + +#[tokio::test] +async fn test_naming_backend_register_named_actor() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/actor").unwrap(); + + // Register a named actor by spawning it + let _ref = system + .spawn_named(path.clone(), "test_actor", TestActor) + .await + .unwrap(); + + // Should be able to lookup + let info = system.lookup_named(&path).await; + assert!(info.is_some()); + let info = info.unwrap(); + assert_eq!(info.path, path); + assert!(info.instance_nodes.contains(system.node_id())); + + system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_naming_backend_lookup_named_actor() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + // Lookup non-existent actor + let path = ActorPath::new("nonexistent/actor").unwrap(); + let info = system.lookup_named(&path).await; + assert!(info.is_none()); + + system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_naming_backend_all_named_actors() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + // Initially should be empty (or only system actors) + let all = system.all_named_actors().await; + let initial_count = all.len(); + + let path1 = ActorPath::new("test/actor1").unwrap(); + let path2 = ActorPath::new("test/actor2").unwrap(); + + let _ref1 = system + .spawn_named(path1, "actor1", TestActor) + .await + .unwrap(); + let _ref2 = system + .spawn_named(path2, "actor2", TestActor) + .await + .unwrap(); + + // Should now have 2 more + let all = system.all_named_actors().await; + assert!(all.len() >= initial_count + 2); + + system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_naming_backend_resolve_named_actor() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/resolve").unwrap(); + let _ref = system + .spawn_named(path.clone(), "resolve_actor", TestActor) + .await + .unwrap(); + + // Should be able to resolve + let resolved = system.resolve_named(&path, None).await; + assert!(resolved.is_ok()); + + system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_naming_backend_named_actor_instances() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/instances").unwrap(); + let _ref = system + .spawn_named(path.clone(), "instances_actor", TestActor) + .await + .unwrap(); + + // Get instances + let instances = system.get_named_instances(&path).await; + assert!(!instances.is_empty()); + assert!(instances.iter().any(|m| m.node_id == *system.node_id())); + + system.shutdown().await.unwrap(); +} + +// ============================================================================ +// GossipBackend Specific Tests (via type downcasting) +// ============================================================================ + +#[tokio::test] +async fn test_gossip_backend_type_downcast() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + // Test that we can access backend through system's internal API + // This verifies the backend implements as_any() correctly + let members = system.members().await; + assert!(!members.is_empty()); + + // The fact that we can call members() means the backend is working + // We can't directly test downcasting without accessing private fields, + // but we can verify the functionality works + + system.shutdown().await.unwrap(); +} + +// ============================================================================ +// Integration Tests: Named Actor Lifecycle +// ============================================================================ + +#[tokio::test] +async fn test_named_actor_full_lifecycle() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/lifecycle").unwrap(); + + // 1. Register + let _actor_ref = system + .spawn_named(path.clone(), "lifecycle_actor", TestActor) + .await + .unwrap(); + + // 2. Verify registered + let info = system.lookup_named(&path).await; + assert!(info.is_some()); + + // 3. Resolve + let resolved = system.resolve_named(&path, None).await; + assert!(resolved.is_ok()); + + // 4. Get instances + let instances = system.get_named_instances(&path).await; + assert!(!instances.is_empty()); + + // 5. Stop (unregister) + system.stop_named(&path).await.unwrap(); + + // 6. Verify unregistered (may take a moment for gossip to propagate) + tokio::time::sleep(Duration::from_millis(100)).await; + let _info_after = system.lookup_named(&path).await; + // Actor should be removed or marked as failed + // (exact behavior depends on implementation) + + system.shutdown().await.unwrap(); +} + +// Actor with custom metadata +struct MetadataActor { + metadata: HashMap, +} + +impl MetadataActor { + fn new(metadata: HashMap) -> Self { + Self { metadata } + } +} + +#[async_trait::async_trait] +impl Actor for MetadataActor { + fn metadata(&self) -> HashMap { + self.metadata.clone() + } + + async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result { + Ok(msg) + } +} + +#[tokio::test] +async fn test_named_actor_with_metadata() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/metadata").unwrap(); + let mut metadata = HashMap::new(); + metadata.insert("python_class".to_string(), "MyActor".to_string()); + metadata.insert("python_module".to_string(), "my_module".to_string()); + + let _ref = system + .spawn_named_with_options( + path.clone(), + "metadata_actor", + MetadataActor::new(metadata.clone()), + SpawnOptions::new().public(true).metadata(metadata.clone()), + ) + .await + .unwrap(); + + // Get detailed instance info + let instances = system.get_named_instances_detailed(&path).await; + assert!(!instances.is_empty()); + + let (_member, instance) = &instances[0]; + if let Some(instance) = instance { + // Check metadata is preserved + assert_eq!( + instance.metadata.get("python_class"), + Some(&"MyActor".to_string()) + ); + assert_eq!( + instance.metadata.get("python_module"), + Some(&"my_module".to_string()) + ); + } + + system.shutdown().await.unwrap(); +} + +// ============================================================================ +// Error Handling Tests +// ============================================================================ + +#[tokio::test] +async fn test_resolve_nonexistent_named_actor() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("nonexistent/actor").unwrap(); + let result = system.resolve_named(&path, None).await; + + // Should fail to resolve + assert!(result.is_err()); + + system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_named_actor_instances_nonexistent() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("nonexistent/actor").unwrap(); + let instances = system.get_named_instances(&path).await; + + // Should return empty list + assert!(instances.is_empty()); + + system.shutdown().await.unwrap(); +} + +// ============================================================================ +// Multi-Instance Tests (standalone mode limitations) +// ============================================================================ + +#[tokio::test] +async fn test_multiple_registrations_same_path() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + + let path = ActorPath::new("test/multi").unwrap(); + + // First registration + let _ref1 = system + .spawn_named(path.clone(), "multi_actor", TestActor) + .await + .unwrap(); + + // Second registration with same path should either: + // 1. Replace the first one, or + // 2. Add as another instance + // The exact behavior depends on implementation + + let info = system.lookup_named(&path).await; + assert!(info.is_some()); + // Should have at least one instance + assert!(!info.unwrap().instance_nodes.is_empty()); + + system.shutdown().await.unwrap(); +} + +// ============================================================================ +// Head Node Backend Tests +// ============================================================================ + +#[tokio::test] +async fn test_head_node_mode_basic() { + // Create head node + let head_config = SystemConfig::standalone().with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + // Test that head node has itself as a member + let members = head_system.members().await; + assert!(!members.is_empty()); + assert!(members.iter().any(|m| m.node_id == *head_system.node_id())); + + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_node_register_named_actor() { + let head_config = SystemConfig::standalone().with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + let path = ActorPath::new("test/head_actor").unwrap(); + + // Register a named actor on head node + let _ref = head_system + .spawn_named(path.clone(), "head_actor", TestActor) + .await + .unwrap(); + + // Should be able to lookup + let info = head_system.lookup_named(&path).await; + assert!(info.is_some()); + let info = info.unwrap(); + assert_eq!(info.path, path); + assert!(info.instance_nodes.contains(head_system.node_id())); + + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_node_all_named_actors() { + let head_config = SystemConfig::standalone().with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + let initial_count = head_system.all_named_actors().await.len(); + + let path1 = ActorPath::new("test/head1").unwrap(); + let path2 = ActorPath::new("test/head2").unwrap(); + + let _ref1 = head_system + .spawn_named(path1, "head1", TestActor) + .await + .unwrap(); + let _ref2 = head_system + .spawn_named(path2, "head2", TestActor) + .await + .unwrap(); + + let all = head_system.all_named_actors().await; + assert!(all.len() >= initial_count + 2); + + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_node_resolve_named_actor() { + let head_config = SystemConfig::standalone().with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + let path = ActorPath::new("test/head_resolve").unwrap(); + let _ref = head_system + .spawn_named(path.clone(), "head_resolve", TestActor) + .await + .unwrap(); + + // Should be able to resolve + let resolved = head_system.resolve_named(&path, None).await; + assert!(resolved.is_ok()); + + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_node_named_actor_instances() { + let head_config = SystemConfig::standalone().with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + let path = ActorPath::new("test/head_instances").unwrap(); + let _ref = head_system + .spawn_named(path.clone(), "head_instances", TestActor) + .await + .unwrap(); + + // Get instances + let instances = head_system.get_named_instances(&path).await; + assert!(!instances.is_empty()); + assert!(instances + .iter() + .any(|m| m.node_id == *head_system.node_id())); + + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_node_with_custom_config() { + use pulsing_actor::cluster::HeadNodeConfig; + + let config = HeadNodeConfig { + sync_interval: Duration::from_secs(2), + heartbeat_interval: Duration::from_secs(5), + heartbeat_timeout: Duration::from_secs(15), + }; + + let head_config = SystemConfig::standalone() + .with_head_node() + .with_head_node_config(config); + let head_system = ActorSystem::new(head_config).await.unwrap(); + + // Should work normally + let members = head_system.members().await; + assert!(!members.is_empty()); + + head_system.shutdown().await.unwrap(); +} + +// ============================================================================ +// Head-Worker Integration Tests +// ============================================================================ + +#[tokio::test] +async fn test_head_worker_basic_interaction() { + // Create head node + let head_config = SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + let head_addr = head_system.addr(); + + // Create worker node connecting to head + let worker_config = + SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_addr(head_addr); + let worker_system = ActorSystem::new(worker_config).await.unwrap(); + + // Give some time for worker to register with head + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Head should see worker node + let head_members = head_system.members().await; + assert!(head_members.len() >= 2); // Head + Worker + assert!(head_members + .iter() + .any(|m| m.node_id == *worker_system.node_id())); + + // Worker should see head node (and itself) + let worker_members = worker_system.members().await; + assert!(worker_members.len() >= 2); // Head + Worker + assert!(worker_members + .iter() + .any(|m| m.node_id == *head_system.node_id())); + + worker_system.shutdown().await.unwrap(); + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_worker_named_actor_sync() { + // Create head node + let head_config = SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + let head_addr = head_system.addr(); + + // Create worker node + let worker_config = + SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_addr(head_addr); + let worker_system = ActorSystem::new(worker_config).await.unwrap(); + + // Give time for worker to register + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Register named actor on worker + let path = ActorPath::new("test/worker_actor").unwrap(); + let _ref = worker_system + .spawn_named(path.clone(), "worker_actor", TestActor) + .await + .unwrap(); + + // Give time for sync + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Head should see the named actor + let head_info = head_system.lookup_named(&path).await; + assert!(head_info.is_some()); + let head_info = head_info.unwrap(); + assert!(head_info.instance_nodes.contains(worker_system.node_id())); + + // Worker should also see it + let worker_info = worker_system.lookup_named(&path).await; + assert!(worker_info.is_some()); + + worker_system.shutdown().await.unwrap(); + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_worker_multiple_actors() { + // Create head node + let head_config = SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + let head_addr = head_system.addr(); + + // Create worker node + let worker_config = + SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_addr(head_addr); + let worker_system = ActorSystem::new(worker_config).await.unwrap(); + + // Give time for worker to register + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Register multiple actors on worker + let path1 = ActorPath::new("test/multi1").unwrap(); + let path2 = ActorPath::new("test/multi2").unwrap(); + + let _ref1 = worker_system + .spawn_named(path1.clone(), "multi1", TestActor) + .await + .unwrap(); + let _ref2 = worker_system + .spawn_named(path2.clone(), "multi2", TestActor) + .await + .unwrap(); + + // Give time for sync + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Head should see both actors + let all = head_system.all_named_actors().await; + let paths: Vec<_> = all.iter().map(|info| &info.path).collect(); + assert!(paths.contains(&&path1)); + assert!(paths.contains(&&path2)); + + worker_system.shutdown().await.unwrap(); + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_worker_actor_resolution() { + // Create head node + let head_config = SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + let head_addr = head_system.addr(); + + // Create worker node + let worker_config = + SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_addr(head_addr); + let worker_system = ActorSystem::new(worker_config).await.unwrap(); + + // Give time for worker to register + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Register actor on worker + let path = ActorPath::new("test/resolve_actor").unwrap(); + let _ref = worker_system + .spawn_named(path.clone(), "resolve_actor", TestActor) + .await + .unwrap(); + + // Give time for sync + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Head should be able to resolve the actor + let resolved = head_system.resolve_named(&path, None).await; + assert!(resolved.is_ok()); + + // Worker should also be able to resolve + let worker_resolved = worker_system.resolve_named(&path, None).await; + assert!(worker_resolved.is_ok()); + + worker_system.shutdown().await.unwrap(); + head_system.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_head_worker_actor_unregister() { + // Create head node + let head_config = SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_node(); + let head_system = ActorSystem::new(head_config).await.unwrap(); + let head_addr = head_system.addr(); + + // Create worker node + let worker_config = + SystemConfig::with_addr("127.0.0.1:0".parse().unwrap()).with_head_addr(head_addr); + let worker_system = ActorSystem::new(worker_config).await.unwrap(); + + // Give time for worker to register + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Register actor on worker + let path = ActorPath::new("test/unregister_actor").unwrap(); + let _ref = worker_system + .spawn_named(path.clone(), "unregister_actor", TestActor) + .await + .unwrap(); + + // Give time for sync + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Verify it's registered + assert!(head_system.lookup_named(&path).await.is_some()); + + // Unregister + worker_system.stop_named(&path).await.unwrap(); + + // Give time for sync + tokio::time::sleep(Duration::from_millis(2000)).await; + + // Head should no longer see it (or it should be marked as failed) + // Note: exact behavior depends on implementation + + worker_system.shutdown().await.unwrap(); + head_system.shutdown().await.unwrap(); +} diff --git a/crates/pulsing-actor/tests/http2_transport_tests.rs b/crates/pulsing-actor/tests/http2_transport_tests.rs index 4c159ff22..eb95c365e 100644 --- a/crates/pulsing-actor/tests/http2_transport_tests.rs +++ b/crates/pulsing-actor/tests/http2_transport_tests.rs @@ -128,6 +128,10 @@ impl Http2ServerHandler for TestHandler { serde_json::json!(actors) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } // ============================================================================ @@ -1047,6 +1051,10 @@ impl Http2ServerHandler for StreamingHandler { ) -> anyhow::Result>> { Ok(None) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[tokio::test] @@ -1233,6 +1241,10 @@ mod tracing_tests { ) -> anyhow::Result>> { Ok(None) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[test]