Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/pulsing-actor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ futures = { workspace = true }

# Concurrent data structures
dashmap = { workspace = true }
lru = "0.12"

# Error handling
anyhow = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions crates/pulsing-actor/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use super::mailbox::Envelope;
use super::reference::ActorRef;
use super::traits::{ActorId, Message, NodeId};
use lru::LruCache;
use serde::Serialize;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
Expand All @@ -14,7 +15,7 @@ use tokio_util::sync::CancellationToken;
pub struct ActorContext {
actor_id: ActorId,
cancel_token: CancellationToken,
actor_refs: HashMap<ActorId, ActorRef>,
actor_refs: LruCache<ActorId, ActorRef>,
system: Arc<dyn ActorSystemRef>,
self_sender: mpsc::Sender<Envelope>,
named_path: Option<String>,
Expand Down Expand Up @@ -48,7 +49,7 @@ impl ActorContext {
Self {
actor_id,
cancel_token,
actor_refs: HashMap::new(),
actor_refs: LruCache::new(NonZeroUsize::new(1024).expect("1024 is non-zero")),
system,
self_sender,
named_path,
Expand Down Expand Up @@ -107,7 +108,7 @@ impl ActorContext {
}

let r = self.system.actor_ref(id).await?;
self.actor_refs.insert(*id, r.clone());
self.actor_refs.put(*id, r.clone());
Ok(r)
}

Expand Down
1 change: 1 addition & 0 deletions crates/pulsing-py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing-subscriber = { workspace = true }
reqwest = { workspace = true }
pythonize = "0.23"
uuid = { workspace = true }
crossbeam-channel = "0.5"

[dependencies.pyo3]
version = "0.23.4"
Expand Down
19 changes: 11 additions & 8 deletions crates/pulsing-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ pub use python_executor::{init_python_executor, python_executor, ExecutorError};
/// - Load balancing policies: Random, RoundRobin, PowerOfTwo, ConsistentHash, CacheAware
#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Initialize tracing for logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.try_init()
.ok();
// Initialize tracing for logging (only if PULSING_INIT_TRACING is set)
// This allows applications to control their own tracing configuration
if std::env::var("PULSING_INIT_TRACING").is_ok() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.try_init()
.ok();
}

// Add error classes
errors::add_to_module(m)?;
Expand Down
24 changes: 9 additions & 15 deletions crates/pulsing-py/src/python_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! Avoids GIL contention with Tokio's async runtime by isolating Python
//! execution to a fixed-size thread pool (default: 4 threads).

use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex, OnceLock};
use crossbeam_channel::{unbounded, Sender};
use std::sync::OnceLock;
use std::thread::{self, JoinHandle};
use tokio::sync::oneshot;

Expand All @@ -16,14 +16,13 @@ type PythonTask = Box<dyn FnOnce() + Send + 'static>;

/// Dedicated thread pool for Python code execution.
pub struct PythonExecutor {
sender: Mutex<Sender<PythonTask>>,
sender: Sender<PythonTask>,
_threads: Vec<JoinHandle<()>>,
}

impl PythonExecutor {
pub fn new(num_threads: usize) -> Self {
let (sender, receiver) = mpsc::channel::<PythonTask>();
let receiver = Arc::new(Mutex::new(receiver));
let (sender, receiver) = unbounded::<PythonTask>();

let threads: Vec<_> = (0..num_threads)
.map(|i| {
Expand All @@ -33,11 +32,7 @@ impl PythonExecutor {
.spawn(move || {
tracing::debug!("Python executor thread {} started", i);
loop {
let task = {
let guard = rx.lock().unwrap();
guard.recv()
};
match task {
match rx.recv() {
Ok(task) => task(),
Err(_) => {
tracing::debug!("Python executor thread {} shutting down", i);
Expand All @@ -53,7 +48,7 @@ impl PythonExecutor {
tracing::info!("Python executor initialized with {} threads", num_threads);

Self {
sender: Mutex::new(sender),
sender,
_threads: threads,
}
}
Expand All @@ -71,8 +66,6 @@ impl PythonExecutor {
});

self.sender
.lock()
.map_err(|_| ExecutorError::ChannelClosed)?
.send(task)
.map_err(|_| ExecutorError::ChannelClosed)?;

Expand Down Expand Up @@ -105,6 +98,7 @@ pub fn init_python_executor(num_threads: usize) -> Result<(), &'static str> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;

#[tokio::test]
async fn test_executor_basic() {
Expand All @@ -124,13 +118,13 @@ mod tests {
})
.collect();

let results: Vec<_> = futures::future::join_all(handles)
let results: Vec<i32> = futures::future::join_all(handles)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();

let expected: Vec<_> = (0..10).map(|i| i * 2).collect();
let expected: Vec<i32> = (0..10).map(|i| i * 2).collect();
assert_eq!(results, expected);
}
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ module-name = "pulsing._core"
manifest-path = "crates/pulsing-py/Cargo.toml"
python-packages = ["pulsing"]
python-source = "python"
sdist-include = ["LICENSE", "README.md"]

[build-system]
requires = ["maturin>=1.0,<2.0"]
Expand Down
Loading
Loading