Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

**[中文文档](README.zh.md)**

**Lightweight distributed framework designed for high-performance AI applications.**
**Pulsing is a distributed actor framework that provides a communication backbone for building distributed systems, with specialized support for AI applications.**

🚀 **Zero Dependencies** — Pure Rust + Tokio, no NATS/etcd/Redis

Expand Down
2 changes: 1 addition & 1 deletion README.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

**[English](README.md)**

**轻量级分布式框架,专为高性能 AI 应用设计。**
**Pulsing 是一个分布式 actor 框架,为构建分布式系统提供通信骨干,并为 AI 应用提供专门支持。**

🚀 **零外部依赖** — 纯 Rust + Tokio,无需 NATS/etcd/Redis

Expand Down
2 changes: 2 additions & 0 deletions crates/pulsing-actor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ integration = []
otlp = ["opentelemetry-otlp"]
# Enable TLS support with passphrase-derived certificates
tls = ["dep:rustls", "dep:tokio-rustls", "dep:rcgen", "dep:ring", "dep:rustls-pemfile", "dep:time"]
# Enable test helpers module for testing in downstream crates
test-helper = []

[dependencies]
# Async runtime
Expand Down
128 changes: 43 additions & 85 deletions crates/pulsing-actor/src/actor/address.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
//! Actor addressing - URI-based actor addressing scheme
//!
//! This module implements the actor addressing scheme as defined in the design document.
//!
//! ## Address Types
//!
//! 1. Named Actor Service Address: `actor:///namespace/path/name`
//! 2. Named Actor Instance Address: `actor:///namespace/path/name@node_id`
//! 3. Global Actor Address: `actor://node_id/actor_id`
//! 4. Local Reference: `actor://0/actor_id` (node_id=0 means local)
//! Actor addressing (URI-based).

use super::traits::NodeId;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -65,14 +56,7 @@ impl fmt::Display for AddressParseError {

impl std::error::Error for AddressParseError {}

/// Actor path for named actors (namespace + hierarchical path + name)
///
/// A path must have at least two segments: namespace and name.
/// Additional segments can be used for logical grouping.
///
/// Examples:
/// - `services/llm/router` (namespace: services, name: router)
/// - `workers/inference/pool` (namespace: workers, name: pool)
/// Actor path for named actors (namespace + path + name).
#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct ActorPath {
/// Path segments, e.g., ["services", "llm", "router"]
Expand All @@ -89,34 +73,10 @@ impl ActorPath {
/// Maximum single segment length
pub const MAX_SEGMENT_LENGTH: usize = 64;

/// Create a new actor path from a string
///
/// The path must have at least two segments (namespace/name).
///
/// # Validation Rules
/// - Path cannot be empty
/// - Path cannot exceed 256 characters
/// - Each segment cannot exceed 64 characters
/// - Each segment can only contain alphanumeric characters, `_`, and `-`
/// - Path must have at least two segments (namespace/name)
/// - User code cannot use reserved system namespaces (use `new_system` for internal use)
///
/// # Examples
/// ```
/// use pulsing_actor::actor::ActorPath;
///
/// let path = ActorPath::new("services/llm/router").unwrap();
/// assert_eq!(path.namespace(), "services");
/// assert_eq!(path.name(), "router");
///
/// // These will fail:
/// // ActorPath::new("system/internal").unwrap(); // Reserved namespace
/// // ActorPath::new("a".repeat(300)).unwrap(); // Too long
/// ```
pub fn new(path: impl AsRef<str>) -> Result<Self, AddressParseError> {
let path = path.as_ref().trim_matches('/');
/// Validate and parse path components (shared by `new()` and `new_system()`).
fn validate_path_components(path: &str) -> Result<Vec<String>, AddressParseError> {
let path = path.trim_matches('/');

// Check total path length
if path.len() > Self::MAX_PATH_LENGTH {
return Err(AddressParseError::PathTooLong);
}
Expand All @@ -127,7 +87,6 @@ impl ActorPath {

let segments: Vec<String> = path.split('/').map(|s| s.trim().to_string()).collect();

// Validate segments
for segment in &segments {
if segment.is_empty() {
return Err(AddressParseError::EmptySegment);
Expand All @@ -140,11 +99,47 @@ impl ActorPath {
}
}

// Must have at least namespace/name
if segments.len() < 2 {
return Err(AddressParseError::MissingNamespace);
}

Ok(segments)
}

/// Check if a segment contains only valid characters
fn is_valid_segment(s: &str) -> bool {
!s.is_empty()
&& s.chars()
.all(|c| c.is_alphanumeric() || c == '_' || c == '-')
}

/// Create a new actor path from a string
///
/// The path must have at least two segments (namespace/name).
///
/// # Validation Rules
/// - Path cannot be empty
/// - Path cannot exceed 256 characters
/// - Each segment cannot exceed 64 characters
/// - Each segment can only contain alphanumeric characters, `_`, and `-`
/// - Path must have at least two segments (namespace/name)
/// - User code cannot use reserved system namespaces (use `new_system` for internal use)
///
/// # Examples
/// ```
/// use pulsing_actor::actor::ActorPath;
///
/// let path = ActorPath::new("services/llm/router").unwrap();
/// assert_eq!(path.namespace(), "services");
/// assert_eq!(path.name(), "router");
///
/// // These will fail:
/// // ActorPath::new("system/internal").unwrap(); // Reserved namespace
/// // ActorPath::new("a".repeat(300)).unwrap(); // Too long
/// ```
pub fn new(path: impl AsRef<str>) -> Result<Self, AddressParseError> {
let segments = Self::validate_path_components(path.as_ref())?;

// Check for reserved system namespaces (user code cannot use these)
if Self::SYSTEM_NAMESPACES.contains(&segments[0].as_str()) {
return Err(AddressParseError::ReservedNamespace);
Expand All @@ -164,47 +159,10 @@ impl ActorPath {
/// - Python bindings for `PythonActorService` at `system/python_actor_service`
#[doc(hidden)]
pub fn new_system(path: impl AsRef<str>) -> Result<Self, AddressParseError> {
let path = path.as_ref().trim_matches('/');

// Check total path length
if path.len() > Self::MAX_PATH_LENGTH {
return Err(AddressParseError::PathTooLong);
}

if path.is_empty() {
return Err(AddressParseError::MissingNamespace);
}

let segments: Vec<String> = path.split('/').map(|s| s.trim().to_string()).collect();

// Validate segments
for segment in &segments {
if segment.is_empty() {
return Err(AddressParseError::EmptySegment);
}
if segment.len() > Self::MAX_SEGMENT_LENGTH {
return Err(AddressParseError::SegmentTooLong);
}
if !Self::is_valid_segment(segment) {
return Err(AddressParseError::InvalidCharacter);
}
}

// Must have at least namespace/name
if segments.len() < 2 {
return Err(AddressParseError::MissingNamespace);
}

let segments = Self::validate_path_components(path.as_ref())?;
Ok(Self { segments })
}

/// Check if a segment contains only valid characters
fn is_valid_segment(s: &str) -> bool {
!s.is_empty()
&& s.chars()
.all(|c| c.is_alphanumeric() || c == '_' || c == '-')
}

/// Get the namespace (first segment)
pub fn namespace(&self) -> &str {
&self.segments[0]
Expand Down
56 changes: 6 additions & 50 deletions crates/pulsing-actor/src/actor/context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Actor execution context
//! Actor execution context.

use super::mailbox::Envelope;
use super::reference::ActorRef;
Expand All @@ -10,58 +10,38 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Context provided to actors during message handling
///
/// Provides access to:
/// - `id()` - The actor's assigned ID
/// - `actor_ref()` - Get references to other actors
/// - `watch()`/`unwatch()` - Monitor other actors
/// - `schedule_self()` - Schedule a delayed message to self
/// - `is_cancelled()` - Check if shutdown was requested
/// Context provided to actors during message handling.
pub struct ActorContext {
/// The actor's own ID
actor_id: ActorId,

/// Local node ID
node_id: Option<NodeId>,

/// Cancellation token for graceful shutdown
cancel_token: CancellationToken,

/// Cached actor references
actor_refs: HashMap<ActorId, ActorRef>,

/// System reference for spawning new actors
system: Option<Arc<dyn ActorSystemRef>>,

/// Self mailbox sender for schedule_self
self_sender: Option<mpsc::Sender<Envelope>>,

/// Named path (if this is a named actor)
named_path: Option<String>,
}

/// Trait for system reference (to avoid circular dependency)
/// Trait for system reference.
#[async_trait::async_trait]
pub trait ActorSystemRef: Send + Sync {
/// Get an actor reference by ID
async fn actor_ref(&self, id: &ActorId) -> anyhow::Result<ActorRef>;

/// Get the local node ID
fn node_id(&self) -> NodeId;

/// Watch an actor - will receive a termination message (ActorId, StopReason) when the watched actor stops
async fn watch(&self, watcher: &ActorId, target: &ActorId) -> anyhow::Result<()>;

/// Stop watching an actor
async fn unwatch(&self, watcher: &ActorId, target: &ActorId) -> anyhow::Result<()>;

/// Get a local actor reference by name (for behavior-based actors)
fn local_actor_ref_by_name(&self, name: &str) -> Option<ActorRef>;
}

impl ActorContext {
/// Create a new context (for testing)
pub fn new(actor_id: ActorId) -> Self {
Self {
actor_id,
Expand All @@ -74,7 +54,6 @@ impl ActorContext {
}
}

/// Create context with system reference
pub fn with_system(
actor_id: ActorId,
system: Arc<dyn ActorSystemRef>,
Expand All @@ -93,7 +72,6 @@ impl ActorContext {
}
}

/// Create context with system reference and named path
pub fn with_system_and_name(
actor_id: ActorId,
system: Arc<dyn ActorSystemRef>,
Expand All @@ -113,44 +91,35 @@ impl ActorContext {
}
}

/// Get the named path (if this is a named actor)
pub fn named_path(&self) -> Option<&str> {
self.named_path.as_deref()
}

/// Get a reference to the actor system (if available)
pub fn system(&self) -> Option<Arc<dyn ActorSystemRef>> {
self.system.clone()
}

/// Get the actor's ID
pub fn id(&self) -> &ActorId {
&self.actor_id
}

/// Get the local node ID
pub fn node_id(&self) -> Option<&NodeId> {
self.node_id.as_ref()
}

/// Get the cancellation token
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}

/// Check if shutdown was requested
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}

/// Get an actor reference
pub async fn actor_ref(&mut self, id: &ActorId) -> anyhow::Result<ActorRef> {
// Check cache first
if let Some(r) = self.actor_refs.get(id) {
return Ok(r.clone());
}

// Get from system
if let Some(ref system) = self.system {
let r = system.actor_ref(id).await?;
self.actor_refs.insert(*id, r.clone());
Expand All @@ -160,18 +129,7 @@ impl ActorContext {
Err(anyhow::anyhow!("No system reference available"))
}

/// Schedule a delayed message to self
///
/// Sends a message to this actor after the specified delay.
/// The message is serialized and sent as a fire-and-forget (tell pattern).
///
/// # Example
/// ```ignore
/// ctx.schedule_self(MyMessage { value: 42 }, Duration::from_secs(5));
/// ```
///
/// # Panics
/// Returns an error if the actor context doesn't have a self sender (e.g., in tests).
/// Schedule a delayed message to self.
pub fn schedule_self<M: Serialize + Send + 'static>(
&self,
msg: M,
Expand All @@ -181,10 +139,8 @@ impl ActorContext {
anyhow::anyhow!("No self sender available (context not fully initialized)")
})?;

// Serialize the message
let message = Message::pack(&msg)?;

// Spawn a task that waits for the delay and then sends the message
tokio::spawn(async move {
tokio::time::sleep(delay).await;
let envelope = Envelope::tell(message);
Expand All @@ -196,7 +152,7 @@ impl ActorContext {
Ok(())
}

/// Watch another actor - will receive a termination message (ActorId, StopReason) when it stops
/// Watch another actor.
pub async fn watch(&self, target: &ActorId) -> anyhow::Result<()> {
if let Some(ref system) = self.system {
system.watch(&self.actor_id, target).await
Expand All @@ -205,7 +161,7 @@ impl ActorContext {
}
}

/// Stop watching another actor
/// Stop watching another actor.
pub async fn unwatch(&self, target: &ActorId) -> anyhow::Result<()> {
if let Some(ref system) = self.system {
system.unwatch(&self.actor_id, target).await
Expand Down
Loading
Loading