diff --git a/docs/src/examples/llm_inference.md b/docs/src/examples/llm_inference.md index 05bf47832..f09a41e95 100644 --- a/docs/src/examples/llm_inference.md +++ b/docs/src/examples/llm_inference.md @@ -19,7 +19,12 @@ This guide shows how to run a **router + worker** LLM service with Pulsing, and The router needs an **actor system address** so workers can join the same cluster: ```bash -pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm +pulsing actor pulsing.actors.router.RouterActor \ + --addr 0.0.0.0:8000 \ + --http_host 0.0.0.0 \ + --http_port 8080 \ + --model_name my-llm \ + --worker_name worker ``` ## 2) Start workers @@ -29,13 +34,22 @@ You can run **one or more** workers. Each worker should join the router node via ### Option A: Transformers worker (Terminal B) ```bash -pulsing actor transformers --model gpt2 --device cpu --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --device cpu \ + --addr 0.0.0.0:8001 \ + --seeds 127.0.0.1:8000 \ + --name worker ``` ### Option B: vLLM worker (Terminal C) ```bash -pulsing actor vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000 +pulsing actor pulsing.actors.vllm.VllmWorker \ + --model Qwen/Qwen2.5-0.5B \ + --addr 0.0.0.0:8002 \ + --seeds 127.0.0.1:8000 \ + --name worker ``` ## 3) Verify cluster + workers @@ -43,7 +57,7 @@ pulsing actor vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0 ### List actors (observer mode) ```bash -pulsing actor list --endpoint 127.0.0.1:8000 +pulsing inspect actors --endpoint 127.0.0.1:8000 ``` ### Inspect cluster diff --git a/docs/src/examples/llm_inference.zh.md b/docs/src/examples/llm_inference.zh.md index 143d7d50f..a5ef977aa 100644 --- a/docs/src/examples/llm_inference.zh.md +++ b/docs/src/examples/llm_inference.zh.md @@ -19,7 +19,12 @@ Router 需要指定 **actor system 地址**,以便其它进程启动的 workers 加入同一集群: ```bash -pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm +pulsing actor pulsing.actors.router.RouterActor \ + --addr 0.0.0.0:8000 \ + --http_host 0.0.0.0 \ + --http_port 8080 \ + --model_name my-llm \ + --worker_name worker ``` ## 2)启动 Worker @@ -29,13 +34,22 @@ pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm ### 方案 A:Transformers Worker(终端 B) ```bash -pulsing actor transformers --model gpt2 --device cpu --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --device cpu \ + --addr 0.0.0.0:8001 \ + --seeds 127.0.0.1:8000 \ + --name worker ``` ### 方案 B:vLLM Worker(终端 C) ```bash -pulsing actor vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000 +pulsing actor pulsing.actors.vllm.VllmWorker \ + --model Qwen/Qwen2.5-0.5B \ + --addr 0.0.0.0:8002 \ + --seeds 127.0.0.1:8000 \ + --name worker ``` ## 3)验证集群与 worker @@ -43,7 +57,7 @@ pulsing actor vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0 ### 列出 actors(观察者模式) ```bash -pulsing actor list --endpoint 127.0.0.1:8000 +pulsing inspect actors --endpoint 127.0.0.1:8000 ``` ### 巡检集群 diff --git a/docs/src/guide/operations.md b/docs/src/guide/operations.md index 25d7ae3ff..628d69c7a 100644 --- a/docs/src/guide/operations.md +++ b/docs/src/guide/operations.md @@ -1,74 +1,208 @@ -# CLI Operations +# CLI Commands -Pulsing ships with built-in CLI tools for running, inspecting, and benchmarking distributed systems. +Pulsing ships with built-in CLI tools for starting actors, inspecting systems, and benchmarking distributed services. --- -## Running Services +## Starting Actors -### Router (OpenAI-compatible HTTP API) +The `pulsing actor` command starts actors by providing their full class path. The CLI automatically matches command-line arguments to the Actor's constructor parameters. + +### Format + +Actor type must be a full class path: +- Format: `module.path.ClassName` +- Example: `pulsing.actors.router.RouterActor` +- Example: `pulsing.actors.worker.TransformersWorker` +- Example: `pulsing.actors.vllm.VllmWorker` +- Example: `my_module.my_actor.MyCustomActor` + +### Examples + +#### Router (OpenAI-compatible HTTP API) + +```bash +pulsing actor pulsing.actors.router.RouterActor \ + --addr 0.0.0.0:8000 \ + --http_host 0.0.0.0 \ + --http_port 8080 \ + --model_name my-llm \ + --worker_name worker \ + --scheduler stream_load +``` + +#### Transformers Worker + +```bash +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --device cpu \ + --addr 0.0.0.0:8001 \ + --seeds 127.0.0.1:8000 \ + --name worker +``` + +#### vLLM Worker ```bash -pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm +pulsing actor pulsing.actors.vllm.VllmWorker \ + --model Qwen/Qwen2 \ + --addr 0.0.0.0:8002 \ + --seeds 127.0.0.1:8000 \ + --name worker \ + --role aggregated \ + --max_new_tokens 512 ``` -### Transformers Worker +#### Multiple Workers ```bash -pulsing actor transformers --model gpt2 --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 +# Start multiple workers with different names +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --name worker-1 \ + --seeds 127.0.0.1:8000 + +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --name worker-2 \ + --seeds 127.0.0.1:8000 + +# Router targeting specific worker name +pulsing actor pulsing.actors.router.RouterActor \ + --worker_name worker-1 \ + --seeds 127.0.0.1:8000 ``` -### vLLM Worker +### Common Options + +- `--name NAME`: Actor name (default: "worker") +- `--addr ADDR`: Actor System bind address +- `--seeds SEEDS`: Comma-separated list of seed nodes +- Any other `--param value` pairs matching the Actor's constructor signature + +### How It Works ```bash -pulsing actor vllm --model Qwen/Qwen2 --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000 +# Pass parameters directly as command-line arguments +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --device cpu \ + --preload true \ + --name my-worker \ + --seeds 127.0.0.1:8000 + +# Start vLLM worker with all parameters +pulsing actor pulsing.actors.vllm.VllmWorker \ + --model Qwen/Qwen2 \ + --role aggregated \ + --max_new_tokens 512 \ + --name vllm-worker \ + --seeds 127.0.0.1:8000 ``` +Options: +- `--name NAME`: Actor name (default: "worker") +- `--addr ADDR`: Actor System bind address +- `--seeds SEEDS`: Comma-separated list of seed nodes +- Any other `--param value` pairs matching the Actor's constructor signature + +The Actor class must: +- Be importable from the specified module path +- Inherit from `pulsing.actor.Actor` +- Have a constructor with named parameters (the CLI automatically matches arguments to constructor parameters) + +**How it works:** +The CLI inspects the Actor class constructor signature and automatically extracts matching parameters from command-line arguments. You can use `--help` to see available parameters, or check the Actor class documentation. + --- -## Actor List +--- + +## Inspect + +`pulsing inspect` is a lightweight **observer** tool that queries actor systems via HTTP (no cluster join required). It provides multiple subcommands for different inspection needs. + +### Subcommands -`pulsing actor list` is a lightweight **observer** that queries actors via HTTP (no cluster join required). +#### Cluster Status -### Single Node +Inspect cluster members and their status: ```bash -pulsing actor list --endpoint 127.0.0.1:8000 +pulsing inspect cluster --seeds 127.0.0.1:8000 ``` -### Cluster (via Seeds) +Output includes: +- Total nodes and alive count +- Status summary (Alive, Suspect, Failed, etc.) +- Detailed member list with node ID, address, and status + +#### Actors Distribution + +Inspect named actors distribution across the cluster: ```bash -pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001 +pulsing inspect actors --seeds 127.0.0.1:8000 ``` -### Options +Options: +- `--top N`: Show top N actors by instance count +- `--filter STR`: Filter actor names by substring +- `--all_actors True`: Include internal/system actors -| Flag | Description | -|------|-------------| -| `--all_actors True` | Include internal/system actors | -| `--json True` | Output as JSON | +Examples: +```bash +# Show top 10 actors +pulsing inspect actors --seeds 127.0.0.1:8000 --top 10 -!!! note - Uses HTTP/2 (h2c). Node must expose HTTP endpoints. +# Filter actors by name +pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker +``` ---- +#### Metrics -## Inspect +Inspect Prometheus metrics from cluster nodes: + +```bash +pulsing inspect metrics --seeds 127.0.0.1:8000 +``` -`pulsing inspect` joins a cluster (via seeds) and prints a human-friendly snapshot of members and actors. +Options: +- `--raw True`: Output raw metrics (default) +- `--raw False`: Show summary only (key metrics) + +#### Watch Mode + +Watch cluster state changes in real-time: ```bash -pulsing inspect --seeds 127.0.0.1:8000 +pulsing inspect watch --seeds 127.0.0.1:8000 ``` -Output includes: +Options: +- `--interval 1.0`: Refresh interval in seconds (default: 1.0) +- `--kind all`: What to watch: `cluster`, `actors`, `metrics`, or `all` (default: `all`) +- `--max_rounds N`: Maximum number of refresh rounds (None = infinite) + +Examples: +```bash +# Watch cluster member changes +pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster --interval 2.0 -- **Cluster members**: node id, addr, status -- **Named actors**: distribution across nodes +# Watch actor changes +pulsing inspect watch --seeds 127.0.0.1:8000 --kind actors +``` + +### Common Options + +All subcommands support: -!!! tip - For local seeds (`127.0.0.1`), the CLI auto-binds to `127.0.0.1:0`. +- `--timeout 10.0`: Request timeout in seconds (default: 10.0) +- `--best_effort True`: Continue even if some nodes fail (default: False) + +!!! note + Observer mode uses HTTP/2 (h2c) and does NOT join the gossip cluster, making it lightweight and suitable for production monitoring. --- @@ -93,10 +227,15 @@ pulsing bench gpt2 --url http://localhost:8080 | Task | Command | |------|---------| -| Start router | `pulsing actor router --addr 0.0.0.0:8000 --http_port 8080` | -| Start worker | `pulsing actor transformers --model gpt2 --seeds ...` | -| List actors | `pulsing actor list --endpoint 127.0.0.1:8000` | -| Inspect cluster | `pulsing inspect --seeds 127.0.0.1:8000` | +| Start router | `pulsing actor pulsing.actors.router.RouterActor --addr 0.0.0.0:8000 --http_port 8080` | +| Start worker | `pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --seeds ...` | +| Start multiple workers | `pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --name worker-1 --seeds ...` | +| Router with custom worker | `pulsing actor pulsing.actors.router.RouterActor --worker_name worker-1 --seeds ...` | +| List actors | `pulsing inspect actors --endpoint 127.0.0.1:8000` | +| Inspect cluster | `pulsing inspect cluster --seeds 127.0.0.1:8000` | +| Inspect actors | `pulsing inspect actors --seeds 127.0.0.1:8000 --top 10` | +| Inspect metrics | `pulsing inspect metrics --seeds 127.0.0.1:8000` | +| Watch cluster | `pulsing inspect watch --seeds 127.0.0.1:8000` | | Benchmark | `pulsing bench gpt2 --url http://localhost:8080` | --- diff --git a/docs/src/guide/operations.zh.md b/docs/src/guide/operations.zh.md index 7351c7036..759f88257 100644 --- a/docs/src/guide/operations.zh.md +++ b/docs/src/guide/operations.zh.md @@ -1,74 +1,190 @@ -# CLI 运维操作 +# CLI 命令 -Pulsing 内置 CLI 工具,用于运行、检查和基准测试分布式系统。 +Pulsing 内置 CLI 工具,用于启动 actors、检查系统和基准测试分布式服务。 --- -## 运行服务 +## 启动 Actor -### Router(OpenAI 兼容 HTTP API) +`pulsing actor` 命令通过提供完整的类路径来启动 actors。CLI 会自动将命令行参数匹配到 Actor 的构造函数参数。 + +### 格式 + +Actor 类型必须是完整的类路径: +- 格式: `module.path.ClassName` +- 示例: `pulsing.actors.router.RouterActor` +- 示例: `pulsing.actors.worker.TransformersWorker` +- 示例: `pulsing.actors.vllm.VllmWorker` +- 示例: `my_module.my_actor.MyCustomActor` + +### 示例 + +#### Router(OpenAI 兼容 HTTP API) + +```bash +pulsing actor pulsing.actors.router.RouterActor \ + --addr 0.0.0.0:8000 \ + --http_host 0.0.0.0 \ + --http_port 8080 \ + --model_name my-llm \ + --worker_name worker \ + --scheduler stream_load +``` + +#### Transformers Worker ```bash -pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --device cpu \ + --addr 0.0.0.0:8001 \ + --seeds 127.0.0.1:8000 \ + --name worker ``` -### Transformers Worker +#### vLLM Worker ```bash -pulsing actor transformers --model gpt2 --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 +pulsing actor pulsing.actors.vllm.VllmWorker \ + --model Qwen/Qwen2 \ + --addr 0.0.0.0:8002 \ + --seeds 127.0.0.1:8000 \ + --name worker \ + --role aggregated \ + --max_new_tokens 512 ``` -### vLLM Worker +#### 多个 Worker ```bash -pulsing actor vllm --model Qwen/Qwen2 --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000 +# 启动多个不同名称的 worker +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --name worker-1 \ + --seeds 127.0.0.1:8000 + +pulsing actor pulsing.actors.worker.TransformersWorker \ + --model_name gpt2 \ + --name worker-2 \ + --seeds 127.0.0.1:8000 + +# Router 路由到特定 worker 名称 +pulsing actor pulsing.actors.router.RouterActor \ + --worker_name worker-1 \ + --seeds 127.0.0.1:8000 ``` +### 通用选项 + +- `--name NAME`: Actor 名称(默认: "worker") +- `--addr ADDR`: Actor System 绑定地址 +- `--seeds SEEDS`: 逗号分隔的种子节点列表 +- 任何其他 `--param value` 参数对,匹配 Actor 的构造函数签名 + +### 工作原理 + +CLI 会检查 Actor 类的构造函数签名,并自动从命令行参数中提取匹配的参数。可以使用 `--help` 查看可用参数,或查看 Actor 类的文档。 + +Actor 类必须: +- 可以从指定的模块路径导入 +- 继承自 `pulsing.actor.Actor` +- 具有带命名参数的构造函数(CLI 会自动将参数匹配到构造函数参数) + --- -## Actor List +## Inspect + +`pulsing inspect` 是轻量级 **观察者**工具,通过 HTTP 查询 actor 系统(**无需加入集群**)。它提供多个子命令用于不同的检查需求。 -`pulsing actor list` 是轻量级 **观察者**,通过 HTTP 查询 actor — **无需加入集群**。 +### 子命令 -### 单节点 +#### 集群状态 + +检查集群成员及其状态: ```bash -pulsing actor list --endpoint 127.0.0.1:8000 +pulsing inspect cluster --seeds 127.0.0.1:8000 ``` -### 集群(通过 Seeds) +输出包括: +- 总节点数和存活节点数 +- 状态摘要(Alive、Suspect、Failed 等) +- 详细的成员列表,包含节点 ID、地址和状态 + +#### Actor 分布 + +检查命名 actors 在集群中的分布: ```bash -pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001 +pulsing inspect actors --seeds 127.0.0.1:8000 ``` -### 选项 +选项: +- `--endpoint ADDR`: 查询单个节点(例如:`127.0.0.1:8000`) +- `--top N`: 显示实例数最多的前 N 个 actors +- `--filter STR`: 按子字符串过滤 actor 名称 +- `--all_actors True`: 包含内部/系统 actors +- `--json_output True`: JSON 格式输出 +- `--detailed True`: 显示详细信息(类、模块等) -| 参数 | 描述 | -|------|------| -| `--all_actors True` | 包含内部/系统 actor | -| `--json True` | JSON 格式输出 | +示例: +```bash +# 查询单个节点 +pulsing inspect actors --endpoint 127.0.0.1:8000 -!!! note - 使用 HTTP/2 (h2c)。节点需暴露 HTTP 端点。 +# 显示前 10 个 actors +pulsing inspect actors --seeds 127.0.0.1:8000 --top 10 ---- +# 按名称过滤 actors +pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker -## Inspect +# 显示详细信息 +pulsing inspect actors --endpoint 127.0.0.1:8000 --detailed +``` -`pulsing inspect` 加入集群(通过 seeds)并打印成员和 actor 的可读快照。 +#### 指标 + +检查集群节点的 Prometheus 指标: ```bash -pulsing inspect --seeds 127.0.0.1:8000 +pulsing inspect metrics --seeds 127.0.0.1:8000 ``` -输出包含: +选项: +- `--raw True`: 输出原始指标(默认) +- `--raw False`: 仅显示摘要(关键指标) + +#### 监视模式 + +实时监视集群状态变化: -- **集群成员**:节点 id、地址、状态 -- **命名 Actor**:跨节点分布 +```bash +pulsing inspect watch --seeds 127.0.0.1:8000 +``` -!!! tip - 本地 seeds(`127.0.0.1`)时,CLI 自动绑定到 `127.0.0.1:0`。 +选项: +- `--interval 1.0`: 刷新间隔(秒,默认: 1.0) +- `--kind all`: 监视内容:`cluster`、`actors`、`metrics` 或 `all`(默认: `all`) +- `--max_rounds N`: 最大刷新轮数(None = 无限) + +示例: +```bash +# 监视集群成员变化 +pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster --interval 2.0 + +# 监视 actor 变化 +pulsing inspect watch --seeds 127.0.0.1:8000 --kind actors +``` + +### 通用选项 + +所有子命令支持: + +- `--timeout 10.0`: 请求超时(秒,默认: 10.0) +- `--best_effort True`: 即使某些节点失败也继续(默认: False) + +!!! note + 观察者模式使用 HTTP/2 (h2c),**不会**加入 gossip 集群,使其轻量级且适合生产环境监控。 --- @@ -93,10 +209,15 @@ pulsing bench gpt2 --url http://localhost:8080 | 任务 | 命令 | |------|------| -| 启动 router | `pulsing actor router --addr 0.0.0.0:8000 --http_port 8080` | -| 启动 worker | `pulsing actor transformers --model gpt2 --seeds ...` | -| 列出 actor | `pulsing actor list --endpoint 127.0.0.1:8000` | -| 检查集群 | `pulsing inspect --seeds 127.0.0.1:8000` | +| 启动 router | `pulsing actor pulsing.actors.router.RouterActor --addr 0.0.0.0:8000 --http_port 8080` | +| 启动 worker | `pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --seeds ...` | +| 启动多个 worker | `pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --name worker-1 --seeds ...` | +| Router 指定 worker | `pulsing actor pulsing.actors.router.RouterActor --worker_name worker-1 --seeds ...` | +| 列出 actors | `pulsing inspect actors --endpoint 127.0.0.1:8000` | +| 检查集群 | `pulsing inspect cluster --seeds 127.0.0.1:8000` | +| 检查 actors | `pulsing inspect actors --seeds 127.0.0.1:8000 --top 10` | +| 检查指标 | `pulsing inspect metrics --seeds 127.0.0.1:8000` | +| 监视集群 | `pulsing inspect watch --seeds 127.0.0.1:8000` | | 基准测试 | `pulsing bench gpt2 --url http://localhost:8080` | --- diff --git a/examples/README.md b/examples/README.md index 18f5e3dd9..9f1ce3dab 100644 --- a/examples/README.md +++ b/examples/README.md @@ -71,6 +71,42 @@ python examples/python/ping_pong.py python examples/python/cluster.py --port 8000 ``` +### ⭐⭐ CLI 工具 (`inspect/`) + +| 示例 | 说明 | +|------|------| +| `demo.sh` | 完整演示脚本(推荐) | +| `start_demo.sh` | 仅启动服务脚本 | +| `demo_service.py` | 多节点演示服务 | +| `README.md` | Inspect CLI 使用指南 | + +**一键运行完整演示:** + +```bash +./examples/inspect/demo.sh +``` + +这个脚本会自动: +- 启动 3 个节点 +- 运行所有 inspect 子命令 +- 展示各种用法和输出 +- 按 Ctrl+C 自动清理 + +**或手动启动服务:** + +```bash +# 启动服务(3个终端) +python examples/inspect/demo_service.py --port 8000 +python examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000 +python examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000 + +# 使用 inspect 查看集群状态 +pulsing inspect cluster --seeds 127.0.0.1:8000 +pulsing inspect actors --seeds 127.0.0.1:8000 +pulsing inspect metrics --seeds 127.0.0.1:8000 +pulsing inspect watch --seeds 127.0.0.1:8000 +``` + ### ⭐⭐⭐ Rust 示例 (`rust/`) | 示例 | 说明 | @@ -95,6 +131,7 @@ cargo run --example behavior_fsm -p pulsing-actor | AI 辩论/讨论 | `agent/pulsing/mbti_discussion.py` | | 并行任务竞争 | `agent/pulsing/parallel_ideas_async.py` | | 集群部署 | `python/cluster.py` | +| 学习 CLI 工具 | `inspect/demo_service.py` | | 接入 AutoGen | `agent/autogen/` | | 接入 LangGraph | `agent/langgraph/` | | 学习 Rust API | `rust/behavior_*.rs` | diff --git a/examples/inspect/README.md b/examples/inspect/README.md new file mode 100644 index 000000000..e87a4886a --- /dev/null +++ b/examples/inspect/README.md @@ -0,0 +1,311 @@ +# Inspect CLI 示例 + +这个示例展示如何启动一个多节点的 Pulsing 服务,然后使用 `pulsing inspect` 命令查看集群状态、actors 分布和指标。 + +## 快速开始 + +### 方式一:一键完整演示(推荐) + +运行完整的演示脚本,会自动启动服务并展示所有 inspect 命令: + +```bash +./examples/inspect/demo.sh +``` + +这个脚本会: +1. ✅ 自动启动 3 个节点(后台运行) +2. ✅ 依次运行所有 inspect 子命令并展示输出 +3. ✅ 演示 cluster、actors、metrics、watch 等功能 +4. ✅ 最后保持运行,你可以手动尝试更多命令 +5. ✅ 按 Ctrl+C 自动清理所有节点 + +**输出示例:** +- 集群状态(成员列表、状态统计) +- Actors 分布(全部、top 5、过滤结果) +- Metrics 摘要(关键指标) +- Watch 模式(实时监控演示) + +### 方式二:仅启动服务(用于手动测试) + +如果你只想启动服务,然后手动运行 inspect 命令: + +```bash +./examples/inspect/start_demo.sh +``` + +这个脚本会: +- 启动 3 个节点(后台运行) +- 显示节点 PID 和日志位置 +- 保持运行,等待你手动执行 inspect 命令 +- 按 Ctrl+C 停止所有节点 + +### 方式三:完全手动(用于学习) + +**1. 启动服务(3 个终端)** + +**终端 1 - 节点 1(种子节点):** +```bash +python examples/inspect/demo_service.py --port 8000 +``` + +**终端 2 - 节点 2:** +```bash +python examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000 +``` + +**终端 3 - 节点 3:** +```bash +python examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000 +``` + +**2. 使用 Inspect 命令(新终端)** + +#### 查看集群状态 + +```bash +# 查看集群成员和状态 +pulsing inspect cluster --seeds 127.0.0.1:8000 +``` + +输出示例: +``` +Connecting to cluster via seeds: ['127.0.0.1:8000']... + +Cluster Status: 3 total nodes (3 alive) +================================================================================ + +Status Summary: + Alive: 3 + +Node ID Address Status +-------------------------------------------------------------------------------- +1 127.0.0.1:8000 Alive +2 127.0.0.1:8001 Alive +3 127.0.0.1:8002 Alive + +================================================================================ +``` + +#### 查看 Actors 分布 + +```bash +# 查看所有 actors 的分布 +pulsing inspect actors --seeds 127.0.0.1:8000 + +# 只看 top 5 +pulsing inspect actors --seeds 127.0.0.1:8000 --top 5 + +# 过滤特定类型的 actors +pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker +``` + +输出示例: +``` +Connecting to cluster via seeds: ['127.0.0.1:8000']... +Found 3 alive nodes + +Actor Distribution (5 unique actors): +================================================================================ +Actor Name Instances Nodes +-------------------------------------------------------------------------------- +workers/worker-1 1 1 +workers/worker-2 1 1 +workers/worker-3 1 2 +workers/worker-4 1 2 +services/router 1 1 +services/cache 1 3 + +================================================================================ +Total: 5 unique actors, 6 instances +Across 3 nodes +``` + +#### 查看 Metrics + +```bash +# 查看原始 Prometheus 指标 +pulsing inspect metrics --seeds 127.0.0.1:8000 + +# 只看关键指标摘要 +pulsing inspect metrics --seeds 127.0.0.1:8000 --raw False +``` + +#### 实时监控(Watch 模式) + +```bash +# 监控集群变化 +pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster --interval 2.0 + +# 监控 actors 变化 +pulsing inspect watch --seeds 127.0.0.1:8000 --kind actors --interval 1.0 + +# 监控所有变化 +pulsing inspect watch --seeds 127.0.0.1:8000 --kind all --interval 1.0 +``` + +输出示例: +``` +Watching cluster via seeds: ['127.0.0.1:8000']... +Refresh interval: 1.0s, Watching: cluster +Press Ctrl+C to stop + +[14:30:15] Initial state: 3/3 nodes alive + +[14:30:16] No changes + +[14:30:17] Changes detected: + • Node 2: Alive -> Suspect + +[14:30:18] Changes detected: + • Node 2: Suspect -> Alive +``` + +## 服务架构 + +这个示例服务包含: + +- **Node 1 (8000)**: + - `services/router` - 路由服务 + - `workers/worker-1` - 工作节点 1 + - `workers/worker-2` - 工作节点 2 + +- **Node 2 (8001)**: + - `workers/worker-3` - 工作节点 3 + - `workers/worker-4` - 工作节点 4 + +- **Node 3 (8002)**: + - `services/cache` - 缓存服务 + +## Inspect 命令参考 + +### 通用选项 + +所有子命令都支持: + +- `--seeds ` - 逗号分隔的种子节点地址(必需) +- `--timeout ` - 请求超时(默认:10.0) +- `--best_effort True` - 即使部分节点失败也继续(默认:False) + +### Cluster 子命令 + +```bash +pulsing inspect cluster --seeds 127.0.0.1:8000 +``` + +显示: +- 集群成员总数和存活数 +- 按状态分组的统计 +- 每个节点的详细信息(ID、地址、状态) + +### Actors 子命令 + +```bash +pulsing inspect actors --seeds 127.0.0.1:8000 [options] +``` + +选项: +- `--top N` - 只显示前 N 个 actors(按实例数排序) +- `--filter STR` - 按名称子串过滤 +- `--all_actors True` - 包含内部/系统 actors + +显示: +- Actor 名称 +- 实例数量 +- 分布节点列表 + +### Metrics 子命令 + +```bash +pulsing inspect metrics --seeds 127.0.0.1:8000 [options] +``` + +选项: +- `--raw True` - 输出原始 Prometheus 格式(默认) +- `--raw False` - 只显示关键指标摘要 + +### Watch 子命令 + +```bash +pulsing inspect watch --seeds 127.0.0.1:8000 [options] +``` + +选项: +- `--interval ` - 刷新间隔(默认:1.0) +- `--kind ` - 监控类型:`cluster`、`actors`、`metrics`、`all`(默认:`all`) +- `--max_rounds N` - 最大刷新轮数(None = 无限) + +## 演示脚本说明 + +### `demo.sh` - 完整演示脚本 + +自动完成以下步骤: + +1. **启动服务**:在后台启动 3 个节点 +2. **Inspect Cluster**:展示集群成员和状态 +3. **Inspect Actors**:展示 actors 分布(全部、top 5、过滤) +4. **Inspect Metrics**:展示关键指标摘要 +5. **Watch Mode**:演示实时监控(3 轮) + +脚本会在最后保持运行,你可以: +- 手动运行更多 inspect 命令 +- 查看日志文件:`/tmp/pulsing_node*.log` +- 按 Ctrl+C 自动停止所有节点并退出 + +**提示**:如果你想跳过自动演示,直接手动测试,可以在演示结束后(脚本保持运行时)直接运行你想要的 inspect 命令。 + +## 实验建议 + +1. **运行完整演示** + ```bash + ./examples/inspect/demo.sh + ``` + +2. **手动尝试不同的 inspect 命令** + ```bash + pulsing inspect cluster --seeds 127.0.0.1:8000 + pulsing inspect actors --seeds 127.0.0.1:8000 --top 3 + pulsing inspect metrics --seeds 127.0.0.1:8000 --raw False + ``` + +3. **使用 watch 模式观察变化** + - 启动 watch 后,停止一个节点(Ctrl+C),观察状态变化 + - 重新启动节点,观察恢复过程 + +4. **测试过滤和排序** + ```bash + # 只看 worker 类型的 actors + pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker + + # 只看服务类型的 actors + pulsing inspect actors --seeds 127.0.0.1:8000 --filter router + ``` + +5. **测试容错** + ```bash + # 使用 best_effort,即使部分节点失败也继续 + pulsing inspect actors --seeds 127.0.0.1:8000,127.0.0.1:9999 --best_effort True + ``` + +## 故障排查 + +### 无法连接到集群 + +- 确保至少一个种子节点正在运行 +- 检查端口是否正确(默认 8000) +- 尝试使用 `--timeout` 增加超时时间 + +### 看不到某些 actors + +- 使用 `--all_actors True` 查看所有 actors(包括系统 actors) +- 检查 actor 是否真的创建成功(查看服务日志) + +### Watch 模式没有变化 + +- 确保 `--interval` 设置合理(不要太短,避免频繁请求) +- 尝试不同的 `--kind` 选项 + +## 下一步 + +- 查看 [CLI Operations 文档](../../docs/src/guide/operations.md) 了解更多 CLI 命令 +- 尝试其他示例:[examples/README.md](../README.md) +- 学习如何创建自己的 actors:[examples/python/](../python/) diff --git a/examples/inspect/demo.sh b/examples/inspect/demo.sh new file mode 100755 index 000000000..04e18f7da --- /dev/null +++ b/examples/inspect/demo.sh @@ -0,0 +1,157 @@ +#!/bin/bash +# Complete demo script for Pulsing Inspect CLI + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" + +cd "$PROJECT_ROOT" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up...${NC}" + pkill -f "demo_service.py" 2>/dev/null || true + sleep 1 + echo -e "${GREEN}✓ Cleanup complete${NC}" +} + +# Trap to cleanup on exit +trap cleanup EXIT INT TERM + +# Check if Python is available +if ! command -v python3 &> /dev/null; then + echo -e "${RED}Error: python3 not found${NC}" + exit 1 +fi + +# Check if pulsing command is available +if ! command -v pulsing &> /dev/null; then + echo -e "${RED}Error: pulsing command not found. Please install pulsing first.${NC}" + exit 1 +fi + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Pulsing Inspect CLI Demo${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Step 1: Start nodes +echo -e "${GREEN}Step 1: Starting demo service nodes...${NC}" + +echo " Starting node 1 (seed) on port 8000..." +echo " $ python3 examples/inspect/demo_service.py --port 8000" +python3 examples/inspect/demo_service.py --port 8000 > /tmp/pulsing_node1.log 2>&1 & +NODE1_PID=$! +echo " ✓ Node 1 started (PID: $NODE1_PID)" + +sleep 3 + +echo " Starting node 2 on port 8001..." +echo " $ python3 examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000" +python3 examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000 > /tmp/pulsing_node2.log 2>&1 & +NODE2_PID=$! +echo " ✓ Node 2 started (PID: $NODE2_PID)" + +sleep 3 + +echo " Starting node 3 on port 8002..." +echo " $ python3 examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000" +python3 examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000 > /tmp/pulsing_node3.log 2>&1 & +NODE3_PID=$! +echo " ✓ Node 3 started (PID: $NODE3_PID)" + +echo "" +echo -e "${GREEN}✓ All nodes started${NC}" +echo "" + +# Wait for cluster to stabilize +echo -e "${YELLOW}Waiting for cluster to stabilize...${NC}" +sleep 5 + +# Step 2: Inspect cluster +echo "" +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Step 2: Inspect Cluster${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" +echo -e "${YELLOW}$ pulsing inspect cluster --seeds 127.0.0.1:8000${NC}" +pulsing inspect cluster --seeds 127.0.0.1:8000 +echo "" + +# Wait a bit +sleep 2 + +# Step 3: Inspect actors +echo "" +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Step 3: Inspect Actors${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" +echo -e "${YELLOW}All actors:${NC}" +echo -e "${YELLOW}$ pulsing inspect actors --seeds 127.0.0.1:8000${NC}" +pulsing inspect actors --seeds 127.0.0.1:8000 +echo "" + +echo -e "${YELLOW}Top 5 actors:${NC}" +echo -e "${YELLOW}$ pulsing inspect actors --seeds 127.0.0.1:8000 --top 5${NC}" +pulsing inspect actors --seeds 127.0.0.1:8000 --top 5 +echo "" + +echo -e "${YELLOW}Filtered (worker):${NC}" +echo -e "${YELLOW}$ pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker${NC}" +pulsing inspect actors --seeds 127.0.0.1:8000 --filter worker +echo "" + +# Step 4: Inspect metrics +echo "" +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Step 4: Inspect Metrics${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" +echo -e "${YELLOW}Key metrics summary:${NC}" +echo -e "${YELLOW}$ pulsing inspect metrics --seeds 127.0.0.1:8000 --raw False${NC}" +pulsing inspect metrics --seeds 127.0.0.1:8000 --raw False +echo "" + +# Step 5: Watch mode (short demo) +echo "" +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Step 5: Watch Mode (3 rounds)${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" +echo -e "${YELLOW}Watching cluster changes (3 rounds, 2s interval)...${NC}" +echo -e "${YELLOW}$ pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster --interval 2.0 --max_rounds 3${NC}" +echo "" +pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster --interval 2.0 --max_rounds 3 +echo "" + +# Summary +echo "" +echo -e "${BLUE}========================================${NC}" +echo -e "${GREEN}Demo Complete!${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" +echo "Nodes are still running. You can:" +echo " - Run more inspect commands manually" +echo " - Check logs: /tmp/pulsing_node*.log" +echo " - Press Ctrl+C to stop all nodes" +echo "" +echo "Try these commands:" +echo " pulsing inspect cluster --seeds 127.0.0.1:8000" +echo " pulsing inspect actors --seeds 127.0.0.1:8000 --top 10" +echo " pulsing inspect metrics --seeds 127.0.0.1:8000" +echo " pulsing inspect watch --seeds 127.0.0.1:8000 --kind all" +echo "" + +# Keep running until interrupted +echo -e "${YELLOW}Press Ctrl+C to stop all nodes and exit...${NC}" +wait diff --git a/examples/inspect/demo_service.py b/examples/inspect/demo_service.py new file mode 100644 index 000000000..348645b1a --- /dev/null +++ b/examples/inspect/demo_service.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +""" +Demo Service for Inspect CLI + +This example starts a multi-node cluster with various actors to demonstrate +the inspect CLI commands. + +Usage: + Terminal 1: python examples/inspect/demo_service.py --port 8000 + Terminal 2: python examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000 + Terminal 3: python examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000 + +Then in another terminal, try: + pulsing inspect cluster --seeds 127.0.0.1:8000 + pulsing inspect actors --seeds 127.0.0.1:8000 + pulsing inspect metrics --seeds 127.0.0.1:8000 +""" + +import argparse +import asyncio +import random +import time + +from pulsing.actor import Actor, ActorId, Message, SystemConfig, create_actor_system + + +class WorkerActor(Actor): + """A simple worker actor that processes tasks""" + + def __init__(self, worker_id: str): + self.worker_id = worker_id + self.tasks_processed = 0 + + def on_start(self, actor_id: ActorId): + print(f"[Worker {self.worker_id}] Started") + + def receive(self, msg: Message) -> Message: + if msg.msg_type == "ProcessTask": + task = msg.to_json().get("task", "") + self.tasks_processed += 1 + result = f"Processed: {task} (total: {self.tasks_processed})" + print(f"[Worker {self.worker_id}] {result}") + return Message.from_json( + "TaskResult", {"result": result, "worker": self.worker_id} + ) + elif msg.msg_type == "GetStats": + return Message.from_json( + "Stats", {"worker_id": self.worker_id, "tasks": self.tasks_processed} + ) + return Message.empty() + + +class RouterActor(Actor): + """A router actor that distributes tasks to workers""" + + def __init__(self): + self.workers = [] + self.tasks_routed = 0 + + def on_start(self, actor_id: ActorId): + print("[Router] Started") + + def receive(self, msg: Message) -> Message: + if msg.msg_type == "RouteTask": + self.tasks_routed += 1 + task = msg.to_json().get("task", "") + # Simulate routing logic + worker_id = f"worker-{random.randint(1, 3)}" + return Message.from_json( + "Routed", + {"task": task, "worker": worker_id, "routed": self.tasks_routed}, + ) + elif msg.msg_type == "GetStats": + return Message.from_json( + "Stats", {"router": True, "tasks_routed": self.tasks_routed} + ) + return Message.empty() + + +class CacheActor(Actor): + """A cache actor that stores key-value pairs""" + + def __init__(self): + self.cache = {} + + def on_start(self, actor_id: ActorId): + print("[Cache] Started") + + def receive(self, msg: Message) -> Message: + if msg.msg_type == "Get": + key = msg.to_json().get("key", "") + value = self.cache.get(key, None) + return Message.from_json( + "Value", {"key": key, "value": value, "found": value is not None} + ) + elif msg.msg_type == "Set": + data = msg.to_json() + key = data.get("key", "") + value = data.get("value", "") + self.cache[key] = value + return Message.from_json("SetResult", {"key": key, "success": True}) + elif msg.msg_type == "GetStats": + return Message.from_json("Stats", {"cache_size": len(self.cache)}) + return Message.empty() + + +async def run_node(port: int, seed: str | None): + """Run a node in the cluster""" + print(f"\n{'='*60}") + print(f"Pulsing Demo Service - Node on port {port}") + print(f"{'='*60}\n") + + config = SystemConfig.with_addr(f"127.0.0.1:{port}") + if seed: + config = config.with_seeds([seed]) + print(f"Joining cluster via: {seed}") + + system = await create_actor_system(config) + print(f"✓ System started: {system.node_id} @ {system.addr}\n") + + # Create different actors based on node role + if seed is None: + # Node 1: Create router and some workers + print("Creating actors on node 1...") + await system.spawn("router", RouterActor(), public=True) + print(" ✓ actors/router") + + for i in range(1, 3): + worker_name = f"worker-{i}" + await system.spawn(worker_name, WorkerActor(worker_name), public=True) + print(f" ✓ actors/{worker_name}") + + print("\n✓ Node 1 ready!") + print("\nTo start more nodes:") + print( + " Terminal 2: python examples/inspect/demo_service.py --port 8001 --seed 127.0.0.1:8000" + ) + print( + " Terminal 3: python examples/inspect/demo_service.py --port 8002 --seed 127.0.0.1:8000" + ) + print("\nThen try inspect commands:") + print(" pulsing inspect cluster --seeds 127.0.0.1:8000") + print(" pulsing inspect actors --seeds 127.0.0.1:8000") + print(" pulsing inspect metrics --seeds 127.0.0.1:8000") + print(" pulsing inspect watch --seeds 127.0.0.1:8000 --kind cluster\n") + + elif port == 8001: + # Node 2: Add more workers + await asyncio.sleep(1) # Wait for cluster discovery + print("Creating actors on node 2...") + for i in range(3, 5): + worker_name = f"worker-{i}" + await system.spawn(worker_name, WorkerActor(worker_name), public=True) + print(f" ✓ actors/{worker_name}") + print("\n✓ Node 2 ready!") + + elif port == 8002: + # Node 3: Add cache + await asyncio.sleep(1) + print("Creating actors on node 3...") + await system.spawn("cache", CacheActor(), public=True) + print(" ✓ actors/cache") + print("\n✓ Node 3 ready!") + + # Keep running and periodically show cluster status + try: + while True: + await asyncio.sleep(10) + members = await system.members() + alive = [m for m in members if m.get("status") == "Alive"] + print( + f"[{time.strftime('%H:%M:%S')}] Cluster: {len(alive)}/{len(members)} nodes alive" + ) + except asyncio.CancelledError: + pass + except KeyboardInterrupt: + print("\n\nShutting down...") + finally: + await system.shutdown() + print("✓ Shutdown complete") + + +def main(): + parser = argparse.ArgumentParser(description="Pulsing Demo Service for Inspect CLI") + parser.add_argument( + "--port", type=int, default=8000, help="Port to bind (default: 8000)" + ) + parser.add_argument( + "--seed", + type=str, + default=None, + help="Seed node to join (e.g., 127.0.0.1:8000)", + ) + args = parser.parse_args() + + try: + asyncio.run(run_node(args.port, args.seed)) + except KeyboardInterrupt: + print("\nInterrupted") + + +if __name__ == "__main__": + main() diff --git a/python/pulsing/__init__.py b/python/pulsing/__init__.py index 6ea933b06..e81ba6366 100644 --- a/python/pulsing/__init__.py +++ b/python/pulsing/__init__.py @@ -38,4 +38,4 @@ def incr(self): self.value += 1; return self.value - pulsing.compat.ray: Ray-compatible sync API (for migration) """ -__version__ = "0.7.0" +__version__ = "0.1.0" diff --git a/python/pulsing/cli/__main__.py b/python/pulsing/cli/__main__.py index e19b1bb58..ace91c6f6 100644 --- a/python/pulsing/cli/__main__.py +++ b/python/pulsing/cli/__main__.py @@ -5,153 +5,183 @@ @hp.param("actor") def actor( - actor_type: str, # Positional argument, supports: router, transformers, vllm, list - namespace: str = "pulsing", + actor_type: str, # Positional argument: full class path (e.g., 'pulsing.actors.worker.TransformersWorker') addr: str | None = None, seeds: str | None = None, - model: str | None = None, - model_name: str = "pulsing-model", - device: str = "cuda", - max_new_tokens: int = 512, - role: str = "aggregated", - preload_model: bool = False, - http_host: str = "0.0.0.0", - http_port: int = 8080, - scheduler: str = "stream_load", - # macOS Metal/MLX support parameters - mlx_device: str | None = None, # 'gpu' or 'cpu', default 'gpu' - metal_memory_fraction: float | None = None, # 0.0-1.0, default 0.8 - # Actor list parameters - endpoint: str | None = None, # Single actor system endpoint (list only) - all_actors: bool = False, # Show all actors including internal ones (list only) - json: bool = False, # Output as JSON (list only) + name: str = "worker", # Actor name (default: "worker") + **kwargs, # Additional arguments for Actor constructor ): r""" - Start an Actor-based service or list actors. + Start an Actor-based service. - This command starts actors based on the Pulsing Actor System or lists existing actors. - Supported actor types: - - router: Load balancing router (with OpenAI-compatible HTTP API) - - transformers: Transformers-based inference worker - - vllm: vLLM-based high-performance inference worker - - list: List actors in the current system + This command starts actors based on the Pulsing Actor System. + + Actor type must be a full class path: + - Format: 'module.path.ClassName' + - Example: 'pulsing.actors.router.RouterActor' + - Example: 'pulsing.actors.worker.TransformersWorker' + - Example: 'pulsing.actors.vllm.VllmWorker' + - Example: 'my_module.my_actor.MyCustomActor' + + Pass constructor parameters directly as command-line arguments. + The CLI will automatically match parameters to the Actor's constructor signature. + + Note: To list actors, use 'pulsing inspect actors' instead. Args: - actor_type: Actor type (positional argument). Options: 'router', 'transformers', 'vllm', 'list' - namespace: Service namespace. Default: 'pulsing' + actor_type: Full class path (positional argument), e.g., 'pulsing.actors.worker.TransformersWorker' addr: Actor System bind address (e.g., '0.0.0.0:8000') seeds: Comma-separated list of seed nodes (e.g., '192.168.1.1:8000,192.168.1.2:8000') - model: Model path (required for 'transformers' and 'vllm' type) - model_name: Model name for OpenAI API. Default: 'pulsing-model' - device: Device for inference ('cuda', 'cpu', 'mps'). Default: 'cuda' - max_new_tokens: Max tokens to generate. Default: 512 - role: Worker role for vLLM ('aggregated', 'prefill', 'decode'). Default: 'aggregated' - scheduler: Scheduler algorithm for router. Options: 'round_robin', 'random', 'least_connection', 'stream_load'. Default: 'stream_load' - preload_model: Preload model on startup. Default: False - http_host: HTTP server host (for router). Default: '0.0.0.0' - http_port: HTTP server port (for router). Default: 8080 - mlx_device: MLX device type for macOS ('gpu' or 'cpu'). Default: 'gpu' - metal_memory_fraction: Metal memory fraction for macOS (0.0-1.0). Default: 0.8 - endpoint: (list only) Single actor system endpoint (e.g., '127.0.0.1:8000') - all_actors: (list only) Show all actors including internal system actors - json: (list only) Output in JSON format + name: Actor name. Default: 'worker'. Use different names to run multiple workers in the same cluster. + **kwargs: Additional arguments matching the Actor's constructor parameters. + Pass parameters directly as command-line arguments, e.g., --model_name gpt2 --device cpu Examples: - # Start a router with OpenAI-compatible API on port 8080 - pulsing actor router --http_port 8080 --model_name my-llm + # Start a Transformers worker + pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --device cpu --name my-worker # Start a vLLM worker - pulsing actor vllm --model Qwen/Qwen2 --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000 - - # Start a transformers worker - pulsing actor transformers --model gpt2 --addr 0.0.0.0:8001 --seeds 192.168.1.100:8000 - - # Start worker on CPU - pulsing actor transformers --model gpt2 --device cpu + pulsing actor pulsing.actors.vllm.VllmWorker --model Qwen/Qwen2 --role aggregated --max_new_tokens 512 --name vllm-worker - # List actors from single endpoint - pulsing actor list --endpoint 127.0.0.1:8000 + # Start a Router with OpenAI-compatible API + pulsing actor pulsing.actors.router.RouterActor --http_host 0.0.0.0 --http_port 8080 --model_name my-llm --worker_name worker - # List actors from cluster - pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001 - - # List all actors including internal ones - pulsing actor list --endpoint 127.0.0.1:8000 --all_actors True - - # Output as JSON - pulsing actor list --endpoint 127.0.0.1:8000 --json True + # Start multiple workers with different names + pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --name worker-1 --seeds 127.0.0.1:8000 + pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2 --name worker-2 --seeds 127.0.0.1:8000 """ - from .actors import start_router, start_transformers, start_vllm + from .actors import start_generic_actor - # Handle 'list' subcommand + # Check for deprecated 'list' subcommand if actor_type == "list": - from .actor_list import list_actors_command + print("Error: 'pulsing actor list' has been removed.") + print("Use 'pulsing inspect actors' instead:") + print(" pulsing inspect actors --endpoint 127.0.0.1:8000") + print(" pulsing inspect actors --seeds 127.0.0.1:8000") + return - list_actors_command( - endpoint=endpoint, - seeds=seeds, - all_actors=all_actors, - json_output=json, + # Check if actor_type is a valid class path (must contain dots) + if "." not in actor_type: + raise ValueError( + f"Error: Actor type must be a full class path (e.g., 'pulsing.actors.worker.TransformersWorker').\n" + f"Received: '{actor_type}'\n" + f"Example: pulsing actor pulsing.actors.worker.TransformersWorker --model_name gpt2" ) - return # Parse seeds seed_list = [] if seeds: seed_list = [s.strip() for s in seeds.split(",") if s.strip()] - if actor_type == "router": - start_router( - namespace, addr, seed_list, http_host, http_port, model_name, scheduler - ) - elif actor_type == "transformers": - if not model: - raise ValueError("--model is required for 'transformers' actor type") - start_transformers( - model=model, - namespace=namespace, - addr=addr, - seeds=seed_list, - device=device, - max_new_tokens=max_new_tokens, - preload_model=preload_model, - ) - elif actor_type == "vllm": - if not model: - raise ValueError("--model is required for 'vllm' actor type") - start_vllm( - model=model, - namespace=namespace, - addr=addr, - seeds=seed_list, - max_new_tokens=max_new_tokens, - role=role, - mlx_device=mlx_device, - metal_memory_fraction=metal_memory_fraction, - ) - else: - raise ValueError( - f"Unknown actor type: {actor_type}. Supported types: router, transformers, vllm, list" - ) + # Start generic Actor class + start_generic_actor( + actor_type=actor_type, + addr=addr, + seeds=seed_list, + name=name, + extra_kwargs=kwargs, # All additional CLI arguments + ) @hp.param("inspect") -def inspect(seeds: str | None = None): +def inspect( + subcommand: str, # Positional argument: cluster, actors, metrics, watch + seeds: str | None = None, + # Common options + timeout: float = 10.0, + best_effort: bool = False, + # cluster subcommand options (no specific options yet) + # actors subcommand options + top: int | None = None, + filter: str | None = None, + all_actors: bool = False, + endpoint: str | None = None, # Single node endpoint (alternative to seeds) + json: bool = False, # JSON output format + detailed: bool = False, # Show detailed info (class, module) per node + # metrics subcommand options + raw: bool = True, + # watch subcommand options + interval: float = 1.0, + kind: str = "all", # cluster, actors, metrics, all + max_rounds: int | None = None, +): """ - Inspect the actor system state. + Inspect the actor system state (observer mode via HTTP API). + + This command uses lightweight HTTP observer mode - does NOT join the gossip cluster. Args: - seeds: Comma-separated list of seed nodes to join the cluster. + subcommand: Subcommand to run. Options: 'cluster', 'actors', 'metrics', 'watch' (required) + seeds: Comma-separated list of seed nodes (required) + timeout: Request timeout in seconds (default: 10.0) + best_effort: Continue even if some nodes fail (default: False) + top: (actors only) Show top N actors by instance count + filter: (actors only) Filter actor names by substring + all_actors: (actors only) Include internal/system actors + raw: (metrics only) Output raw metrics (default: True). If False, show summary only + interval: (watch only) Refresh interval in seconds (default: 1.0) + kind: (watch only) What to watch: 'cluster', 'actors', 'metrics', 'all' (default: 'all') + max_rounds: (watch only) Maximum number of refresh rounds (None = infinite) Examples: - pulsing inspect --seeds 127.0.0.1:8000 + # Inspect cluster members + pulsing inspect cluster --seeds 127.0.0.1:8000 + + # Inspect actors distribution + pulsing inspect actors --seeds 127.0.0.1:8000 --top 10 + + # Inspect metrics + pulsing inspect metrics --seeds 127.0.0.1:8000 --raw False + + # Watch cluster changes + pulsing inspect watch --seeds 127.0.0.1:8000 --interval 2.0 --kind cluster """ - from .inspect import inspect_system + from .inspect import ( + inspect_actors, + inspect_cluster, + inspect_metrics, + inspect_watch, + ) seed_list = [] if seeds: seed_list = [s.strip() for s in seeds.split(",") if s.strip()] - inspect_system(seed_list) + + if subcommand == "cluster": + if not seed_list: + print("Error: --seeds is required for 'inspect cluster' command") + return + inspect_cluster(seed_list, timeout=timeout, best_effort=best_effort) + elif subcommand == "actors": + if endpoint and seed_list: + print("Error: Cannot specify both --endpoint and --seeds.") + print("Use --endpoint for single node, --seeds for cluster.") + return + inspect_actors( + seeds=seed_list if not endpoint else None, + endpoint=endpoint, + timeout=timeout, + best_effort=best_effort, + top=top, + filter=filter, + all_actors=all_actors, + json_output=json, + detailed=detailed, + ) + elif subcommand == "metrics": + inspect_metrics(seed_list, timeout=timeout, best_effort=best_effort, raw=raw) + elif subcommand == "watch": + inspect_watch( + seed_list, + timeout=timeout, + best_effort=best_effort, + interval=interval, + kind=kind, + max_rounds=max_rounds, + ) + else: + print(f"Error: Unknown subcommand '{subcommand}'") + print("Supported subcommands: cluster, actors, metrics, watch") @hp.param("bench") diff --git a/python/pulsing/cli/actor_list.py b/python/pulsing/cli/actor_list.py deleted file mode 100644 index c0617df2c..000000000 --- a/python/pulsing/cli/actor_list.py +++ /dev/null @@ -1,312 +0,0 @@ -"""Actor list command implementation - -Query actors from remote actor systems via simple HTTP API. - -Usage: - # Query single endpoint - pulsing actor list --endpoint 127.0.0.1:8000 - - # Query cluster - pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001 - -This implementation uses direct HTTP/2 requests instead of joining the gossip cluster, -making it a lightweight observer tool. -""" - -import asyncio -import json -import subprocess - -MAX_NODES_DISPLAY = 64 # Maximum number of nodes to display -HTTP_TIMEOUT = 10 # seconds - - -def http_get_sync(url: str) -> dict | list | None: - """Make HTTP/2 GET request using curl (which supports h2c)""" - try: - result = subprocess.run( - [ - "curl", - "-s", # Silent - "--http2-prior-knowledge", # Use h2c (HTTP/2 over cleartext) - "-m", - str(HTTP_TIMEOUT), # Timeout - url, - ], - capture_output=True, - text=True, - timeout=HTTP_TIMEOUT + 2, - ) - if result.returncode == 0 and result.stdout: - return json.loads(result.stdout) - return None - except Exception: - return None - - -async def query_single_endpoint( - endpoint: str, all_actors: bool, output_format: str -) -> bool: - """Query a single actor system endpoint via HTTP API""" - - # Ensure endpoint has port - if ":" not in endpoint: - endpoint = f"{endpoint}:8000" - - # Ensure http:// prefix - if not endpoint.startswith("http"): - endpoint = f"http://{endpoint}" - - print(f"Connecting to {endpoint}...") - - # Get actors list - url = f"{endpoint}/actors" - if all_actors: - url += "?all=true" - - actors = http_get_sync(url) - if actors is None: - print(f"Error: Cannot connect to {endpoint}") - return False - - print(f"Connected to {endpoint}") - print() - - _print_output(actors, output_format) - return True - - -async def query_cluster(seeds: list[str], all_actors: bool, output_format: str) -> bool: - """Query all nodes in a cluster via HTTP API""" - - # Normalize seeds - normalized_seeds = [] - for seed in seeds: - if ":" not in seed: - seed = f"{seed}:8000" - if not seed.startswith("http"): - seed = f"http://{seed}" - normalized_seeds.append(seed) - - print(f"Connecting to cluster via seeds: {seeds}...") - - # Get cluster members from first available seed - members = None - for seed in normalized_seeds: - members = http_get_sync(f"{seed}/cluster/members") - if members: - break - - if not members: - print("Error: Cannot connect to any seed node") - return False - - # Filter alive members - alive_members = [m for m in members if m.get("status") == "Alive"] - print(f"Found {len(alive_members)} alive nodes") - - if len(alive_members) > MAX_NODES_DISPLAY: - print( - f"Warning: Cluster has {len(alive_members)} nodes, " - f"showing first {MAX_NODES_DISPLAY}" - ) - alive_members = alive_members[:MAX_NODES_DISPLAY] - - print() - - # Collect actors from each node - all_nodes_data = [] - - for i, member in enumerate(alive_members): - addr = member.get("addr") - node_id = member.get("node_id") - - if not addr: - continue - - # Ensure http:// prefix - if not addr.startswith("http"): - addr = f"http://{addr}" - - if output_format == "table": - print(f"{'='*80}") - print(f"[{i+1}/{len(alive_members)}] Node {node_id} ({addr})") - print(f"{'='*80}") - - # Get actors from this node - url = f"{addr}/actors" - if all_actors: - url += "?all=true" - - actors = http_get_sync(url) - if actors is None: - if output_format == "table": - print(" Error: Cannot connect to this node") - print() - continue - - if output_format == "table": - _print_actors_table(actors) - print() - else: - all_nodes_data.append( - { - "node_id": node_id, - "addr": addr, - "actors": actors, - } - ) - - # Print JSON if needed - if output_format == "json": - print(json.dumps(all_nodes_data, indent=2)) - - # Summary - if output_format == "table": - print(f"{'='*80}") - print(f"Cluster: {len(alive_members)} nodes") - print(f"{'='*80}") - - return True - - -def _print_output(actors_data: list[dict], output_format: str): - """Print actors in specified format""" - if output_format == "json": - print(json.dumps(actors_data, indent=2)) - else: - _print_actors_table(actors_data) - - -async def list_actors_impl(all_actors: bool = False, output_format: str = "table"): - """ - List actors in the current (local) system. - - This function is for testing and in-process usage. - For CLI, use list_actors_command which uses HTTP API. - - Args: - all_actors: Show all actors including internal system actors - output_format: Output format ('table' or 'json') - """ - from pulsing.actor import get_system - - system = get_system() - all_named = await system.all_named_actors() - - actors_data = [] - for actor_info in all_named: - path = actor_info.get("path", "") - name = path[7:] if path.startswith("actors/") else path - - # Skip system/core - if path == "system/core": - continue - - # Filter internal actors if needed - if not all_actors and name.startswith("_"): - continue - - actor_data = { - "name": name, - "type": "system" if name.startswith("_") else "user", - "uptime": 0, - } - - # Get detailed instance info - detailed = actor_info.get("detailed_instances", []) - if detailed: - inst = detailed[0] - actor_data["actor_id"] = inst.get("actor_id", "-") - actor_data["module"] = inst.get("module", "-") - actor_data["class"] = inst.get("class", "-") - actor_data["file"] = inst.get("file", "-") - - actors_data.append(actor_data) - - _print_output(actors_data, output_format) - - -def _print_actors_table(actors_data: list[dict]): - """Print actors in table format""" - if not actors_data: - print(" No actors found.") - return - - # Check for errors - if actors_data and "error" in actors_data[0]: - print(f" Error: {actors_data[0]['error']}") - return - - # Check if we have detailed info (class, module) - has_details = any(a.get("class") for a in actors_data) - - if has_details: - print(f" {'Name':<25} {'Type':<8} {'Class':<25} {'Module':<30}") - print(f" {'-'*90}") - - for actor in actors_data: - name = actor.get("name", "") - actor_type = actor.get("type", "user") - cls = actor.get("class", "-") - module = actor.get("module", "-") - print(f" {name:<25} {actor_type:<8} {cls:<25} {module:<30}") - else: - print(f" {'Name':<40} {'Type':<10}") - print(f" {'-'*50}") - - for actor in actors_data: - name = actor.get("name", "") - actor_type = actor.get("type", "user") - print(f" {name:<40} {actor_type:<10}") - - print(f"\n Total: {len(actors_data)} actor(s)") - - -def list_actors_command( - endpoint: str | None = None, - seeds: str | None = None, - all_actors: bool = False, - json_output: bool = False, -): - """ - List actors from a remote actor system or cluster. - - Uses simple HTTP/2 API calls - does NOT join the gossip cluster. - - Args: - endpoint: Single actor system endpoint (e.g., '127.0.0.1:8000') - seeds: Comma-separated cluster seed addresses - all_actors: Show all actors including internal system actors - json_output: Output in JSON format - - Examples: - # Query single endpoint - pulsing actor list --endpoint 127.0.0.1:8000 - - # Query cluster - pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001 - - # Show all actors as JSON - pulsing actor list --endpoint 127.0.0.1:8000 --all_actors True --json True - """ - if not endpoint and not seeds: - print("Error: Either --endpoint or --seeds is required.") - print() - print("Usage:") - print(" pulsing actor list --endpoint 127.0.0.1:8000") - print(" pulsing actor list --seeds 127.0.0.1:8000,127.0.0.1:8001") - return - - if endpoint and seeds: - print("Error: Cannot specify both --endpoint and --seeds.") - print("Use --endpoint for single node, --seeds for cluster.") - return - - output_format = "json" if json_output else "table" - - if endpoint: - asyncio.run(query_single_endpoint(endpoint, all_actors, output_format)) - else: - seed_list = [s.strip() for s in seeds.split(",") if s.strip()] - asyncio.run(query_cluster(seed_list, all_actors, output_format)) diff --git a/python/pulsing/cli/actor_loader.py b/python/pulsing/cli/actor_loader.py new file mode 100644 index 000000000..bbdff162a --- /dev/null +++ b/python/pulsing/cli/actor_loader.py @@ -0,0 +1,63 @@ +"""Generic Actor loader - dynamically load and instantiate Actor classes""" + +import importlib +import json +from typing import Any + +from pulsing.actor import Actor + + +def load_actor_class(class_path: str) -> type[Actor]: + """Load Actor class from module path + + Args: + class_path: Full class path, e.g., 'pulsing.actors.worker.TransformersWorker' + + Returns: + Actor class + + Raises: + ImportError: If module or class cannot be imported + ValueError: If class is not an Actor subclass + """ + if "." not in class_path: + raise ValueError( + f"Invalid class path '{class_path}'. Expected format: 'module.path.ClassName'\n" + f"Example: pulsing.actors.worker.TransformersWorker" + ) + + # Split module path and class name + parts = class_path.rsplit(".", 1) + if len(parts) != 2: + raise ValueError( + f"Invalid class path '{class_path}'. Expected format: 'module.path.ClassName'" + ) + + module_path, class_name = parts + + try: + # Import module + module = importlib.import_module(module_path) + except ImportError as e: + raise ImportError( + f"Cannot import module '{module_path}': {e}\n" + f"Make sure the module is installed and the path is correct." + ) from e + + # Get class from module + if not hasattr(module, class_name): + raise AttributeError( + f"Class '{class_name}' not found in module '{module_path}'.\n" + f"Available attributes: {[attr for attr in dir(module) if not attr.startswith('_')]}" + ) + + actor_class = getattr(module, class_name) + + # Verify it's an Actor subclass + if not isinstance(actor_class, type) or not issubclass(actor_class, Actor): + raise ValueError( + f"'{class_name}' is not an Actor subclass.\n" + f"Expected a class that inherits from pulsing.actor.Actor" + ) + + return actor_class diff --git a/python/pulsing/cli/actors.py b/python/pulsing/cli/actors.py index 01baa2476..be39117e2 100644 --- a/python/pulsing/cli/actors.py +++ b/python/pulsing/cli/actors.py @@ -2,176 +2,137 @@ import uvloop +from .actor_loader import load_actor_class -def start_router( - namespace: str, - addr: str | None, - seeds: list[str], - http_host: str, - http_port: int, - model_name: str, - scheduler_type: str, -): - """Start Router with OpenAI-compatible API""" - from pulsing.actor import SystemConfig, create_actor_system - from pulsing.actor.helpers import run_until_signal - - from ..actors import ( - LeastConnectionScheduler, - RandomScheduler, - RoundRobinScheduler, - StreamLoadScheduler, - ) - from ..actors.router import start_router as start_router_service - from ..actors.router import stop_router - - # Select scheduler class - scheduler_map = { - "round_robin": RoundRobinScheduler, - "random": RandomScheduler, - "least_connection": LeastConnectionScheduler, - "stream_load": StreamLoadScheduler, - } - scheduler_class = scheduler_map.get(scheduler_type) - if not scheduler_class: - raise ValueError( - f"Unknown scheduler: {scheduler_type}. Options: {list(scheduler_map.keys())}" - ) - - print(f"Starting Router (namespace={namespace}, model={model_name})") - print(f" Actor System addr: {addr or 'auto'}") - print(f" HTTP API: http://{http_host}:{http_port}") - print(f" Scheduler: {scheduler_type}") - - async def run(): - # Create ActorSystem - if addr: - config = SystemConfig.with_addr(addr) - else: - config = SystemConfig.standalone() - - if seeds: - config = config.with_seeds(seeds) - - system = await create_actor_system(config) - print(f"[Router] ActorSystem started at {system.addr}") - - # Start Router HTTP server - runner = await start_router_service( - system, - http_host=http_host, - http_port=http_port, - model_name=model_name, - worker_name="worker", - scheduler_type=scheduler_type, - ) - - # Run until signal received - try: - await run_until_signal(system, "router") - finally: - await stop_router(runner) - - uvloop.run(run()) - -def start_transformers( - model: str, - namespace: str, +def start_generic_actor( + actor_type: str, addr: str | None, seeds: list[str], - device: str, - max_new_tokens: int, - preload_model: bool, + name: str = "worker", + extra_kwargs: dict | None = None, ): - """Start Transformers Worker""" - from pulsing.actor.helpers import spawn_and_run - - from ..actors import GenerationConfig, TransformersWorker - - print(f"Starting Transformers Worker (model={model}, namespace={namespace})") - print(f" Device: {device}") - print(f" Max tokens: {max_new_tokens}") - print(f" Preload: {preload_model}") - - async def run(): - gen_config = GenerationConfig(max_new_tokens=max_new_tokens) - worker = TransformersWorker( - model_name=model, - device=device, - gen_config=gen_config, - preload=preload_model, - ) - - await spawn_and_run( - worker, - name="worker", - addr=addr, - seeds=seeds if seeds else None, - public=True, - ) - - uvloop.run(run()) - - -def start_vllm( - model: str, - namespace: str, - addr: str | None, - seeds: list[str], - max_new_tokens: int, - role: str = "aggregated", - mlx_device: str | None = None, - metal_memory_fraction: float | None = None, -): - """Start vLLM Worker + """Start a generic Actor class by full module path Args: - model: Model path or name - namespace: Service namespace + actor_type: Full class path, e.g., 'pulsing.actors.worker.TransformersWorker' addr: Actor System bind address - seeds: List of seed nodes - max_new_tokens: Maximum tokens to generate - role: Worker role ('aggregated', 'prefill', 'decode') - mlx_device: MLX device type for macOS ('gpu' or 'cpu') - metal_memory_fraction: Metal memory fraction for macOS (0.0-1.0) + seeds: List of seed node addresses + name: Actor name + extra_kwargs: Additional CLI arguments to pass to Actor constructor """ + import inspect from pulsing.actor.helpers import spawn_and_run - from ..actors.vllm import VllmWorker - - print(f"Starting vLLM Worker (model={model}, namespace={namespace}, role={role})") - print(f" Max tokens: {max_new_tokens}") - - # Display macOS Metal configuration - import platform - - if platform.system() == "Darwin": - mlx_info = mlx_device or "gpu (default)" - if metal_memory_fraction is not None: - try: - memory_value = float(metal_memory_fraction) - memory_info = f"{memory_value:.2f}" - except (ValueError, TypeError): - memory_info = str(metal_memory_fraction) - else: - memory_info = "0.8 (default)" - print( - f" macOS Metal support: MLX device={mlx_info}, memory fraction={memory_info}" - ) + print(f"Loading Actor class: {actor_type}") + + # Load Actor class + try: + actor_class = load_actor_class(actor_type) + except (ImportError, ValueError, AttributeError) as e: + print(f"Error: {e}") + return + + print(f" Class: {actor_class.__name__}") + print(f" Module: {actor_class.__module__}") + + # Get Actor constructor signature + try: + actor_sig = inspect.signature(actor_class.__init__) + except Exception as e: + print(f"Error: Cannot inspect Actor constructor: {e}") + return + + # Parameters that should NOT be passed to Actor constructor + cli_only_params = { + "actor_type", + "addr", + "seeds", + "name", + "self", + } - async def run(): - worker = VllmWorker( - model=model, - role=role, - max_new_tokens=max_new_tokens, - mlx_device=mlx_device, - metal_memory_fraction=metal_memory_fraction, - ) + # Get Actor constructor parameter names + actor_params = set(actor_sig.parameters.keys()) - {"self"} + + # Extract Actor constructor parameters from extra_kwargs + constructor_kwargs = {} + + # Filter extra_kwargs to only include Actor constructor parameters + if extra_kwargs: + for param_name, value in extra_kwargs.items(): + # Convert kebab-case to snake_case for matching + snake_case_name = param_name.replace("-", "_") + + # Check if this parameter matches Actor constructor + if ( + snake_case_name in actor_params + and snake_case_name not in cli_only_params + ): + # Get type hint if available for conversion + param = actor_sig.parameters.get(snake_case_name) + if param and param.annotation != inspect.Parameter.empty: + try: + # Type conversion based on annotation + annotation = param.annotation + if annotation is bool and isinstance(value, str): + value = value.lower() in ("true", "1", "yes", "on") + elif annotation is int and isinstance(value, str): + value = int(value) + elif annotation is float and isinstance(value, str): + value = float(value) + except (ValueError, TypeError): + pass # Keep original value if conversion fails + + constructor_kwargs[snake_case_name] = value + + # Show constructor parameters + if constructor_kwargs: + print(" Constructor parameters:") + for key, value in constructor_kwargs.items(): + # Truncate long values for display + value_str = str(value) + if len(value_str) > 50: + value_str = value_str[:47] + "..." + print(f" {key}: {value_str}") + else: + print(" No constructor parameters provided (using defaults)") + + print(f" Actor name: {name}") + if seeds: + print(f" Seeds: {', '.join(seeds)}") + if addr: + print(f" Address: {addr}") + async def run(): + try: + # Instantiate Actor + actor_instance = actor_class(**constructor_kwargs) + except TypeError as e: + print(f"\nError: Failed to instantiate {actor_class.__name__}") + print(f" {e}") + print("\nHint: Check the constructor signature and provided parameters.") + required_params = [ + name + for name, param in actor_sig.parameters.items() + if name != "self" and param.default == inspect.Parameter.empty + ] + if required_params: + print(f" Required parameters: {', '.join(required_params)}") + print(f" All constructor parameters: {', '.join(actor_params)}") + print(f" Provided parameters: {', '.join(constructor_kwargs.keys())}") + return + except Exception as e: + print(f"\nError: Failed to create Actor instance: {e}") + import traceback + + traceback.print_exc() + return + + # Spawn and run await spawn_and_run( - worker, - name="worker", + actor_instance, + name=name, addr=addr, seeds=seeds if seeds else None, public=True, diff --git a/python/pulsing/cli/inspect.py b/python/pulsing/cli/inspect.py index 7a9ba4711..464cdb250 100644 --- a/python/pulsing/cli/inspect.py +++ b/python/pulsing/cli/inspect.py @@ -1,97 +1,624 @@ -"""Pulsing CLI - System inspection""" +"""Pulsing CLI - System inspection (observer mode via HTTP API) -import asyncio +This module provides lightweight inspection tools that query actor systems +via HTTP/2 endpoints without joining the gossip cluster. +""" -import uvloop +import json +import subprocess +import time +from collections import defaultdict +from typing import Any +# Maximum number of nodes to display +MAX_NODES_DISPLAY = 64 -def inspect_system(seeds: list[str]): - """Inspect the actor system state""" - from pulsing.actor import SystemConfig, create_actor_system - async def run(): - if not seeds: - print("Error: --seeds is required for 'inspect' command") +def http_get_sync(url: str, timeout: float = 10.0) -> dict | list | None: + """Make HTTP/2 GET request using curl (which supports h2c), returns JSON""" + try: + result = subprocess.run( + [ + "curl", + "-s", # Silent + "--http2-prior-knowledge", # Use h2c (HTTP/2 over cleartext) + "-m", + str(int(timeout)), # Timeout + url, + ], + capture_output=True, + text=True, + timeout=timeout + 2, + ) + if result.returncode == 0 and result.stdout: + return json.loads(result.stdout) + return None + except Exception: + return None + + +def http_get_text_sync(url: str, timeout: float = 10.0) -> str | None: + """Make HTTP/2 GET request using curl, returns raw text (for /metrics endpoint)""" + try: + result = subprocess.run( + [ + "curl", + "-s", # Silent + "--http2-prior-knowledge", # Use h2c (HTTP/2 over cleartext) + "-m", + str(int(timeout)), # Timeout + url, + ], + capture_output=True, + text=True, + timeout=timeout + 2, + ) + if result.returncode == 0 and result.stdout: + return result.stdout + return None + except Exception: + return None + + +def normalize_address(addr: str) -> str: + """Normalize address to http://host:port format""" + if ":" not in addr: + addr = f"{addr}:8000" + if not addr.startswith("http"): + addr = f"http://{addr}" + return addr + + +def get_cluster_members(seeds: list[str], timeout: float) -> list[dict] | None: + """Get cluster members from first available seed""" + for seed in seeds: + normalized = normalize_address(seed) + members = http_get_sync(f"{normalized}/cluster/members", timeout) + if members: + return members + return None + + +def get_alive_members(members: list[dict]) -> list[dict]: + """Filter alive members from cluster members list""" + return [m for m in members if m.get("status") == "Alive"] + + +def inspect_cluster(seeds: list[str], timeout: float = 10.0, best_effort: bool = False): + """Inspect cluster members and status""" + print(f"Connecting to cluster via seeds: {seeds}...") + + members = get_cluster_members(seeds, timeout) + if not members: + print("Error: Cannot connect to any seed node") + if not best_effort: return + return - print(f"Connecting to cluster via seeds: {seeds}...") + alive_members = get_alive_members(members) - # If seeds are local, bind to 127.0.0.1 to ensure connectivity - if any(s.startswith("127.0.0.1") or s.startswith("localhost") for s in seeds): - config = SystemConfig.with_addr("127.0.0.1:0").with_seeds(seeds) - else: - config = SystemConfig.standalone().with_seeds(seeds) + # Count by status + status_counts = defaultdict(int) + for m in members: + status = m.get("status", "Unknown") + status_counts[status] += 1 + + print(f"\nCluster Status: {len(members)} total nodes ({len(alive_members)} alive)") + print("=" * 80) + + # Status summary + print("\nStatus Summary:") + for status, count in sorted(status_counts.items()): + print(f" {status}: {count}") + + # Member details + if len(alive_members) > MAX_NODES_DISPLAY: + print( + f"\nWarning: Cluster has {len(alive_members)} alive nodes, " + f"showing first {MAX_NODES_DISPLAY}" + ) + alive_members = alive_members[:MAX_NODES_DISPLAY] + + print(f"\n{'Node ID':<20} {'Address':<30} {'Status':<15}") + print("-" * 80) + + for member in sorted(alive_members, key=lambda m: m.get("node_id", 0)): + node_id = str(member.get("node_id", "-")) + addr = member.get("addr", "-") + status = member.get("status", "Unknown") + print(f"{node_id:<20} {addr:<30} {status:<15}") + + # Show non-alive members if any + non_alive = [m for m in members if m.get("status") != "Alive"] + if non_alive: + print(f"\nNon-Alive Members ({len(non_alive)}):") + for member in non_alive: + node_id = str(member.get("node_id", "-")) + addr = member.get("addr", "-") + status = member.get("status", "Unknown") + print(f" {node_id} ({addr}) - {status}") + + print("\n" + "=" * 80) + + +def inspect_actors( + seeds: list[str] | None = None, + endpoint: str | None = None, + timeout: float = 10.0, + best_effort: bool = False, + top: int | None = None, + filter: str | None = None, + all_actors: bool = False, + json_output: bool = False, + detailed: bool = False, +): + """ + Inspect actors from a single node or cluster. + + Supports two modes: + - Single node: Use --endpoint to query one node + - Cluster: Use --seeds to query all nodes in cluster (aggregated view) + """ + # Handle single endpoint mode + if endpoint: + return _inspect_single_node( + endpoint, timeout, all_actors, json_output, detailed, filter + ) + + # Cluster mode (original behavior) + if not seeds: + print("Error: Either --endpoint or --seeds is required.") + print() + print("Usage:") + print(" pulsing inspect actors --endpoint 127.0.0.1:8000") + print(" pulsing inspect actors --seeds 127.0.0.1:8000") + return + + _inspect_cluster_actors( + seeds, timeout, best_effort, top, filter, all_actors, json_output + ) - system = await create_actor_system(config) - # Give some time for discovery - await asyncio.sleep(1.5) +def _inspect_single_node( + endpoint: str, + timeout: float, + all_actors: bool, + json_output: bool, + detailed: bool, + filter_str: str | None = None, +): + """Inspect actors from a single node""" + # Normalize endpoint + if ":" not in endpoint: + endpoint = f"{endpoint}:8000" + normalized = normalize_address(endpoint) - members = await system.members() - print(f"\nCluster Status: {len(members)} nodes found") - print("=" * 60) + print(f"Connecting to {normalized}...") - # Get all named actors automatically - all_named_actors = {} + url = f"{normalized}/actors" + if all_actors: + url += "?all=true" + + actors = http_get_sync(url, timeout) + if actors is None: + print(f"Error: Cannot connect to {normalized}") + return + + print(f"Connected to {normalized}") + print() + + # Apply filter if specified + if filter_str: + actors = [a for a in actors if filter_str.lower() in a.get("name", "").lower()] + + if json_output: + print(json.dumps(actors, indent=2)) + return + + _print_actors_table(actors, detailed) + + +def _print_actors_table(actors_data: list[dict], detailed: bool = False): + """Print actors in table format""" + if not actors_data: + print(" No actors found.") + return + + # Check for errors + if actors_data and "error" in actors_data[0]: + print(f" Error: {actors_data[0]['error']}") + return + + # Check if we have detailed info (class, module) + has_details = detailed or any(a.get("class") for a in actors_data) + + if has_details: + print( + f" {'Name':<25} {'Type':<8} {'Actor ID':<20} {'Class':<25} {'Module':<30}" + ) + print(f" {'-'*110}") + + for actor in actors_data: + name = actor.get("name", "") + actor_type = actor.get("type", "user") + actor_id = actor.get("actor_id", "-") + if isinstance(actor_id, (int, float)): + actor_id = str(int(actor_id)) + elif not isinstance(actor_id, str): + actor_id = str(actor_id) if actor_id else "-" + cls = actor.get("class", "-") + module = actor.get("module", "-") + print(f" {name:<25} {actor_type:<8} {actor_id:<20} {cls:<25} {module:<30}") + else: + print(f" {'Name':<40} {'Type':<10} {'Actor ID':<20}") + print(f" {'-'*72}") + + for actor in actors_data: + name = actor.get("name", "") + actor_type = actor.get("type", "user") + actor_id = actor.get("actor_id", "-") + if isinstance(actor_id, (int, float)): + actor_id = str(int(actor_id)) + elif not isinstance(actor_id, str): + actor_id = str(actor_id) if actor_id else "-" + print(f" {name:<40} {actor_type:<10} {actor_id:<20}") + + print(f"\n Total: {len(actors_data)} actor(s)") + + +def _inspect_cluster_actors( + seeds: list[str], + timeout: float, + best_effort: bool, + top: int | None, + filter: str | None, + all_actors: bool, + json_output: bool, +): + """Inspect actors distribution across cluster (aggregated view)""" + # Convert top to int if it's a string (hyperparameter may pass as string) + if top is not None and not isinstance(top, int): try: - for info in await system.all_named_actors(): - path = str(info.get("path", "")) - name = path[7:] if path.startswith("actors/") else path - if info.get("instance_count", 0) > 0: - try: - instances = await system.get_named_instances(name) - if instances: - all_named_actors[name] = instances - except Exception: - pass - except Exception as e: - print(f" [Warning] Failed to get all named actors: {e}") - - # Group named actors by node - node_actors = {} - for name, instances in all_named_actors.items(): - for inst in instances: - node_actors.setdefault(str(inst.get("node_id")), []).append(name) - - # Display nodes and their actors - for member in members: - node_id = str(member.get("node_id")) - print(f"\nNode: {node_id} ({member.get('addr')}) [{member.get('status')}]") - - if member.get("status") != "Alive": - print(" [Node is not alive]") + top = int(top) + except (ValueError, TypeError): + top = None + print(f"Connecting to cluster via seeds: {seeds}...") + + members = get_cluster_members(seeds, timeout) + if not members: + print("Error: Cannot connect to any seed node") + if not best_effort: + return + return + + alive_members = get_alive_members(members) + print(f"Found {len(alive_members)} alive nodes") + + # Collect actors from each node + # actor_name -> list of (node_id, actor_id) tuples + actor_distribution: dict[str, list[tuple[str, str]]] = defaultdict(list) + failed_nodes = [] + + for member in alive_members: + addr = member.get("addr") + node_id = str(member.get("node_id", "-")) + + if not addr: + continue + + normalized = normalize_address(addr) + url = f"{normalized}/actors" + if all_actors: + url += "?all=true" + + actors = http_get_sync(url, timeout) + if actors is None: + failed_nodes.append((node_id, addr)) + if not best_effort: + print(f"Error: Cannot connect to node {node_id} ({addr})") + continue + + # Extract actor names and IDs + for actor in actors: + name = actor.get("name", "") + if not name: continue - actors = node_actors.get(node_id, []) - if not actors: - print(" [No named actors on this node]") + # Apply filter if specified + if filter and filter.lower() not in name.lower(): continue - # Group by base type - actor_groups = {} - for name in actors: - base = name.rsplit("_", 1)[0] if "_" in name else name - actor_groups.setdefault(base, []).append(name) + # Get actor_id if available + actor_id = actor.get("actor_id", "") + if isinstance(actor_id, (int, float)): + actor_id = str(int(actor_id)) + elif not isinstance(actor_id, str): + actor_id = str(actor_id) if actor_id else "-" - print(f" Named Actors ({len(actors)}):") - for base, names in sorted(actor_groups.items()): - if len(names) == 1: - print(f" - actors/{names[0]}") + actor_distribution[name].append((node_id, actor_id)) + + if failed_nodes: + print(f"\nWarning: Failed to query {len(failed_nodes)} node(s):") + for node_id, addr in failed_nodes: + print(f" {node_id} ({addr})") + + if not actor_distribution: + print("\nNo actors found.") + return + + # Sort by instance count (descending) + sorted_actors = sorted( + actor_distribution.items(), key=lambda x: len(x[1]), reverse=True + ) + + # Apply top filter + if top is not None and top > 0: + sorted_actors = sorted_actors[:top] + + print(f"\nActor Distribution ({len(actor_distribution)} unique actors):") + print("=" * 100) + print( + f"{'Actor Name':<30} {'Actor ID (node:local)':<25} {'Instances':<12} {'Nodes':<30}" + ) + print("-" * 100) + + for actor_name, node_actor_pairs in sorted_actors: + instance_count = len(node_actor_pairs) + # Get unique node IDs + unique_nodes = sorted(set(node_id for node_id, _ in node_actor_pairs)) + nodes_str = ", ".join(unique_nodes[:5]) + if len(unique_nodes) > 5: + nodes_str += f" ... (+{len(unique_nodes) - 5} more)" + + # Get actor IDs - collect all unique IDs + actor_ids = [actor_id for _, actor_id in node_actor_pairs if actor_id != "-"] + if actor_ids: + unique_actor_ids = sorted(set(actor_ids)) + if len(unique_actor_ids) == 1: + # All instances have same ID (single instance or same ID across nodes) + actor_id_str = unique_actor_ids[0] + else: + # Multiple different IDs, show first one and indicate more + actor_id_str = unique_actor_ids[0] + if len(unique_actor_ids) > 1: + actor_id_str += f" (+{len(unique_actor_ids) - 1} more)" + else: + actor_id_str = "-" + + print( + f"{actor_name:<30} {actor_id_str:<20} {instance_count:<12} {nodes_str:<30}" + ) + + # Summary + total_instances = sum(len(pairs) for pairs in actor_distribution.values()) + unique_nodes = set() + for pairs in actor_distribution.values(): + unique_nodes.update(node_id for node_id, _ in pairs) + print("\n" + "=" * 100) + print( + f"Total: {len(actor_distribution)} unique actors, {total_instances} instances" + ) + print(f"Across {len(unique_nodes)} nodes") + + +def inspect_metrics( + seeds: list[str], timeout: float = 10.0, best_effort: bool = False, raw: bool = True +): + """Inspect Prometheus metrics from cluster nodes""" + print(f"Connecting to cluster via seeds: {seeds}...") + + members = get_cluster_members(seeds, timeout) + if not members: + print("Error: Cannot connect to any seed node") + if not best_effort: + return + return + + alive_members = get_alive_members(members) + print(f"Found {len(alive_members)} alive nodes\n") + + failed_nodes = [] + + for i, member in enumerate(alive_members): + addr = member.get("addr") + node_id = str(member.get("node_id", "-")) + + if not addr: + continue + + normalized = normalize_address(addr) + metrics = http_get_text_sync(f"{normalized}/metrics", timeout) + + if metrics is None: + failed_nodes.append((node_id, addr)) + if not best_effort: + print(f"Error: Cannot connect to node {node_id} ({addr})") + continue + + if raw: + # Output raw metrics (as text) + print(f"{'='*80}") + print(f"[{i+1}/{len(alive_members)}] Node {node_id} ({addr})") + print(f"{'='*80}") + print(metrics) + print() + else: + # Summary mode: extract key metrics + lines = metrics.split("\n") + key_metrics = [] + for line in lines: + line = line.strip() + if not line or line.startswith("#"): + continue + # Look for important metrics + if any( + keyword in line + for keyword in [ + "pulsing_cluster_members", + "pulsing_actor_messages", + "pulsing_actor_", + ] + ): + key_metrics.append(line) + print(f"Node {node_id} ({addr}):") + if key_metrics: + for metric in key_metrics[:20]: # Limit to top 20 + print(f" {metric}") + else: + print(" (no key metrics found)") + print() + + if failed_nodes: + print(f"\nWarning: Failed to query {len(failed_nodes)} node(s):") + for node_id, addr in failed_nodes: + print(f" {node_id} ({addr})") + + if not raw: + print("=" * 80) + print("(Use --raw True to see full metrics)") + + +def inspect_watch( + seeds: list[str], + timeout: float = 10.0, + best_effort: bool = False, + interval: float = 1.0, + kind: str = "all", + max_rounds: int | None = None, +): + """Watch cluster state changes""" + # Convert max_rounds to int if it's a string (hyperparameter may pass as string) + if max_rounds is not None and not isinstance(max_rounds, int): + try: + max_rounds = int(max_rounds) + except (ValueError, TypeError): + max_rounds = None + + # Convert interval to float if it's a string + if not isinstance(interval, (int, float)): + try: + interval = float(interval) + except (ValueError, TypeError): + interval = 1.0 + + print(f"Watching cluster via seeds: {seeds}...") + print(f"Refresh interval: {interval}s, Watching: {kind}") + print("Press Ctrl+C to stop\n") + + previous_state: dict[str, Any] = {} + + round_count = 0 + + try: + while True: + if max_rounds is not None and round_count >= max_rounds: + break + + round_count += 1 + current_time = time.strftime("%H:%M:%S") + + # Collect current state + current_state: dict[str, Any] = {} + + if kind in ("cluster", "all"): + members = get_cluster_members(seeds, timeout) + if members: + alive = get_alive_members(members) + current_state["cluster"] = { + "total": len(members), + "alive": len(alive), + "members": { + str(m.get("node_id")): m.get("status") for m in members + }, + } + + if kind in ("actors", "all"): + members = get_cluster_members(seeds, timeout) + if members: + alive = get_alive_members(members) + actor_distribution: dict[str, int] = defaultdict(int) + for member in alive: + addr = member.get("addr") + if not addr: + continue + normalized = normalize_address(addr) + actors = http_get_sync(f"{normalized}/actors", timeout) + if actors: + for actor in actors: + name = actor.get("name", "") + if name: + actor_distribution[name] += 1 + current_state["actors"] = dict(actor_distribution) + + # Detect and print changes + if previous_state: + changes = [] + + if "cluster" in current_state and "cluster" in previous_state: + prev = previous_state["cluster"] + curr = current_state["cluster"] + + if prev["alive"] != curr["alive"]: + changes.append( + f"Alive nodes: {prev['alive']} -> {curr['alive']}" + ) + + # Check member status changes + prev_members = prev.get("members", {}) + curr_members = curr.get("members", {}) + for node_id, status in curr_members.items(): + prev_status = prev_members.get(node_id) + if prev_status != status: + changes.append(f"Node {node_id}: {prev_status} -> {status}") + + if "actors" in current_state and "actors" in previous_state: + prev_actors = previous_state["actors"] + curr_actors = current_state["actors"] + + # New actors + new_actors = set(curr_actors.keys()) - set(prev_actors.keys()) + if new_actors: + changes.append( + f"New actors: {', '.join(sorted(new_actors)[:5])}" + ) + + # Removed actors + removed_actors = set(prev_actors.keys()) - set(curr_actors.keys()) + if removed_actors: + changes.append( + f"Removed actors: {', '.join(sorted(removed_actors)[:5])}" + ) + + # Count changes + for actor_name in set(curr_actors.keys()) & set(prev_actors.keys()): + if prev_actors[actor_name] != curr_actors[actor_name]: + changes.append( + f"{actor_name}: {prev_actors[actor_name]} -> {curr_actors[actor_name]} instances" + ) + + if changes: + print(f"[{current_time}] Changes detected:") + for change in changes: + print(f" • {change}") + print() else: - print(f" - actors/{base}_* ({len(names)} instances)") - for name in sorted(names)[:5]: - print(f" • {name}") - if len(names) > 5: - print(f" ... and {len(names) - 5} more") - - # Summary - if all_named_actors: - total = sum(len(instances) for instances in all_named_actors.values()) - print( - f"\nTotal Named Actors: {len(all_named_actors)} types, {total} instances" - ) - - print("\n" + "=" * 60) - await system.shutdown() - - uvloop.run(run()) + print(f"[{current_time}] No changes") + else: + # First round - just show current state + if "cluster" in current_state: + c = current_state["cluster"] + print( + f"[{current_time}] Initial state: {c['alive']}/{c['total']} nodes alive" + ) + if "actors" in current_state: + a = current_state["actors"] + print( + f"[{current_time}] Initial state: {len(a)} unique actors, {sum(a.values())} total instances" + ) + print() + + previous_state = current_state + time.sleep(interval) + + except KeyboardInterrupt: + print("\n\nWatch stopped.") diff --git a/tests/python/test_actor_list.py b/tests/python/test_actor_list.py index 89003448a..39987ecdf 100644 --- a/tests/python/test_actor_list.py +++ b/tests/python/test_actor_list.py @@ -2,8 +2,9 @@ import asyncio import pytest -from pulsing.actor import init, remote, get_system -from pulsing.cli.actor_list import list_actors_impl +import json +from pulsing.actor import init, remote, get_system, list_actors +from pulsing.cli.inspect import _print_actors_table import io import sys @@ -26,18 +27,55 @@ async def test_actor_list_basic(): await init() system = get_system() - # Create some actors - await TestCounter.remote(system, name="counter-1") - await TestCounter.remote(system, name="counter-2") - await TestCalculator.remote(system, name="calc") + # Create some actors locally (list_actors only returns local actors) + await TestCounter.local(system, name="counter-1") + await TestCounter.local(system, name="counter-2") + await TestCalculator.local(system, name="calc") + + # Wait a bit for actors to be registered in the system + await asyncio.sleep(0.2) # Capture output old_stdout = sys.stdout sys.stdout = buffer = io.StringIO() try: - # List user actors only - await list_actors_impl(all_actors=False, output_format="table") + # Use all_named_actors and get_named_instances instead of list_actors + # list_actors uses SystemActor registry which may not include Python actors + # all_named_actors uses gossip protocol and includes all named actors + all_named = await system.all_named_actors() + + # Build actors list similar to HTTP API format + actors_data = [] + for info in all_named: + path_str = str(info.get("path", "")) + if path_str == "system/core": + continue + + name = path_str[7:] if path_str.startswith("actors/") else path_str + + # Skip internal actors + if name.startswith("_"): + continue + + # Get instances for this actor + instance_count = info.get("instance_count", 0) + if instance_count > 0: + try: + instances = await system.get_named_instances(name) + for inst in instances: + # Check if this instance is on this node + if str(inst.get("node_id")) == str(system.node_id.id): + actor_data = { + "name": name, + "type": "user", + "actor_id": str(inst.get("actor_id", "")), + } + actors_data.append(actor_data) + except Exception: + pass + + _print_actors_table(actors_data) output = buffer.getvalue() # Check output contains actor names @@ -62,16 +100,47 @@ async def test_actor_list_all(): await init() system = get_system() - # Create one user actor - await TestCounter.remote(system, name="test-counter") + # Create one user actor locally (list_actors only returns local actors) + await TestCounter.local(system, name="test-counter") + + # Wait a bit for actors to be registered in the system + await asyncio.sleep(0.2) # Capture output old_stdout = sys.stdout sys.stdout = buffer = io.StringIO() try: - # List all actors - await list_actors_impl(all_actors=True, output_format="table") + # Use all_named_actors and get_named_instances instead of list_actors + all_named = await system.all_named_actors() + + # Build actors list similar to HTTP API format (include all actors) + actors_data = [] + for info in all_named: + path_str = str(info.get("path", "")) + if path_str == "system/core": + continue + + name = path_str[7:] if path_str.startswith("actors/") else path_str + + # Get instances for this actor + instance_count = info.get("instance_count", 0) + if instance_count > 0: + try: + instances = await system.get_named_instances(name) + for inst in instances: + # Check if this instance is on this node + if str(inst.get("node_id")) == str(system.node_id.id): + actor_data = { + "name": name, + "type": "system" if name.startswith("_") else "user", + "actor_id": str(inst.get("actor_id", "")), + } + actors_data.append(actor_data) + except Exception: + pass + + _print_actors_table(actors_data) output = buffer.getvalue() # Check output contains user actor @@ -90,19 +159,54 @@ async def test_actor_list_json(): await init() system = get_system() - await TestCounter.remote(system, name="json-test") + # Create actor locally (list_actors only returns local actors) + await TestCounter.local(system, name="json-test") + + # Wait a bit for actors to be registered in the system + await asyncio.sleep(0.2) # Capture output old_stdout = sys.stdout sys.stdout = buffer = io.StringIO() try: - await list_actors_impl(all_actors=False, output_format="json") + # Use all_named_actors and get_named_instances instead of list_actors + all_named = await system.all_named_actors() + + # Build actors list similar to HTTP API format + actors_data = [] + for info in all_named: + path_str = str(info.get("path", "")) + if path_str == "system/core": + continue + + name = path_str[7:] if path_str.startswith("actors/") else path_str + + # Filter internal actors + if name.startswith("_"): + continue + + # Get instances for this actor + instance_count = info.get("instance_count", 0) + if instance_count > 0: + try: + instances = await system.get_named_instances(name) + for inst in instances: + # Check if this instance is on this node + if str(inst.get("node_id")) == str(system.node_id.id): + actor_data = { + "name": name, + "type": "user", + "actor_id": str(inst.get("actor_id", "")), + } + actors_data.append(actor_data) + except Exception: + pass + + print(json.dumps(actors_data, indent=2)) output = buffer.getvalue() # Should be valid JSON - import json - data = json.loads(output) # Check structure @@ -113,7 +217,9 @@ async def test_actor_list_json(): actor = data[0] assert "name" in actor assert "type" in actor - assert "uptime" in actor + assert "actor_id" in actor + # Note: uptime may not be available from get_named_instances + # HTTP API includes it from instance metadata, but we're using direct API finally: sys.stdout = old_stdout diff --git a/tests/python/test_cli_actor.py b/tests/python/test_cli_actor.py new file mode 100644 index 000000000..0bca36913 --- /dev/null +++ b/tests/python/test_cli_actor.py @@ -0,0 +1,36 @@ +"""Test CLI actor command + +Tests for the actor CLI command using function-level unit tests. +Since all CLI commands are functions (via hp.param), we can test them directly. +""" + +import pytest +from unittest.mock import patch, MagicMock, AsyncMock + +from pulsing.cli.__main__ import actor as actor_cli + + +class TestActorCLI: + """Test actor CLI command""" + + def test_actor_list_deprecated(self, capsys): + """Test that 'list' subcommand shows deprecation message""" + actor_cli( + actor_type="list", + seeds="127.0.0.1:8000", + ) + captured = capsys.readouterr() + assert "pulsing actor list" in captured.out + assert "pulsing inspect actors" in captured.out + + def test_actor_invalid_class_path(self): + """Test error for invalid class path (no dots)""" + with pytest.raises(ValueError, match="must be a full class path"): + actor_cli(actor_type="router") + + def test_actor_invalid_class_path_message(self): + """Test error message shows correct format""" + with pytest.raises(ValueError) as exc_info: + actor_cli(actor_type="router") + assert "full class path" in str(exc_info.value) + assert "pulsing.actors.worker.TransformersWorker" in str(exc_info.value) diff --git a/tests/python/test_cli_inspect.py b/tests/python/test_cli_inspect.py new file mode 100644 index 000000000..e26333931 --- /dev/null +++ b/tests/python/test_cli_inspect.py @@ -0,0 +1,434 @@ +"""Test CLI inspect command + +Tests for the inspect CLI subcommands using function-level unit tests. +Since all CLI commands are functions (via hp.param), we can test them directly. +""" + +import io +import json +import sys +from unittest.mock import patch, MagicMock + +import pytest + +from pulsing.cli.__main__ import inspect as inspect_cli +from pulsing.cli.inspect import ( + inspect_cluster, + inspect_actors, + inspect_metrics, + normalize_address, + get_cluster_members, + get_alive_members, +) + + +class TestHelperFunctions: + """Test helper functions in inspect module""" + + def test_normalize_address(self): + """Test address normalization""" + assert normalize_address("127.0.0.1:8000") == "http://127.0.0.1:8000" + assert normalize_address("127.0.0.1") == "http://127.0.0.1:8000" + assert normalize_address("http://127.0.0.1:8000") == "http://127.0.0.1:8000" + + def test_get_alive_members(self): + """Test filtering alive members""" + members = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + {"node_id": 2, "addr": "127.0.0.1:8001", "status": "Suspect"}, + {"node_id": 3, "addr": "127.0.0.1:8002", "status": "Alive"}, + {"node_id": 4, "addr": "127.0.0.1:8003", "status": "Failed"}, + ] + alive = get_alive_members(members) + assert len(alive) == 2 + assert alive[0]["node_id"] == 1 + assert alive[1]["node_id"] == 3 + + @patch("pulsing.cli.inspect.http_get_sync") + def test_get_cluster_members(self, mock_http_get): + """Test getting cluster members from seeds""" + # First seed fails, second succeeds + mock_http_get.side_effect = [None, [{"node_id": 1, "status": "Alive"}]] + + members = get_cluster_members(["127.0.0.1:8000", "127.0.0.1:8001"], timeout=5.0) + assert members is not None + assert len(members) == 1 + assert members[0]["node_id"] == 1 + + # All seeds fail + mock_http_get.side_effect = [None, None] + members = get_cluster_members(["127.0.0.1:8000", "127.0.0.1:8001"], timeout=5.0) + assert members is None + + +class TestInspectCluster: + """Test inspect cluster subcommand""" + + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_cluster_success(self, mock_get_members): + """Test successful cluster inspection""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + {"node_id": 2, "addr": "127.0.0.1:8001", "status": "Alive"}, + {"node_id": 3, "addr": "127.0.0.1:8002", "status": "Suspect"}, + ] + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_cluster(["127.0.0.1:8000"], timeout=5.0) + output = buffer.getvalue() + + assert "Connecting to cluster via seeds" in output + assert "Cluster Status: 3 total nodes (2 alive)" in output + assert "Status Summary" in output + assert "Alive: 2" in output + assert "Suspect: 1" in output + assert "Node ID" in output + assert "127.0.0.1:8000" in output + assert "127.0.0.1:8001" in output + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_cluster_no_connection(self, mock_get_members): + """Test cluster inspection when no connection available""" + mock_get_members.return_value = None + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_cluster(["127.0.0.1:8000"], timeout=5.0, best_effort=False) + output = buffer.getvalue() + + assert "Error: Cannot connect to any seed node" in output + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_cluster_best_effort(self, mock_get_members): + """Test cluster inspection with best_effort=True""" + mock_get_members.return_value = None + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + # Should not raise, just return silently + inspect_cluster(["127.0.0.1:8000"], timeout=5.0, best_effort=True) + output = buffer.getvalue() + + assert "Error: Cannot connect to any seed node" in output + finally: + sys.stdout = old_stdout + + +class TestInspectActors: + """Test inspect actors subcommand""" + + @patch("pulsing.cli.inspect.http_get_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_actors_success(self, mock_get_members, mock_http_get): + """Test successful actors inspection""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + {"node_id": 2, "addr": "127.0.0.1:8001", "status": "Alive"}, + ] + + # Mock actors from each node + mock_http_get.side_effect = [ + [{"name": "worker-1"}, {"name": "worker-2"}], # Node 1 + [{"name": "worker-1"}, {"name": "router"}], # Node 2 + ] + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_actors(["127.0.0.1:8000"], timeout=5.0) + output = buffer.getvalue() + + assert "Found 2 alive nodes" in output + assert "Actor Distribution" in output + assert "worker-1" in output + assert "worker-2" in output + assert "router" in output + assert "Total:" in output + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.http_get_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_actors_with_filter(self, mock_get_members, mock_http_get): + """Test actors inspection with filter""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + ] + + mock_http_get.side_effect = [ + [{"name": "worker-1"}, {"name": "router-1"}, {"name": "other"}], + ] + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_actors(["127.0.0.1:8000"], timeout=5.0, filter="worker") + output = buffer.getvalue() + + assert "worker-1" in output + assert "router-1" not in output + assert "other" not in output + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.http_get_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_actors_with_top(self, mock_get_members, mock_http_get): + """Test actors inspection with top limit""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + ] + + # Create many actors + actors = [{"name": f"actor-{i}"} for i in range(20)] + mock_http_get.side_effect = [actors] + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_actors(["127.0.0.1:8000"], timeout=5.0, top=5) + output = buffer.getvalue() + + # Should only show top 5 + assert "actor-0" in output + assert "actor-4" in output + # Should not show all 20 + lines = output.split("\n") + actor_lines = [ + line for line in lines if "actor-" in line and "Actor Name" not in line + ] + assert len(actor_lines) <= 5 + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.http_get_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_actors_partial_failure(self, mock_get_members, mock_http_get): + """Test actors inspection with some node failures""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + {"node_id": 2, "addr": "127.0.0.1:8001", "status": "Alive"}, + ] + + # First node succeeds, second fails + mock_http_get.side_effect = [ + [{"name": "worker-1"}], # Node 1 + None, # Node 2 fails + ] + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_actors(["127.0.0.1:8000"], timeout=5.0, best_effort=True) + output = buffer.getvalue() + + assert "worker-1" in output + assert ( + "Warning: Failed to query" in output + or "Error: Cannot connect" in output + ) + finally: + sys.stdout = old_stdout + + +class TestInspectMetrics: + """Test inspect metrics subcommand""" + + @patch("pulsing.cli.inspect.http_get_text_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_metrics_raw(self, mock_get_members, mock_http_get_text): + """Test metrics inspection with raw output""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + ] + + mock_http_get_text.return_value = ( + "# HELP test_metric Test metric\ntest_metric 1.0\n" + ) + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_metrics(["127.0.0.1:8000"], timeout=5.0, raw=True) + output = buffer.getvalue() + + assert "Node 1" in output + assert "test_metric" in output + assert "# HELP" in output + finally: + sys.stdout = old_stdout + + @patch("pulsing.cli.inspect.http_get_text_sync") + @patch("pulsing.cli.inspect.get_cluster_members") + def test_inspect_metrics_summary(self, mock_get_members, mock_http_get_text): + """Test metrics inspection with summary output""" + mock_get_members.return_value = [ + {"node_id": 1, "addr": "127.0.0.1:8000", "status": "Alive"}, + ] + + metrics_text = """# HELP pulsing_cluster_members Cluster members +pulsing_cluster_members{status="Alive"} 2.0 +# HELP pulsing_actor_messages Actor messages +pulsing_actor_messages_total 100.0 +# HELP other_metric Other metric +other_metric 5.0 +""" + + mock_http_get_text.return_value = metrics_text + + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_metrics(["127.0.0.1:8000"], timeout=5.0, raw=False) + output = buffer.getvalue() + + assert "Node 1" in output + assert "pulsing_cluster_members" in output + assert "pulsing_actor_messages" in output + # Should not show other_metric (not a key metric) + assert "other_metric" not in output or "(no key metrics found)" in output + finally: + sys.stdout = old_stdout + + +class TestCLIEntryPoint: + """Test CLI entry point function""" + + @patch("pulsing.cli.inspect.inspect_cluster") + def test_inspect_cli_cluster_subcommand(self, mock_inspect_cluster): + """Test CLI entry point with cluster subcommand""" + inspect_cli( + subcommand="cluster", + seeds="127.0.0.1:8000", + timeout=5.0, + best_effort=False, + ) + + mock_inspect_cluster.assert_called_once_with( + ["127.0.0.1:8000"], timeout=5.0, best_effort=False + ) + + @patch("pulsing.cli.inspect.inspect_actors") + def test_inspect_cli_actors_subcommand(self, mock_inspect_actors): + """Test CLI entry point with actors subcommand""" + inspect_cli( + subcommand="actors", + seeds="127.0.0.1:8000,127.0.0.1:8001", + timeout=5.0, + best_effort=False, + top=10, + filter="worker", + all_actors=False, + ) + + mock_inspect_actors.assert_called_once_with( + seeds=["127.0.0.1:8000", "127.0.0.1:8001"], + endpoint=None, + timeout=5.0, + best_effort=False, + top=10, + filter="worker", + all_actors=False, + json_output=False, + detailed=False, + ) + + @patch("pulsing.cli.inspect.inspect_metrics") + def test_inspect_cli_metrics_subcommand(self, mock_inspect_metrics): + """Test CLI entry point with metrics subcommand""" + inspect_cli( + subcommand="metrics", + seeds="127.0.0.1:8000", + timeout=5.0, + best_effort=False, + raw=False, + ) + + mock_inspect_metrics.assert_called_once_with( + ["127.0.0.1:8000"], timeout=5.0, best_effort=False, raw=False + ) + + @patch("pulsing.cli.inspect.inspect_watch") + def test_inspect_cli_watch_subcommand(self, mock_inspect_watch): + """Test CLI entry point with watch subcommand""" + inspect_cli( + subcommand="watch", + seeds="127.0.0.1:8000", + timeout=5.0, + best_effort=False, + interval=2.0, + kind="cluster", + max_rounds=5, + ) + + mock_inspect_watch.assert_called_once_with( + ["127.0.0.1:8000"], + timeout=5.0, + best_effort=False, + interval=2.0, + kind="cluster", + max_rounds=5, + ) + + def test_inspect_cli_no_seeds(self): + """Test CLI entry point without seeds (should show error)""" + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_cli(subcommand="cluster", seeds=None) + output = buffer.getvalue() + + assert "Error: --seeds is required" in output + finally: + sys.stdout = old_stdout + + def test_inspect_cli_unknown_subcommand(self): + """Test CLI entry point with unknown subcommand""" + old_stdout = sys.stdout + sys.stdout = buffer = io.StringIO() + + try: + inspect_cli(subcommand="unknown", seeds="127.0.0.1:8000") + output = buffer.getvalue() + + assert "Unknown subcommand" in output or "Error" in output + finally: + sys.stdout = old_stdout + + def test_inspect_cli_seeds_parsing(self): + """Test that seeds are properly parsed (comma-separated, trimmed)""" + with patch("pulsing.cli.inspect.inspect_cluster") as mock_inspect: + inspect_cli( + subcommand="cluster", + seeds="127.0.0.1:8000, 127.0.0.1:8001 , 127.0.0.1:8002", + ) + + # Should parse and trim seeds + mock_inspect.assert_called_once() + call_args = mock_inspect.call_args[0][ + 0 + ] # First positional arg (seeds list) + assert len(call_args) == 3 + assert "127.0.0.1:8000" in call_args + assert "127.0.0.1:8001" in call_args + assert "127.0.0.1:8002" in call_args + # Should be trimmed (no spaces) + assert all(" " not in seed for seed in call_args) diff --git a/tests/python/test_rest_api.py b/tests/python/test_rest_api.py index 23e7a98e2..dc6a54a28 100644 --- a/tests/python/test_rest_api.py +++ b/tests/python/test_rest_api.py @@ -1,18 +1,11 @@ -"""Test REST API endpoints for actor list and cluster info""" +"""Test REST API endpoints for actor list and cluster info -import asyncio -import pytest -from unittest.mock import patch, MagicMock -import json -import subprocess +Note: Most functions have been moved to pulsing.cli.inspect. +This file is kept for backward compatibility testing of _print_actors_table. +""" -from pulsing.cli.actor_list import ( - query_single_endpoint, - query_cluster, - _print_output, - _print_actors_table, - http_get_sync, -) +import pytest +from pulsing.cli.inspect import _print_actors_table class TestRestApiHelpers: @@ -48,245 +41,6 @@ def test_print_actors_table_empty(self, capsys): captured = capsys.readouterr() assert "No actors found" in captured.out - def test_print_output_table(self, capsys): - """Test _print_output with table format""" - actors = [{"name": "test", "type": "user", "class": "Test", "module": "app"}] - _print_output(actors, "table") - captured = capsys.readouterr() - assert "test" in captured.out - - def test_print_output_json(self, capsys): - """Test _print_output with JSON format""" - actors = [{"name": "test", "type": "user", "class": "Test", "module": "app"}] - _print_output(actors, "json") - captured = capsys.readouterr() - data = json.loads(captured.out) - assert len(data) == 1 - assert data[0]["name"] == "test" - - -class TestHttpGetSync: - """Test http_get_sync function""" - - def test_http_get_sync_success(self): - """Test successful HTTP GET request""" - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = json.dumps({"status": "ok"}) - - with patch("subprocess.run", return_value=mock_result): - result = http_get_sync("http://127.0.0.1:8000/health") - assert result == {"status": "ok"} - - def test_http_get_sync_failure(self): - """Test HTTP GET request failure""" - mock_result = MagicMock() - mock_result.returncode = 7 # Connection refused - - with patch("subprocess.run", return_value=mock_result): - result = http_get_sync("http://127.0.0.1:8000/health") - assert result is None - - def test_http_get_sync_invalid_json(self): - """Test HTTP GET with invalid JSON response""" - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = "not valid json" - - with patch("subprocess.run", return_value=mock_result): - result = http_get_sync("http://127.0.0.1:8000/health") - assert result is None - - -class TestQuerySingleEndpoint: - """Test query_single_endpoint function""" - - @pytest.mark.asyncio - async def test_query_single_endpoint_success(self): - """Test successful query to single endpoint""" - # /actors endpoint returns a flat array of actors - mock_actors_response = [ - { - "name": "counter", - "type": "user", - "actor_id": "123:1", - "class": "Counter", - "module": "__main__", - "file": "/app/main.py", - } - ] - - with patch("pulsing.cli.actor_list.http_get_sync") as mock_http: - mock_http.return_value = mock_actors_response - result = await query_single_endpoint( - "127.0.0.1:8000", all_actors=False, output_format="table" - ) - assert result is True - - @pytest.mark.asyncio - async def test_query_single_endpoint_connection_error(self): - """Test query with connection error""" - with patch("pulsing.cli.actor_list.http_get_sync") as mock_http: - mock_http.return_value = None - result = await query_single_endpoint( - "127.0.0.1:8000", all_actors=False, output_format="table" - ) - assert result is False - - @pytest.mark.asyncio - async def test_query_single_endpoint_json_format(self, capsys): - """Test query with JSON output format""" - # /actors endpoint returns a flat array - mock_response = [ - { - "name": "test", - "type": "user", - "actor_id": "123:1", - "class": "Test", - "module": "app", - } - ] - - with patch("pulsing.cli.actor_list.http_get_sync") as mock_http: - mock_http.return_value = mock_response - result = await query_single_endpoint( - "127.0.0.1:8000", all_actors=False, output_format="json" - ) - assert result is True - captured = capsys.readouterr() - # Extract JSON from output (skip connection info lines) - lines = captured.out.strip().split("\n") - json_start = next( - i for i, line in enumerate(lines) if line.strip().startswith("[") - ) - json_str = "\n".join(lines[json_start:]) - data = json.loads(json_str) - assert len(data) == 1 - - -class TestQueryCluster: - """Test query_cluster function""" - - @pytest.mark.asyncio - async def test_query_cluster_success(self): - """Test successful cluster query""" - members_response = [ - {"node_id": "123", "addr": "127.0.0.1:8001", "status": "Alive"}, - {"node_id": "456", "addr": "127.0.0.1:8002", "status": "Alive"}, - ] - - # /actors returns a flat array of actors - actors_response = [ - { - "name": "counter", - "type": "user", - "actor_id": "123:1", - "class": "Counter", - "module": "__main__", - } - ] - - def mock_http_get(url): - if "/cluster/members" in url: - return members_response - elif "/actors" in url: - return actors_response - return None - - with patch("pulsing.cli.actor_list.http_get_sync", side_effect=mock_http_get): - result = await query_cluster( - ["127.0.0.1:8000"], all_actors=False, output_format="table" - ) - assert result is True - - @pytest.mark.asyncio - async def test_query_cluster_no_members(self): - """Test cluster query with no members found""" - with patch("pulsing.cli.actor_list.http_get_sync") as mock_http: - mock_http.return_value = None - result = await query_cluster( - ["127.0.0.1:8000"], all_actors=False, output_format="table" - ) - assert result is False - - -class TestActorMetadataParsing: - """Test actor metadata parsing from API responses""" - - def test_parse_actor_with_full_metadata(self): - """Test parsing actor with complete metadata""" - actor_data = { - "path": "actors/my-counter", - "detailed_instances": [ - { - "node_id": 12345, - "actor_id": "12345:42", - "class": "MyCounter", - "module": "myapp.counters", - "file": "/app/myapp/counters.py", - } - ], - } - - path = actor_data.get("path", "") - name = path[7:] if path.startswith("actors/") else path - assert name == "my-counter" - - instances = actor_data.get("detailed_instances", []) - assert len(instances) == 1 - - inst = instances[0] - assert inst.get("actor_id") == "12345:42" - assert inst.get("class") == "MyCounter" - assert inst.get("module") == "myapp.counters" - assert inst.get("file") == "/app/myapp/counters.py" - - def test_parse_actor_with_minimal_metadata(self): - """Test parsing actor with minimal metadata""" - actor_data = { - "path": "actors/simple", - "detailed_instances": [{"node_id": 12345}], - } - - instances = actor_data.get("detailed_instances", []) - inst = instances[0] - - # Missing fields should return None or default - assert inst.get("actor_id") is None - assert inst.get("class") is None - assert inst.get("module") is None - - def test_parse_system_actor(self): - """Test parsing system/internal actor""" - actor_data = { - "path": "actors/_python_actor_service", - "detailed_instances": [ - { - "node_id": 12345, - "actor_id": "12345:0", - "class": "PythonActorService", - "module": "pulsing.actor.remote", - } - ], - } - - path = actor_data.get("path", "") - name = path[7:] if path.startswith("actors/") else path - - # Internal actors start with _ - assert name.startswith("_") - assert name == "_python_actor_service" - - def test_filter_internal_actors(self): - """Test filtering internal actors""" - actors = [ - {"path": "actors/counter", "type": "user"}, - {"path": "actors/_internal", "type": "system"}, - {"path": "actors/calc", "type": "user"}, - ] - - user_only = [a for a in actors if not a["path"].split("/")[-1].startswith("_")] - assert len(user_only) == 2 - - all_actors = actors - assert len(all_actors) == 3 + # Note: Tests for removed functions (http_get_sync, query_single_endpoint, query_cluster, _print_output) + # have been removed. These functions are now in pulsing.cli.inspect. + # Use 'pulsing inspect actors' for CLI usage.