From 1d9d2235ba4e891ec1610d4b1593b8a019bb70aa Mon Sep 17 00:00:00 2001 From: Reiase Date: Mon, 26 Jan 2026 00:46:41 +0800 Subject: [PATCH] Update dependencies and enhance Python integration - Added `lru` and `crossbeam-channel` as direct dependencies to improve performance and memory management; `Cargo.lock` also gains the transitive dependencies `allocator-api2`, `foldhash`, and `hashbrown`. - Updated `crates/pulsing-py/Cargo.toml` to include `crossbeam-channel` for better concurrency handling in the Python executor, and `pyproject.toml` to add `sdist-include` for `LICENSE` and `README.md`. - Refactored `ActorContext` to utilize `LruCache` (capacity 1024) for actor references, optimizing memory usage. - Enhanced Python executor to use `crossbeam-channel` for task management, improving efficiency. - Made tracing initialization in the Python module opt-in via the `PULSING_INIT_TRACING` environment variable so applications can control their own tracing configuration. - Introduced protocol version handling in Python for better compatibility and flexibility in message processing. - Updated `StorageManager` methods to ensure consistent handling of node IDs as strings for improved reliability in distributed scenarios. --- Cargo.lock | 43 +++++ crates/pulsing-actor/Cargo.toml | 1 + crates/pulsing-actor/src/actor/context.rs | 9 +- crates/pulsing-py/Cargo.toml | 1 + crates/pulsing-py/src/lib.rs | 19 +- crates/pulsing-py/src/python_executor.rs | 24 +-- pyproject.toml | 1 + python/pulsing/actor/remote.py | 205 +++++++++++++++++++--- python/pulsing/queue/manager.py | 50 +++--- 9 files changed, 280 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 335372603..7ab3c760e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,6 +31,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -363,6 +369,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ 
"crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -650,6 +665,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -849,6 +870,17 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.0" @@ -1312,6 +1344,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1794,6 +1835,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", + "lru", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -1863,6 +1905,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "crossbeam-channel", "futures", "pulsing-actor", "pyo3", diff --git a/crates/pulsing-actor/Cargo.toml b/crates/pulsing-actor/Cargo.toml index 4f2ab62f6..6950f25f5 100644 --- a/crates/pulsing-actor/Cargo.toml +++ b/crates/pulsing-actor/Cargo.toml @@ -37,6 +37,7 @@ futures = { 
workspace = true } # Concurrent data structures dashmap = { workspace = true } +lru = "0.12" # Error handling anyhow = { workspace = true } diff --git a/crates/pulsing-actor/src/actor/context.rs b/crates/pulsing-actor/src/actor/context.rs index dfdb2afd2..4c22f81ce 100644 --- a/crates/pulsing-actor/src/actor/context.rs +++ b/crates/pulsing-actor/src/actor/context.rs @@ -3,8 +3,9 @@ use super::mailbox::Envelope; use super::reference::ActorRef; use super::traits::{ActorId, Message, NodeId}; +use lru::LruCache; use serde::Serialize; -use std::collections::HashMap; +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -14,7 +15,7 @@ use tokio_util::sync::CancellationToken; pub struct ActorContext { actor_id: ActorId, cancel_token: CancellationToken, - actor_refs: HashMap, + actor_refs: LruCache, system: Arc, self_sender: mpsc::Sender, named_path: Option, @@ -48,7 +49,7 @@ impl ActorContext { Self { actor_id, cancel_token, - actor_refs: HashMap::new(), + actor_refs: LruCache::new(NonZeroUsize::new(1024).expect("1024 is non-zero")), system, self_sender, named_path, @@ -107,7 +108,7 @@ impl ActorContext { } let r = self.system.actor_ref(id).await?; - self.actor_refs.insert(*id, r.clone()); + self.actor_refs.put(*id, r.clone()); Ok(r) } diff --git a/crates/pulsing-py/Cargo.toml b/crates/pulsing-py/Cargo.toml index 6bb690a3c..ebcf875f5 100644 --- a/crates/pulsing-py/Cargo.toml +++ b/crates/pulsing-py/Cargo.toml @@ -28,6 +28,7 @@ tracing-subscriber = { workspace = true } reqwest = { workspace = true } pythonize = "0.23" uuid = { workspace = true } +crossbeam-channel = "0.5" [dependencies.pyo3] version = "0.23.4" diff --git a/crates/pulsing-py/src/lib.rs b/crates/pulsing-py/src/lib.rs index a9f34e6ea..6eaea2298 100644 --- a/crates/pulsing-py/src/lib.rs +++ b/crates/pulsing-py/src/lib.rs @@ -23,14 +23,17 @@ pub use python_executor::{init_python_executor, python_executor, ExecutorError}; /// - Load balancing policies: Random, 
RoundRobin, PowerOfTwo, ConsistentHash, CacheAware #[pymodule] fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { - // Initialize tracing for logging - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::from_default_env() - .add_directive(tracing::Level::INFO.into()), - ) - .try_init() - .ok(); + // Initialize tracing for logging (only if PULSING_INIT_TRACING is set) + // This allows applications to control their own tracing configuration + if std::env::var("PULSING_INIT_TRACING").is_ok() { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::from_default_env() + .add_directive(tracing::Level::INFO.into()), + ) + .try_init() + .ok(); + } // Add error classes errors::add_to_module(m)?; diff --git a/crates/pulsing-py/src/python_executor.rs b/crates/pulsing-py/src/python_executor.rs index e0375527e..c13072586 100644 --- a/crates/pulsing-py/src/python_executor.rs +++ b/crates/pulsing-py/src/python_executor.rs @@ -3,8 +3,8 @@ //! Avoids GIL contention with Tokio's async runtime by isolating Python //! execution to a fixed-size thread pool (default: 4 threads). -use std::sync::mpsc::{self, Sender}; -use std::sync::{Arc, Mutex, OnceLock}; +use crossbeam_channel::{unbounded, Sender}; +use std::sync::OnceLock; use std::thread::{self, JoinHandle}; use tokio::sync::oneshot; @@ -16,14 +16,13 @@ type PythonTask = Box; /// Dedicated thread pool for Python code execution. 
pub struct PythonExecutor { - sender: Mutex>, + sender: Sender, _threads: Vec>, } impl PythonExecutor { pub fn new(num_threads: usize) -> Self { - let (sender, receiver) = mpsc::channel::(); - let receiver = Arc::new(Mutex::new(receiver)); + let (sender, receiver) = unbounded::(); let threads: Vec<_> = (0..num_threads) .map(|i| { @@ -33,11 +32,7 @@ impl PythonExecutor { .spawn(move || { tracing::debug!("Python executor thread {} started", i); loop { - let task = { - let guard = rx.lock().unwrap(); - guard.recv() - }; - match task { + match rx.recv() { Ok(task) => task(), Err(_) => { tracing::debug!("Python executor thread {} shutting down", i); @@ -53,7 +48,7 @@ impl PythonExecutor { tracing::info!("Python executor initialized with {} threads", num_threads); Self { - sender: Mutex::new(sender), + sender, _threads: threads, } } @@ -71,8 +66,6 @@ impl PythonExecutor { }); self.sender - .lock() - .map_err(|_| ExecutorError::ChannelClosed)? .send(task) .map_err(|_| ExecutorError::ChannelClosed)?; @@ -105,6 +98,7 @@ pub fn init_python_executor(num_threads: usize) -> Result<(), &'static str> { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; #[tokio::test] async fn test_executor_basic() { @@ -124,13 +118,13 @@ mod tests { }) .collect(); - let results: Vec<_> = futures::future::join_all(handles) + let results: Vec = futures::future::join_all(handles) .await .into_iter() .map(|r| r.unwrap()) .collect(); - let expected: Vec<_> = (0..10).map(|i| i * 2).collect(); + let expected: Vec = (0..10).map(|i| i * 2).collect(); assert_eq!(results, expected); } } diff --git a/pyproject.toml b/pyproject.toml index be8e93381..45c0a712f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ module-name = "pulsing._core" manifest-path = "crates/pulsing-py/Cargo.toml" python-packages = ["pulsing"] python-source = "python" +sdist-include = ["LICENSE", "README.md"] [build-system] requires = ["maturin>=1.0,<2.0"] diff --git a/python/pulsing/actor/remote.py 
b/python/pulsing/actor/remote.py index dc1515685..5a33b0f7d 100644 --- a/python/pulsing/actor/remote.py +++ b/python/pulsing/actor/remote.py @@ -3,6 +3,7 @@ import asyncio import inspect import logging +import os import random import uuid from abc import ABC, abstractmethod @@ -11,6 +12,147 @@ from pulsing._core import ActorRef, ActorSystem, Message, StreamMessage from pulsing.exceptions import PulsingActorError, PulsingRuntimeError +# Protocol version configuration +# Default to v1 for backward compatibility +_DEFAULT_PROTOCOL_VERSION = int(os.getenv("PULSING_PROTOCOL_VERSION", "1")) + + +def _get_protocol_version() -> int: + """Get protocol version from environment or default to v1.""" + return _DEFAULT_PROTOCOL_VERSION + + +def _detect_protocol_version(msg: dict) -> int: + """Auto-detect protocol version from message. + + Returns: + 1 for v1 protocol, 2 for v2 protocol + """ + if "__pulsing_proto__" in msg: + version_str = msg["__pulsing_proto__"] + if isinstance(version_str, str) and version_str.startswith("v"): + return int(version_str[1:]) + return int(version_str) + # v1 compatibility: check for __call__ field + if "__call__" in msg: + return 1 + return 1 # default to v1 + + +def _wrap_call_v1(method: str, args: tuple, kwargs: dict, is_async: bool) -> dict: + """v1 protocol: legacy format (backward compatible). + + Format: + { + "__call__": method_name, + "args": args, + "kwargs": kwargs, + "__async__": is_async + } + """ + return { + "__call__": method, + "args": args, + "kwargs": kwargs, + "__async__": is_async, + } + + +def _wrap_call_v2(method: str, args: tuple, kwargs: dict, is_async: bool) -> dict: + """v2 protocol: namespace isolation. 
+ + Format: + { + "__pulsing_proto__": "v2", + "__pulsing__": { + "call": method_name, + "async": is_async + }, + "user_data": { + "args": args, + "kwargs": kwargs + } + } + """ + return { + "__pulsing_proto__": "v2", + "__pulsing__": { + "call": method, + "async": is_async, + }, + "user_data": { + "args": args, + "kwargs": kwargs, + }, + } + + +def _unwrap_call(msg: dict) -> tuple[str, tuple, dict, bool]: + """Unwrap call message, supporting both v1 and v2 protocols. + + Returns: + (method_name, args, kwargs, is_async) + """ + version = _detect_protocol_version(msg) + + if version == 2: + pulsing = msg.get("__pulsing__", {}) + user_data = msg.get("user_data", {}) + return ( + pulsing.get("call", ""), + tuple(user_data.get("args", ())), + dict(user_data.get("kwargs", {})), + pulsing.get("async", False), + ) + else: # v1 + return ( + msg.get("__call__", ""), + tuple(msg.get("args", ())), + dict(msg.get("kwargs", {})), + msg.get("__async__", False), + ) + + +def _wrap_response_v1(result: Any = None, error: str | None = None) -> dict: + """v1 protocol response format.""" + if error: + return {"__error__": error} + return {"__result__": result} + + +def _wrap_response_v2(result: Any = None, error: str | None = None) -> dict: + """v2 protocol response format.""" + if error: + return { + "__pulsing_proto__": "v2", + "__pulsing__": {"error": error}, + "user_data": {}, + } + return { + "__pulsing_proto__": "v2", + "__pulsing__": {"result": result}, + "user_data": {}, + } + + +def _unwrap_response(resp: dict) -> tuple[Any, str | None]: + """Unwrap response, supporting both v1 and v2 protocols. 
+ + Returns: + (result, error) - one of them will be None + """ + version = _detect_protocol_version(resp) + + if version == 2: + pulsing = resp.get("__pulsing__", {}) + if "error" in pulsing: + return (None, pulsing["error"]) + return (pulsing.get("result"), None) + else: # v1 + if "__error__" in resp: + return (None, resp["__error__"]) + return (resp.get("__result__"), None) + def _convert_rust_error(err: RuntimeError) -> Exception: """Convert Rust-raised RuntimeError to appropriate Pulsing exception. @@ -188,25 +330,27 @@ def __call__(self, *args, **kwargs): async def _sync_call(self, *args, **kwargs) -> Any: """Synchronous method call.""" - call_msg = { - "__call__": self._method, - "args": args, - "kwargs": kwargs, - "__async__": False, - } + # Use configured protocol version (default v1) + protocol_version = _get_protocol_version() + if protocol_version == 2: + call_msg = _wrap_call_v2(self._method, args, kwargs, False) + else: + call_msg = _wrap_call_v1(self._method, args, kwargs, False) + resp = await self._ref.ask(call_msg) if isinstance(resp, dict): - if "__error__" in resp: + result, error = _unwrap_response(resp) + if error: # Actor execution error try: raise PulsingActorError( - resp["__error__"], actor_name=str(self._ref.actor_id.id) + error, actor_name=str(self._ref.actor_id.id) ) except RuntimeError as e: # If it's a Rust error, convert it raise _convert_rust_error(e) from e - return resp.get("__result__") + return result elif isinstance(resp, Message): if resp.is_stream: return _SyncGeneratorStreamReader(resp) @@ -247,12 +391,12 @@ def __init__( async def _get_stream(self): """Get stream (lazy initialization)""" if self._stream_reader is None: - call_msg = { - "__call__": self._method, - "args": self._args, - "kwargs": self._kwargs, - "__async__": True, - } + # Use configured protocol version (default v1) + protocol_version = _get_protocol_version() + if protocol_version == 2: + call_msg = _wrap_call_v2(self._method, self._args, self._kwargs, True) 
+ else: + call_msg = _wrap_call_v1(self._method, self._args, self._kwargs, True) resp = await self._ref.ask(call_msg) # Response may be PyMessage (streaming) or direct Python object @@ -399,19 +543,24 @@ def on_stop(self) -> None: self._instance.on_stop() async def receive(self, msg) -> Any: - # Handle new dict-based call format (Python-to-Python) - if isinstance(msg, dict) and "__call__" in msg: - method = msg["__call__"] - args = msg.get("args", ()) - kwargs = msg.get("kwargs", {}) - is_async_call = msg.get("__async__", False) + # Handle dict-based call format (supporting both v1 and v2) + if isinstance(msg, dict): + # Detect protocol version + version = _detect_protocol_version(msg) + method, args, kwargs, is_async_call = _unwrap_call(msg) if not method or method.startswith("_"): - return {"__error__": f"Invalid method: {method}"} + error_msg = f"Invalid method: {method}" + if version == 2: + return _wrap_response_v2(error=error_msg) + return _wrap_response_v1(error=error_msg) func = getattr(self._instance, method, None) if func is None or not callable(func): - return {"__error__": f"Not found: {method}"} + error_msg = f"Not found: {method}" + if version == 2: + return _wrap_response_v2(error=error_msg) + return _wrap_response_v1(error=error_msg) # Detect if it's an async method (including async generators) is_async_method = ( @@ -439,9 +588,15 @@ async def receive(self, msg) -> Any: return self._handle_generator_result(result) if asyncio.iscoroutine(result): result = await result - return {"__result__": result} + # Use same protocol version as request + if version == 2: + return _wrap_response_v2(result=result) + return _wrap_response_v1(result=result) except Exception as e: - return {"__error__": str(e)} + error_msg = str(e) + if version == 2: + return _wrap_response_v2(error=error_msg) + return _wrap_response_v1(error=error_msg) # Handle legacy Message-based call format (for Rust actor compatibility) if isinstance(msg, Message): diff --git 
a/python/pulsing/queue/manager.py b/python/pulsing/queue/manager.py index 023277301..412d67dcb 100644 --- a/python/pulsing/queue/manager.py +++ b/python/pulsing/queue/manager.py @@ -18,7 +18,7 @@ STORAGE_MANAGER_NAME = "queue_storage_manager" -def _compute_owner(bucket_key: str, nodes: list[dict]) -> int | None: +def _compute_owner(bucket_key: str, nodes: list[dict]) -> str | None: """Compute owner node ID based on bucket key Uses Rendezvous Hashing (highest random weight hashing) to ensure: @@ -27,6 +27,9 @@ def _compute_owner(bucket_key: str, nodes: list[dict]) -> int | None: 3. Natural uniform load distribution Algorithm: Calculate score for each (key, node) combination, select node with highest score + + Returns: + Node ID as string (unified representation for consistent comparison) """ if not nodes: return None @@ -48,14 +51,14 @@ def _compute_owner(bucket_key: str, nodes: list[dict]) -> int | None: node_id = node.get("node_id") if node_id is None: continue - # node_id is u128 integer, convert to string for consistent hashing + # Normalize node_id to string for consistent hashing and comparison node_id_str = str(node_id) # Combine key and node_id to calculate hash score combined = f"{bucket_key}:{node_id_str}" score = int(hashlib.md5(combined.encode()).hexdigest(), 16) if score > best_score: best_score = score - best_node_id = node_id # Keep as integer + best_node_id = node_id_str # Keep as string for unified comparison return best_node_id @@ -218,7 +221,7 @@ async def get_bucket( owner_node_id = _compute_owner(bucket_key, members) local_node_id = str(self.system.node_id.id) - if owner_node_id is None or owner_node_id == local_node_id: + if owner_node_id is None or str(owner_node_id) == local_node_id: # This node is responsible, create/return bucket bucket_ref = await self._get_or_create_bucket( topic, bucket_id, batch_size, storage_path, backend, backend_options @@ -235,7 +238,7 @@ async def get_bucket( owner_addr = None for m in members: m_node_id = 
m.get("node_id") - if m_node_id is not None and m_node_id == owner_node_id: + if m_node_id is not None and str(m_node_id) == str(owner_node_id): owner_addr = m.get("addr") break @@ -260,7 +263,7 @@ async def get_topic(self, topic: str) -> dict: owner_node_id = _compute_owner(topic_key, members) local_node_id = str(self.system.node_id.id) - if owner_node_id is None or owner_node_id == local_node_id: + if owner_node_id is None or str(owner_node_id) == local_node_id: # This node is responsible, create/return topic broker broker_ref = await self._get_or_create_topic_broker(topic) return { @@ -274,7 +277,7 @@ async def get_topic(self, topic: str) -> dict: owner_addr = None for m in members: m_node_id = m.get("node_id") - if m_node_id is not None and m_node_id == owner_node_id: + if m_node_id is not None and str(m_node_id) == str(owner_node_id): owner_addr = m.get("addr") break @@ -427,42 +430,44 @@ async def get_bucket_ref( elif msg_type == "Redirect": # Need to redirect to other node - # owner_node_id transmitted as string, convert to int - owner_node_id_str = resp_data.get("owner_node_id") - owner_node_id = int(owner_node_id_str) + # owner_node_id transmitted as string, keep as string for comparison + owner_node_id_str = str(resp_data.get("owner_node_id")) owner_addr = resp_data.get("owner_addr") logger.debug( - f"Redirecting bucket {topic}:{bucket_id} to node {owner_node_id} @ {owner_addr}" + f"Redirecting bucket {topic}:{bucket_id} to node {owner_node_id_str} @ {owner_addr}" ) if redirect_count >= max_redirects: raise RuntimeError(f"Too many redirects for bucket {topic}:{bucket_id}") # Check if redirecting to self (avoid infinite loop) - if owner_node_id == system.node_id.id: + # Compare as strings for consistency + if str(owner_node_id_str) == str(system.node_id.id): raise RuntimeError( f"Redirect loop detected for bucket {topic}:{bucket_id}" ) # Get owner node's StorageManager (with retry, wait for remote node initialization) + # Convert to int for resolve_named 
which expects int + owner_node_id_int = int(owner_node_id_str) max_resolve_retries = 10 for resolve_retry in range(max_resolve_retries): try: manager = await StorageManager.resolve( - STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id + STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id_int ) break except Exception as e: if resolve_retry < max_resolve_retries - 1: logger.debug( - f"StorageManager not found on node {owner_node_id}, " + f"StorageManager not found on node {owner_node_id_str}, " f"retry {resolve_retry + 1}/{max_resolve_retries}" ) await asyncio.sleep(0.5) else: raise RuntimeError( - f"StorageManager not found on node {owner_node_id} after " + f"StorageManager not found on node {owner_node_id_str} after " f"{max_resolve_retries} retries: {e}" ) from e @@ -502,22 +507,25 @@ async def get_topic_broker( return await TopicBroker.resolve(actor_name, system=system) elif msg_type == "Redirect": - # owner_node_id transmitted as string, convert to int - owner_node_id = int(resp_data["owner_node_id"]) + # owner_node_id transmitted as string, keep as string for comparison + owner_node_id_str = str(resp_data["owner_node_id"]) - logger.debug(f"Redirecting topic {topic} to node {owner_node_id}") + logger.debug(f"Redirecting topic {topic} to node {owner_node_id_str}") if redirect_count >= max_redirects: raise RuntimeError(f"Too many redirects for topic: {topic}") - if owner_node_id == system.node_id.id: + # Compare as strings for consistency + if str(owner_node_id_str) == str(system.node_id.id): raise RuntimeError(f"Redirect loop for topic: {topic}") # Get owner node's StorageManager via proxy + # Convert to int for resolve_named which expects int + owner_node_id_int = int(owner_node_id_str) for retry in range(10): try: manager = await StorageManager.resolve( - STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id + STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id_int ) break except Exception as e: @@ -525,7 +533,7 @@ async def 
get_topic_broker( await asyncio.sleep(0.5) else: raise RuntimeError( - f"StorageManager not found on node {owner_node_id}: {e}" + f"StorageManager not found on node {owner_node_id_str}: {e}" ) from e else: