From 92abb23f05b92fe30c0f23d108cf47f29d67735c Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Thu, 16 Apr 2026 11:48:07 +0800 Subject: [PATCH] Inter-node communication --- .mise.toml | 11 +- Cargo.toml | 17 +- README.md | 12 +- libs/edge-toolkit/Cargo.toml | 6 +- libs/edge-toolkit/src/ws.rs | 60 ++ services/ws-modules/comm1/Cargo.toml | 24 + services/ws-modules/comm1/src/lib.rs | 207 +++++++ services/ws-modules/face-detection/Cargo.toml | 4 +- services/ws-modules/face-detection/src/lib.rs | 19 +- services/ws-modules/har1/Cargo.toml | 4 +- services/ws-modules/har1/src/lib.rs | 19 +- services/ws-server/Cargo.toml | 5 +- services/ws-server/src/main.rs | 550 ++++++++++++++++-- services/ws-server/static/app.js | 143 +++-- services/ws-server/static/index.html | 40 +- services/ws-wasm-agent/Cargo.toml | 5 +- services/ws-wasm-agent/src/lib.rs | 148 ++++- 17 files changed, 1063 insertions(+), 211 deletions(-) create mode 100644 services/ws-modules/comm1/Cargo.toml create mode 100644 services/ws-modules/comm1/src/lib.rs diff --git a/.mise.toml b/.mise.toml index 96b7d5e..c8ebc5d 100644 --- a/.mise.toml +++ b/.mise.toml @@ -4,8 +4,10 @@ action-validator = "latest" "cargo:wasm-pack" = "latest" "chromedriver" = "146" cmake = "latest" +codex = "latest" dprint = "latest" editorconfig-checker = "latest" +gemini-cli = "latest" "github:block/goose" = "latest" "github:wasm-bindgen/wasm-bindgen" = "0.2.114" ollama = "latest" @@ -102,8 +104,13 @@ description = "Build the face detection workflow WASM module" dir = "services/ws-modules/face-detection" run = "wasm-pack build . --target web" -[tasks.build] -depends = ["build-ws-face-detection-module", "build-ws-har1-module", "build-ws-wasm-agent"] +[tasks.build-ws-comm1-module] +description = "Build the comm1 workflow WASM module" +dir = "services/ws-modules/comm1" +run = "wasm-pack build . --target web" + +[tasks.build-wasm] +depends = ["build-ws-comm1-module", "build-ws-face-detection-module", "build-ws-har1-module", "build-ws-wasm-agent"] description = "Build all WebAssembly modules" [tasks.test-ws-wasm-agent-firefox] diff --git a/Cargo.toml b/Cargo.toml index 7eeb046..c7da4bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,17 @@ +[workspace.package] +edition = "2024" +license = "Apache-2.0 and MIT" +repository = "https://github.com/edge-toolkit/core" +rust-version = "1.87.0" + [workspace] members = [ - "libs/edge-toolkit", - "services/ws-modules/face-detection", - "services/ws-modules/har1", - "services/ws-server", - "services/ws-wasm-agent", + "libs/edge-toolkit", + "services/ws-modules/comm1", + "services/ws-modules/face-detection", + "services/ws-modules/har1", + "services/ws-server", + "services/ws-wasm-agent", ] resolver = "2" diff --git a/README.md b/README.md index c3f5ce7..3428d7c 100644 --- a/README.md +++ b/README.md @@ -30,25 +30,21 @@ and save it as `services/ws-server/static/models/human_activity_recognition.onnx 2. Save it in `services/ws-server/static/models/` 3. Rename the file to `video_cv.onnx`. -### Build and run the agent +### Build WASM and run the WS server ```bash -mise run build-ws-wasm-agent -mise run build-ws-har1-module -mise run build-ws-face-detection-module +mise run build-wasm mise run ws-server ``` -The WASM build disables WebAssembly reference types so it can still load on older browsers such as Chrome 95. +The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95. Find the IP address of your laptop in the local network, which will normally be something like 192.168.1.x. Then on your phone, open Chrome and type in https://192.168.1.x:8433/ -Click "har demo". - -For webcam inference, click "face demo". +Select the module to run in the drop down, then click "Run module" button. ## Grant diff --git a/libs/edge-toolkit/Cargo.toml b/libs/edge-toolkit/Cargo.toml index ac91a16..4d666f2 100644 --- a/libs/edge-toolkit/Cargo.toml +++ b/libs/edge-toolkit/Cargo.toml @@ -2,9 +2,9 @@ name = "edge-toolkit" description = "A collection of utilities and common code for Edge Toolkit services" version = "0.1.0" -edition = "2024" -license = "Apache-2.0 or MIT" -repository = "https://github.com/edge-toolkit/core" +edition.workspace = true +license.workspace = true +repository.workspace = true [dependencies] serde.workspace = true diff --git a/libs/edge-toolkit/src/ws.rs b/libs/edge-toolkit/src/ws.rs index a9def10..e3e1973 100644 --- a/libs/edge-toolkit/src/ws.rs +++ b/libs/edge-toolkit/src/ws.rs @@ -7,6 +7,36 @@ pub enum ConnectStatus { Reconnected, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MessageDeliveryStatus { + Delivered, + Queued, + Acknowledged, + Broadcast, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MessageScope { + Direct, + Broadcast, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum AgentConnectionState { + Connected, + Disconnected, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentSummary { + pub agent_id: String, + pub state: AgentConnectionState, + pub last_known_ip: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum WsMessage { @@ -20,6 +50,36 @@ pub enum WsMessage { Alive { timestamp: String, }, + ListAgents, + ListAgentsResponse { + agents: Vec, + }, + SendAgentMessage { + to_agent_id: String, + message: serde_json::Value, + }, + BroadcastMessage { + message: serde_json::Value, + }, + AgentMessage { + message_id: String, + from_agent_id: String, + scope: MessageScope, + server_received_at: String, + message: serde_json::Value, + }, + MessageAck { + message_id: String, + }, + MessageStatus { + message_id: Option, + status: MessageDeliveryStatus, + detail: String, + }, + Invalid { + message_id: Option, + detail: String, + }, ClientEvent { capability: String, action: String, diff --git a/services/ws-modules/comm1/Cargo.toml b/services/ws-modules/comm1/Cargo.toml new file mode 100644 index 0000000..5385cfe --- /dev/null +++ b/services/ws-modules/comm1/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "et-ws-comm1" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +edge-toolkit = { path = "../../../libs/edge-toolkit" } +et-ws-wasm-agent = { path = "../../ws-wasm-agent" } +js-sys = "0.3" +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-wasm = "0.2" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" +web-sys = { version = "0.3", features = ["Window", "console"] } + +[dev-dependencies] +wasm-bindgen-test = "0.3" diff --git a/services/ws-modules/comm1/src/lib.rs b/services/ws-modules/comm1/src/lib.rs new file mode 100644 index 0000000..773355e --- /dev/null +++ b/services/ws-modules/comm1/src/lib.rs @@ -0,0 +1,207 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use edge_toolkit::ws::{AgentConnectionState, AgentSummary, WsMessage}; +use et_ws_wasm_agent::{WsClient, WsClientConfig, append_to_textarea}; +use js_sys::{Promise, Reflect}; +use serde_json::json; +use tracing::info; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::JsFuture; + +const LIST_AGENTS_POLL_MS: i32 = 1_000; +const MESSAGE_PAUSE_MS: i32 = 3_000; + +#[wasm_bindgen(start)] +pub fn init() { + tracing_wasm::set_as_global_default(); + info!("comm1 workflow module initialized"); +} + +#[wasm_bindgen] +pub async fn run() -> Result<(), JsValue> { + log("comm1: entered run()")?; + + let ws_url = websocket_url()?; + log(&format!("comm1: resolved websocket URL: {ws_url}"))?; + + let mut client = WsClient::new(WsClientConfig::new(ws_url)); + let self_agent_id = Rc::new(RefCell::new(String::new())); + let other_connected_agents: Rc>> = Rc::new(RefCell::new(Vec::new())); + + let on_message = Closure::wrap(Box::new({ + let self_agent_id = self_agent_id.clone(); + let other_connected_agents = other_connected_agents.clone(); + move |value: JsValue| { + let Some(data) = value.as_string() else { + return; + }; + + let Ok(message) = serde_json::from_str::(&data) else { + return; + }; + + match message { + WsMessage::ListAgentsResponse { agents } => { + let own_id = self_agent_id.borrow().clone(); + let others = agents + .into_iter() + .filter(|agent| { + agent.state == AgentConnectionState::Connected + && !own_id.is_empty() + && agent.agent_id != own_id + }) + .collect::>(); + *other_connected_agents.borrow_mut() = others; + } + WsMessage::AgentMessage { + message_id, + from_agent_id, + scope, + server_received_at, + message, + } => { + let summary = + serde_json::to_string(&message).unwrap_or_else(|_| String::from("")); + let line = format!( + "comm1: received {:?} message {} from {} at {}: {}", + scope, message_id, from_agent_id, server_received_at, summary + ); + web_sys::console::log_1(&JsValue::from_str(&line)); + let _ = set_module_status(&line); + } + WsMessage::MessageStatus { + message_id, + status, + detail, + } => { + let line = format!("comm1: message status update {:?} {:?}: {}", message_id, status, detail); + web_sys::console::log_1(&JsValue::from_str(&line)); + let _ = set_module_status(&line); + } + WsMessage::Invalid { message_id, detail } => { + let line = format!("comm1: invalid server response {:?}: {}", message_id, detail); + web_sys::console::warn_1(&JsValue::from_str(&line)); + let _ = set_module_status(&line); + } + _ => {} + } + } + }) as Box); + client.set_on_message(on_message.as_ref().clone()); + + client.connect()?; + wait_for_connected(&client).await?; + let agent_id = wait_for_agent_id(&client).await?; + *self_agent_id.borrow_mut() = agent_id.clone(); + let msg = format!("comm1: websocket connected with agent_id={agent_id}"); + log(&msg)?; + set_module_status(&msg)?; + + let target_agent = loop { + client.request_list_agents()?; + sleep_ms(LIST_AGENTS_POLL_MS).await?; + + let agents = other_connected_agents.borrow().clone(); + if let Some(agent) = agents.first() { + break agent.clone(); + } + }; + + let msg = format!( + "comm1: found connected peer agent {}; sending broadcast", + target_agent.agent_id + ); + log(&msg)?; + set_module_status(&msg)?; + client.broadcast_message(json!({ + "module": "comm1", + "step": "broadcast", + "from_agent_id": agent_id, + "message": "comm1 broadcast to all other connected agents" + }))?; + + sleep_ms(MESSAGE_PAUSE_MS).await?; + + let msg = format!("comm1: sending direct message to {}", target_agent.agent_id); + log(&msg)?; + set_module_status(&msg)?; + client.send_agent_message( + target_agent.agent_id.clone(), + json!({ + "module": "comm1", + "step": "direct", + "from_agent_id": agent_id, + "message": "comm1 direct message" + }), + )?; + + sleep_ms(MESSAGE_PAUSE_MS).await?; + client.disconnect(); + let msg = "comm1: workflow complete"; + log(msg)?; + set_module_status(msg)?; + Ok(()) +} + +fn log(message: &str) -> Result<(), JsValue> { + let line = format!("[comm1] {message}"); + web_sys::console::log_1(&JsValue::from_str(&line)); + Ok(()) +} + +fn set_module_status(message: &str) -> Result<(), JsValue> { + append_to_textarea("module-output", message) +} + +async fn wait_for_connected(client: &WsClient) -> Result<(), JsValue> { + for _ in 0..100 { + if client.get_state() == "connected" { + return Ok(()); + } + sleep_ms(100).await?; + } + + Err(JsValue::from_str("Timed out waiting for websocket connection")) +} + +async fn wait_for_agent_id(client: &WsClient) -> Result { + for _ in 0..100 { + let agent_id = client.get_client_id(); + if !agent_id.is_empty() { + return Ok(agent_id); + } + sleep_ms(100).await?; + } + + Err(JsValue::from_str("Timed out waiting for assigned agent_id")) +} + +async fn sleep_ms(duration_ms: i32) -> Result<(), JsValue> { + let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?; + let promise = Promise::new(&mut |resolve, reject| { + let callback = Closure::once_into_js(move || { + let _ = resolve.call0(&JsValue::NULL); + }); + + if let Err(error) = + window.set_timeout_with_callback_and_timeout_and_arguments_0(callback.unchecked_ref(), duration_ms) + { + let _ = reject.call1(&JsValue::NULL, &error); + } + }); + JsFuture::from(promise).await.map(|_| ()) +} + +fn websocket_url() -> Result { + let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?; + let location = Reflect::get(window.as_ref(), &JsValue::from_str("location"))?; + let protocol = Reflect::get(&location, &JsValue::from_str("protocol"))? + .as_string() + .ok_or_else(|| JsValue::from_str("window.location.protocol is unavailable"))?; + let host = Reflect::get(&location, &JsValue::from_str("host"))? + .as_string() + .ok_or_else(|| JsValue::from_str("window.location.host is unavailable"))?; + let ws_protocol = if protocol == "https:" { "wss:" } else { "ws:" }; + Ok(format!("{ws_protocol}//{host}/ws")) +} diff --git a/services/ws-modules/face-detection/Cargo.toml b/services/ws-modules/face-detection/Cargo.toml index a56553f..e4e2ce4 100644 --- a/services/ws-modules/face-detection/Cargo.toml +++ b/services/ws-modules/face-detection/Cargo.toml @@ -1,7 +1,9 @@ [package] name = "et-ws-face-detection" version = "0.1.0" -edition = "2024" +edition.workspace = true +license.workspace = true +repository.workspace = true [lib] crate-type = ["cdylib", "rlib"] diff --git a/services/ws-modules/face-detection/src/lib.rs b/services/ws-modules/face-detection/src/lib.rs index 2dbd874..deb5e95 100644 --- a/services/ws-modules/face-detection/src/lib.rs +++ b/services/ws-modules/face-detection/src/lib.rs @@ -1,7 +1,7 @@ use std::cell::{Cell, RefCell}; use std::rc::Rc; -use et_ws_wasm_agent::{VideoCapture, WsClient, WsClientConfig}; +use et_ws_wasm_agent::{VideoCapture, WsClient, WsClientConfig, set_textarea_value}; use js_sys::{Array, Float32Array, Function, Promise, Reflect}; use serde_json::json; use tracing::info; @@ -706,7 +706,7 @@ fn face_detach_stream() { } fn face_set_status(message: &str) { - let _ = set_textarea_value("face-output", message); + let _ = set_textarea_value("module-output", message); } fn face_capture_input_tensor() -> Result { @@ -808,21 +808,6 @@ fn image_data_to_tensor(image_data: &ImageData) -> Vec { tensor_data } -fn set_textarea_value(element_id: &str, message: &str) -> Result<(), JsValue> { - if let Some(window) = web_sys::window() - && let Some(document) = window.document() - && let Some(output) = document.get_element_by_id(element_id) - { - Reflect::set( - output.as_ref(), - &JsValue::from_str("value"), - &JsValue::from_str(message), - )?; - } - - Ok(()) -} - fn face_video_element() -> Result { let document = web_sys::window() .and_then(|window| window.document()) diff --git a/services/ws-modules/har1/Cargo.toml b/services/ws-modules/har1/Cargo.toml index 0ee8ead..9d32d00 100644 --- a/services/ws-modules/har1/Cargo.toml +++ b/services/ws-modules/har1/Cargo.toml @@ -1,7 +1,9 @@ [package] name = "et-ws-har1" version = "0.1.0" -edition = "2024" +edition.workspace = true +license.workspace = true +repository.workspace = true [lib] crate-type = ["cdylib", "rlib"] diff --git a/services/ws-modules/har1/src/lib.rs b/services/ws-modules/har1/src/lib.rs index 6c7fe65..06cbfb9 100644 --- a/services/ws-modules/har1/src/lib.rs +++ b/services/ws-modules/har1/src/lib.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use et_ws_wasm_agent::{DeviceSensors, MotionReading, WsClient, WsClientConfig}; +use et_ws_wasm_agent::{DeviceSensors, MotionReading, WsClient, WsClientConfig, set_textarea_value}; use js_sys::{Array, Float32Array, Function, Promise, Reflect}; use serde_json::json; use tracing::info; @@ -262,22 +262,7 @@ fn log(message: &str) -> Result<(), JsValue> { } fn set_har_status(message: &str) -> Result<(), JsValue> { - set_textarea_value("har-output", message) -} - -fn set_textarea_value(element_id: &str, message: &str) -> Result<(), JsValue> { - if let Some(window) = web_sys::window() - && let Some(document) = window.document() - && let Some(output) = document.get_element_by_id(element_id) - { - js_sys::Reflect::set( - output.as_ref(), - &JsValue::from_str("value"), - &JsValue::from_str(message), - )?; - } - - Ok(()) + set_textarea_value("module-output", message) } fn format_number(value: f64, digits: usize) -> String { diff --git a/services/ws-server/Cargo.toml b/services/ws-server/Cargo.toml index 5b4d02b..f8f5d03 100644 --- a/services/ws-server/Cargo.toml +++ b/services/ws-server/Cargo.toml @@ -1,8 +1,9 @@ [package] name = "et-ws-server" version = "0.1.0" -edition = "2021" -license = "Apache-2.0 or MIT" +edition.workspace = true +license.workspace = true +repository.workspace = true [dependencies] actix = "0.13" diff --git a/services/ws-server/src/main.rs b/services/ws-server/src/main.rs index 176ea50..1400fbf 100644 --- a/services/ws-server/src/main.rs +++ b/services/ws-server/src/main.rs @@ -1,21 +1,23 @@ -use std::collections::HashSet; +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use actix::{Actor, ActorContext, AsyncContext, StreamHandler}; +use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler}; use actix_files::{Files, NamedFile}; -use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer, web}; use actix_web_actors::ws; use chrono::Utc; -use edge_toolkit::ws::{ConnectStatus, WsMessage}; +use edge_toolkit::ws::{ + AgentConnectionState, AgentSummary, ConnectStatus, MessageDeliveryStatus, MessageScope, WsMessage, +}; use opentelemetry::{ global, trace::{Span, Tracer}, }; use opentelemetry_sdk::trace::SdkTracerProvider as TracerProvider; -use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use rustls::ServerConfig; +use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use tracing::{error, info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use uuid::Uuid; @@ -34,24 +36,223 @@ fn init_tracing() -> opentelemetry_sdk::trace::SdkTracerProvider { provider } +#[derive(Message)] +#[rtype(result = "()")] +struct ServerEnvelope { + message: WsMessage, +} + +#[derive(Debug, Clone)] +struct PendingDirectMessage { + message_id: String, + from_agent_id: String, + server_received_at: String, + message: serde_json::Value, +} + +#[derive(Debug, Clone)] +struct AgentRecord { + state: AgentConnectionState, + last_known_ip: Option, + session: Option>, + pending_direct_messages: BTreeMap, +} + #[derive(Clone, Default)] struct AgentRegistry { - issued_agent_ids: Arc>>, + agents: Arc>>, +} + +enum DirectSendResult { + Delivered { + pending: PendingDirectMessage, + recipient_addr: Addr, + }, + Queued { + pending: PendingDirectMessage, + }, +} + +enum AckResult { + Acknowledged { + message_id: String, + sender_addr: Option>, + sender_agent_id: String, + recipient_agent_id: String, + }, + Invalid { + detail: String, + }, +} + +impl AgentRegistry { + fn connect_agent( + &self, + requested_id: Option, + client_ip: &str, + session: Addr, + ) -> (String, ConnectStatus) { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + + if let Some(requested_id) = requested_id + && let Some(record) = agents.get_mut(&requested_id) + { + record.state = AgentConnectionState::Connected; + record.last_known_ip = Some(client_ip.to_string()); + record.session = Some(session); + return (requested_id, ConnectStatus::Reconnected); + } + + let assigned_id = Uuid::now_v7().to_string(); + agents.insert( + assigned_id.clone(), + AgentRecord { + state: AgentConnectionState::Connected, + last_known_ip: Some(client_ip.to_string()), + session: Some(session), + pending_direct_messages: BTreeMap::new(), + }, + ); + (assigned_id, ConnectStatus::Assigned) + } + + fn mark_disconnected(&self, agent_id: &str) { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + if let Some(record) = agents.get_mut(agent_id) { + record.state = AgentConnectionState::Disconnected; + record.session = None; + } + } + + fn list_agents(&self) -> Vec { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + let mut summaries = agents + .iter() + .map(|(agent_id, record)| AgentSummary { + agent_id: agent_id.clone(), + state: record.state.clone(), + last_known_ip: record.last_known_ip.clone(), + }) + .collect::>(); + summaries.sort_by(|left, right| left.agent_id.cmp(&right.agent_id)); + summaries + } + + fn queue_or_deliver_direct( + &self, + from_agent_id: &str, + to_agent_id: &str, + server_received_at: String, + message: serde_json::Value, + ) -> DirectSendResult { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + let recipient = agents + .get_mut(to_agent_id) + .expect("queue_or_deliver_direct called for unknown target agent"); + + let pending = PendingDirectMessage { + message_id: Uuid::now_v7().to_string(), + from_agent_id: from_agent_id.to_string(), + server_received_at, + message, + }; + recipient + .pending_direct_messages + .insert(from_agent_id.to_string(), pending); + + if let Some(recipient_addr) = recipient.session.clone() { + DirectSendResult::Delivered { + pending: recipient + .pending_direct_messages + .get(from_agent_id) + .expect("pending direct message was just inserted") + .clone(), + recipient_addr, + } + } else { + DirectSendResult::Queued { + pending: recipient + .pending_direct_messages + .get(from_agent_id) + .expect("pending direct message was just inserted") + .clone(), + } + } + } + + fn pending_messages_for(&self, agent_id: &str) -> Vec { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + agents + .get(agent_id) + .map(|record| { + let mut pending = record.pending_direct_messages.values().cloned().collect::>(); + pending.sort_by(|left, right| left.message_id.cmp(&right.message_id)); + pending + }) + .unwrap_or_default() + } + + fn acknowledge_message(&self, recipient_agent_id: &str, message_id: &str) -> AckResult { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + let Some(recipient) = agents.get_mut(recipient_agent_id) else { + return AckResult::Invalid { + detail: format!("unknown acknowledging agent {}", recipient_agent_id), + }; + }; + + let Some(sender_agent_id) = recipient + .pending_direct_messages + .iter() + .find_map(|(sender_agent_id, pending)| (pending.message_id == message_id).then(|| sender_agent_id.clone())) + else { + return AckResult::Invalid { + detail: "no pending message to acknowledge".to_string(), + }; + }; + + let pending = recipient + .pending_direct_messages + .remove(&sender_agent_id) + .expect("pending direct message disappeared during acknowledgement"); + let sender_addr = agents.get(&sender_agent_id).and_then(|record| record.session.clone()); + + AckResult::Acknowledged { + message_id: pending.message_id, + sender_addr, + sender_agent_id, + recipient_agent_id: recipient_agent_id.to_string(), + } + } + + fn connected_recipient_addrs(&self, excluding_agent_id: &str) -> Vec<(String, Addr)> { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + agents + .iter() + .filter_map(|(agent_id, record)| { + if agent_id == excluding_agent_id { + return None; + } + record.session.clone().map(|addr| (agent_id.clone(), addr)) + }) + .collect() + } } // WebSocket actor for handling connections struct WebSocketActor { agent_id: Option, last_activity: Instant, + client_ip: String, registry: AgentRegistry, } impl WebSocketActor { - fn new(registry: AgentRegistry) -> Self { - info!("New WebSocket actor created without assigned agent_id"); + fn new(registry: AgentRegistry, client_ip: String) -> Self { + info!("New WebSocket actor created for client IP {}", client_ip); Self { agent_id: None, last_activity: Instant::now(), + client_ip, registry, } } @@ -60,6 +261,10 @@ impl WebSocketActor { self.agent_id.as_deref().unwrap_or("unassigned") } + fn assigned_agent_id(&self) -> Option<&str> { + self.agent_id.as_deref() + } + fn mark_activity(&mut self) { self.last_activity = Instant::now(); } @@ -85,24 +290,78 @@ impl WebSocketActor { }); } - fn assign_or_reconnect_agent(&mut self, requested_id: Option) -> (String, ConnectStatus) { - let mut issued_agent_ids = self - .registry - .issued_agent_ids - .lock() - .expect("agent registry lock poisoned"); - - if let Some(requested_id) = requested_id { - if issued_agent_ids.contains(&requested_id) { - self.agent_id = Some(requested_id.clone()); - return (requested_id, ConnectStatus::Reconnected); + fn assign_or_reconnect_agent( + &mut self, + requested_id: Option, + session: Addr, + ) -> (String, ConnectStatus) { + let (assigned_id, status) = self.registry.connect_agent(requested_id, &self.client_ip, session); + self.agent_id = Some(assigned_id.clone()); + (assigned_id, status) + } + + fn send_json(ctx: &mut ws::WebsocketContext, response: &WsMessage) { + match serde_json::to_string(response) { + Ok(json) => { + ctx.text(json); + let tracer = global::tracer("ws-server"); + let mut sent_span = tracer.start("ws.message.sent"); + sent_span.end(); + } + Err(error) => { + error!("Failed to serialize websocket response: {}", error); } } + } - let assigned_id = Uuid::now_v7().to_string(); - issued_agent_ids.insert(assigned_id.clone()); - self.agent_id = Some(assigned_id.clone()); - (assigned_id, ConnectStatus::Assigned) + fn send_status( + ctx: &mut ws::WebsocketContext, + message_id: Option, + status: MessageDeliveryStatus, + detail: impl Into, + ) { + Self::send_json( + ctx, + &WsMessage::MessageStatus { + message_id, + status, + detail: detail.into(), + }, + ); + } + + fn send_invalid(ctx: &mut ws::WebsocketContext, message_id: Option, detail: impl Into) { + Self::send_json( + ctx, + &WsMessage::Invalid { + message_id, + detail: detail.into(), + }, + ); + } + + fn deliver_pending_messages(&self, ctx: &mut ws::WebsocketContext) { + let Some(agent_id) = self.assigned_agent_id() else { + return; + }; + let pending_messages = self.registry.pending_messages_for(agent_id); + if pending_messages.is_empty() { + return; + } + for pending in pending_messages { + info!( + "Delivering pending message {} to agent {} from {}", + pending.message_id, agent_id, pending.from_agent_id + ); + let message = WsMessage::AgentMessage { + message_id: pending.message_id, + from_agent_id: pending.from_agent_id, + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, + }; + Self::send_json(ctx, &message); + } } } @@ -112,13 +371,34 @@ impl Actor for WebSocketActor { fn started(&mut self, ctx: &mut Self::Context) { self.start_heartbeat(ctx); info!( - "WebSocket connection established for client: {}", + "WebSocket connection established for client IP {} with agent {}", + self.client_ip, self.current_agent_id() ); let tracer = global::tracer("ws-server"); let mut span = tracer.start("ws.connect"); span.end(); } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + if let Some(agent_id) = self.agent_id.as_deref() { + self.registry.mark_disconnected(agent_id); + info!("Agent {} disconnected; last known IP {}", agent_id, self.client_ip); + } else { + info!( + "WebSocket connection closed before agent assignment for client IP {}", + self.client_ip + ); + } + } +} + +impl Handler for WebSocketActor { + type Result = (); + + fn handle(&mut self, msg: ServerEnvelope, ctx: &mut Self::Context) -> Self::Result { + Self::send_json(ctx, &msg.message); + } } impl StreamHandler> for WebSocketActor { @@ -141,35 +421,188 @@ impl StreamHandler> for WebSocketActor { match msg { WsMessage::Connect { agent_id } => { let requested_id = agent_id.clone(); - let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id); info!( - "Connect message: requested_agent_id={:?} assigned_agent_id={} status={:?}", - requested_id, assigned_id, status + "Connect message: requested_agent_id={:?} client_ip={}", + requested_id, self.client_ip + ); + let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id, ctx.address()); + info!( + "Agent {} status {:?}connected from IP {}", + assigned_id, status, self.client_ip + ); + Self::send_json( + ctx, + &WsMessage::ConnectAck { + agent_id: assigned_id, + status: status.clone(), + }, + ); + info!( + "WebSocket connection ready for client {} with status {:?}", + self.current_agent_id(), + status + ); + self.deliver_pending_messages(ctx); + } + WsMessage::Alive { timestamp } => { + info!("Alive message from client {} at {}", self.current_agent_id(), timestamp); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("Alive message received at {}", Utc::now().to_rfc3339()), + }, + ); + } + WsMessage::ListAgents => { + let agents = self.registry.list_agents(); + info!( + "Agent {} requested list_agents; returning {} agents", + self.current_agent_id(), + agents.len() ); - let response = WsMessage::ConnectAck { - agent_id: assigned_id, - status: status.clone(), + Self::send_json(ctx, &WsMessage::ListAgentsResponse { agents }); + } + WsMessage::SendAgentMessage { to_agent_id, message } => { + let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before sending messages"); + span.end(); + return; + }; + + if from_agent_id == to_agent_id { + Self::send_invalid(ctx, None, "agent cannot send a direct message to itself"); + span.end(); + return; + } + + if !self + .registry + .list_agents() + .iter() + .any(|agent| agent.agent_id == to_agent_id) + { + Self::send_invalid(ctx, None, format!("unknown target agent {}", to_agent_id)); + span.end(); + return; + } + + let server_received_at = Utc::now().to_rfc3339(); + match self.registry.queue_or_deliver_direct( + &from_agent_id, + &to_agent_id, + server_received_at.clone(), + message, + ) { + DirectSendResult::Delivered { + pending, + recipient_addr, + } => { + let message_id = pending.message_id.clone(); + info!( + "Direct message {} delivered from {} to {}", + message_id, from_agent_id, to_agent_id + ); + recipient_addr.do_send(ServerEnvelope { + message: WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id: from_agent_id.clone(), + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, + }, + }); + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Delivered, + format!("message delivered to agent {}", to_agent_id), + ); + } + DirectSendResult::Queued { pending } => { + let message_id = pending.message_id; + info!( + "Direct message {} queued from {} to disconnected agent {}", + message_id, from_agent_id, to_agent_id + ); + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Queued, + format!("message queued for agent {}", to_agent_id), + ); + } + } + } + WsMessage::BroadcastMessage { message } => { + let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before broadcasting messages"); + span.end(); + return; }; - if let Ok(json) = serde_json::to_string(&response) { - ctx.text(json); - let mut sent_span = tracer.start("ws.message.sent"); - sent_span.end(); + + let recipients = self.registry.connected_recipient_addrs(&from_agent_id); + let message_id = Uuid::now_v7().to_string(); + let server_received_at = Utc::now().to_rfc3339(); + for (recipient_id, recipient_addr) in &recipients { info!( - "WebSocket connection ready for client {} with status {:?}", - self.current_agent_id(), - status + "Broadcast message {} from {} to {}", + message_id, from_agent_id, recipient_id ); + recipient_addr.do_send(ServerEnvelope { + message: WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id: from_agent_id.clone(), + scope: MessageScope::Broadcast, + server_received_at: server_received_at.clone(), + message: message.clone(), + }, + }); } + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Broadcast, + format!("broadcast sent to {} connected agents", recipients.len()), + ); } - WsMessage::Alive { timestamp } => { - info!("Alive message from client {} at {}", self.current_agent_id(), timestamp); - let response = WsMessage::Response { - message: format!("Alive message received at {}", Utc::now().to_rfc3339()), + WsMessage::MessageAck { message_id } => { + let Some(recipient_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before acknowledging messages"); + span.end(); + return; }; - if let Ok(json) = serde_json::to_string(&response) { - ctx.text(json); - let mut sent_span = tracer.start("ws.message.sent"); - sent_span.end(); + + match self.registry.acknowledge_message(&recipient_agent_id, &message_id) { + AckResult::Acknowledged { + message_id, + sender_addr, + sender_agent_id, + recipient_agent_id, + } => { + info!( + "Agent {} acknowledged direct message {} from {}", + recipient_agent_id, message_id, sender_agent_id + ); + Self::send_status( + ctx, + Some(message_id.clone()), + MessageDeliveryStatus::Acknowledged, + "message acknowledged", + ); + if let Some(sender_addr) = sender_addr { + sender_addr.do_send(ServerEnvelope { + message: WsMessage::MessageStatus { + message_id: Some(message_id), + status: MessageDeliveryStatus::Acknowledged, + detail: format!("agent {} acknowledged receipt", recipient_agent_id), + }, + }); + } + } + AckResult::Invalid { detail } => { + warn!("Invalid ack from {} for {}: {}", recipient_agent_id, message_id, detail); + Self::send_invalid(ctx, Some(message_id), detail); + } } } WsMessage::ClientEvent { @@ -206,15 +639,17 @@ impl StreamHandler> for WebSocketActor { details ); } - WsMessage::ConnectAck { .. } => { + WsMessage::ConnectAck { .. } + | WsMessage::ListAgentsResponse { .. } + | WsMessage::AgentMessage { .. } + | WsMessage::MessageStatus { .. } + | WsMessage::Invalid { .. } + | WsMessage::Response { .. } => { warn!( - "Unexpected connect_ack message from client: {}", + "Unexpected server-originated message from client {}", self.current_agent_id() ); } - WsMessage::Response { .. } => { - warn!("Unexpected response message from client: {}", self.current_agent_id()); - } } } else { warn!( @@ -235,6 +670,8 @@ impl StreamHandler> for WebSocketActor { let tracer = global::tracer("ws-server"); let mut span = tracer.start("ws.disconnect"); span.end(); + ctx.close(reason); + ctx.stop(); } Ok(ws::Message::Binary(_)) | Ok(ws::Message::Continuation(_)) | Ok(ws::Message::Nop) => { self.mark_activity(); @@ -258,7 +695,18 @@ async fn ws_handler( let tracer = global::tracer("ws-server"); let mut span = tracer.start("ws.connect"); - let result = ws::start(WebSocketActor::new(registry.get_ref().clone()), &req, stream); + let client_ip = req + .peer_addr() + .map(|addr| addr.ip().to_string()) + .or_else(|| { + req.connection_info() + .realip_remote_addr() + .and_then(|addr| addr.split(':').next().map(str::to_string)) + }) + .unwrap_or_else(|| "unknown".to_string()); + + let actor = WebSocketActor::new(registry.get_ref().clone(), client_ip); + let result = ws::start(actor, &req, stream); span.end(); result diff --git a/services/ws-server/static/app.js b/services/ws-server/static/app.js index 331669f..b1ff851 100644 --- a/services/ws-server/static/app.js +++ b/services/ws-server/static/app.js @@ -14,8 +14,8 @@ import init, { } from "/pkg/et_ws_wasm_agent.js"; const logEl = document.getElementById("log"); -const runHarButton = document.getElementById("run-har-button"); -const runFaceDetectionButton = document.getElementById("run-face-detection-button"); +const moduleSelect = document.getElementById("module-select"); +const runModuleButton = document.getElementById("run-module-button"); const micButton = document.getElementById("mic-button"); const videoButton = document.getElementById("video-button"); const bluetoothButton = document.getElementById("bluetooth-button"); @@ -30,7 +30,7 @@ const videoOutputButton = document.getElementById("video-output-button"); const agentStatusEl = document.getElementById("agent-status"); const agentIdEl = document.getElementById("agent-id"); const sensorOutputEl = document.getElementById("sensor-output"); -const videoOutputEl = document.getElementById("video-output"); +const videoOutputEl = document.getElementById("ml-debug-output"); const videoPreview = document.getElementById("video-preview"); const videoOutputCanvas = document.getElementById("video-output-canvas"); let microphone = null; @@ -54,8 +54,7 @@ let videoOverlayContext = videoOutputCanvas.getContext("2d"); let videoOutputVisible = false; let videoRenderFrameId = null; let lastVideoInferenceSummary = null; -let faceDetectionModule = null; -let faceDetectionModuleInitialized = false; +const loadedWorkflowModules = new Map(); let sendClientEvent = () => {}; const VIDEO_INFERENCE_INTERVAL_MS = 750; const VIDEO_RENDER_SCORE_THRESHOLD = 0.35; @@ -72,6 +71,24 @@ const describeError = (error) => ( error instanceof Error ? error.message : String(error) ); +const WORKFLOW_MODULES = { + har1: { + label: "har1", + moduleUrl: "/modules/har1/pkg/et_ws_har1.js", + wasmUrl: "/modules/har1/pkg/et_ws_har1_bg.wasm", + }, + "face-detection": { + label: "face detection", + moduleUrl: "/modules/face-detection/pkg/et_ws_face_detection.js", + wasmUrl: "/modules/face-detection/pkg/et_ws_face_detection_bg.wasm", + }, + comm1: { + label: "comm1", + moduleUrl: "/modules/comm1/pkg/et_ws_comm1.js", + wasmUrl: "/modules/comm1/pkg/et_ws_comm1_bg.wasm", + }, +}; + const updateAgentCard = (status, agentId = currentAgentId) => { currentAgentId = agentId || null; agentStatusEl.textContent = status; @@ -95,6 +112,42 @@ const writeStoredAgentId = (agentId) => { } }; +const loadWorkflowModule = async (moduleKey) => { + const moduleConfig = WORKFLOW_MODULES[moduleKey]; + if (!moduleConfig) { + throw new Error(`unknown workflow module: ${moduleKey}`); + } + + if (loadedWorkflowModules.has(moduleKey)) { + return loadedWorkflowModules.get(moduleKey); + } + + const cacheBust = Date.now(); + const moduleUrl = `${moduleConfig.moduleUrl}?v=${cacheBust}`; + const wasmUrl = `${moduleConfig.wasmUrl}?v=${cacheBust}`; + append(`${moduleConfig.label} module: importing ${moduleUrl}`); + const loadedModule = await import(moduleUrl); + append(`${moduleConfig.label} module: initializing ${wasmUrl}`); + await loadedModule.default(wasmUrl); + loadedWorkflowModules.set(moduleKey, loadedModule); + return loadedModule; +}; + +const runSelectedWorkflowModule = async () => { + const moduleKey = moduleSelect.value; + const moduleConfig = WORKFLOW_MODULES[moduleKey]; + if (!moduleConfig) { + throw new Error(`unknown workflow module: ${moduleKey}`); + } + + const loadedModule = await loadWorkflowModule(moduleKey); + append(`${moduleConfig.label} module: calling run()`); + const runPromise = loadedModule.run(); + append(`${moduleConfig.label} module: run() started`); + await runPromise; + append(`${moduleConfig.label} module completed`); +}; + const handleProtocolMessage = (message) => { let parsed; @@ -1356,68 +1409,23 @@ try { } }); - runHarButton.addEventListener("click", async () => { - runHarButton.disabled = true; - runHarButton.textContent = "Running har demo..."; + runModuleButton.addEventListener("click", async () => { + const selectedModule = WORKFLOW_MODULES[moduleSelect.value]; + runModuleButton.disabled = true; + moduleSelect.disabled = true; + runModuleButton.textContent = selectedModule + ? `Running ${selectedModule.label}...` + : "Running module..."; try { - const cacheBust = Date.now(); - const moduleUrl = `/modules/har1/pkg/et_ws_har1.js?v=${cacheBust}`; - const wasmUrl = `/modules/har1/pkg/et_ws_har1_bg.wasm?v=${cacheBust}`; - append(`har1 module: importing ${moduleUrl}`); - const har1Module = await import(moduleUrl); - append(`har1 module: initializing ${wasmUrl}`); - await har1Module.default(wasmUrl); - append("har1 module: calling run()"); - const runPromise = har1Module.run(); - append("har1 module: run() started"); - await runPromise; - append("har1 module completed"); + await runSelectedWorkflowModule(); } catch (error) { - append(`har1 module error: ${describeError(error)}`); + append(`${selectedModule?.label ?? "workflow"} module error: ${describeError(error)}`); console.error(error); } finally { - runHarButton.disabled = false; - runHarButton.textContent = "har demo"; - } - }); - - runFaceDetectionButton.addEventListener("click", async () => { - runFaceDetectionButton.disabled = true; - - try { - if (!faceDetectionModule) { - const cacheBust = Date.now(); - const moduleUrl = `/modules/face-detection/pkg/et_ws_face_detection.js?v=${cacheBust}`; - const wasmUrl = `/modules/face-detection/pkg/et_ws_face_detection_bg.wasm?v=${cacheBust}`; - append(`face detection module: importing ${moduleUrl}`); - faceDetectionModule = await import(moduleUrl); - append(`face detection module: initializing ${wasmUrl}`); - await faceDetectionModule.default(wasmUrl); - faceDetectionModuleInitialized = true; - } - - if (!faceDetectionModuleInitialized) { - throw new Error("face detection module failed to initialize"); - } - - if (faceDetectionModule.is_running()) { - faceDetectionModule.stop(); - append("face detection module stopped"); - runFaceDetectionButton.textContent = "face demo"; - return; - } - - append("face detection module: calling run()"); - await faceDetectionModule.run(); - append("face detection module started"); - runFaceDetectionButton.textContent = "stop face demo"; - } catch (error) { - append(`face detection module error: ${describeError(error)}`); - console.error(error); - runFaceDetectionButton.textContent = "face demo"; - } finally { - runFaceDetectionButton.disabled = false; + runModuleButton.disabled = false; + moduleSelect.disabled = false; + runModuleButton.textContent = "Run module"; } }); @@ -1428,8 +1436,15 @@ try { window.client = client; window.sendAlive = () => client.send_alive(); - window.runHarModule = () => runHarButton.click(); - window.runFaceDetectionModule = () => runFaceDetectionButton.click(); + window.runWorkflowModule = (moduleKey) => { + if (moduleKey && WORKFLOW_MODULES[moduleKey]) { + moduleSelect.value = moduleKey; + } + return runSelectedWorkflowModule(); + }; + window.runHarModule = () => window.runWorkflowModule("har1"); + window.runFaceDetectionModule = () => window.runWorkflowModule("face-detection"); + window.runComm1Module = () => window.runWorkflowModule("comm1"); } catch (error) { append(`error: ${error instanceof Error ? error.message : String(error)}`); console.error(error); diff --git a/services/ws-server/static/index.html b/services/ws-server/static/index.html index 158c0fd..a6765c7 100644 --- a/services/ws-server/static/index.html +++ b/services/ws-server/static/index.html @@ -121,8 +121,29 @@

WASM web agent

unassigned

- - + + + +

+ + + + +
+ + + +
+
Booting…
+ +

- - - - -
- - - - -
-
Booting…
diff --git a/services/ws-wasm-agent/Cargo.toml b/services/ws-wasm-agent/Cargo.toml index 146a81f..d630f04 100644 --- a/services/ws-wasm-agent/Cargo.toml +++ b/services/ws-wasm-agent/Cargo.toml @@ -1,8 +1,9 @@ [package] name = "et-ws-wasm-agent" version = "0.1.0" -edition = "2021" -license = "Apache-2.0 or MIT" +edition.workspace = true +license.workspace = true +repository.workspace = true [lib] crate-type = ["cdylib", "rlib"] diff --git a/services/ws-wasm-agent/src/lib.rs b/services/ws-wasm-agent/src/lib.rs index 18dd24c..e491704 100644 --- a/services/ws-wasm-agent/src/lib.rs +++ b/services/ws-wasm-agent/src/lib.rs @@ -1237,11 +1237,7 @@ fn js_string_field(value: &JsValue, field: &str) -> String { } fn string_or_unknown(value: String) -> String { - if value.is_empty() { - "unknown".to_string() - } else { - value - } + if value.is_empty() { "unknown".to_string() } else { value } } async fn request_sensor_permission(target: JsValue) -> Result { @@ -1520,10 +1516,10 @@ impl WsClient { s.state = ConnectionState::Connected; s.reconnect_attempts = 0; s.reconnect_delay_ms = initial_delay; - if let Some(timeout_id) = s.reconnect_timeout_id.take() { - if let Some(window) = web_sys::window() { - window.clear_timeout_with_handle(timeout_id); - } + if let Some(timeout_id) = s.reconnect_timeout_id.take() + && let Some(window) = web_sys::window() + { + window.clear_timeout_with_handle(timeout_id); } } cli_ptr.notify_state_change(); @@ -1566,6 +1562,43 @@ impl WsClient { WsMessage::Response { message } => { info!("Server response: {}", message); } + WsMessage::ListAgents => { + warn!("Unexpected list_agents message from server"); + } + WsMessage::ListAgentsResponse { agents } => { + info!("Server returned {} agents", agents.len()); + } + WsMessage::SendAgentMessage { .. } => { + warn!("Unexpected send_agent_message request from server"); + } + WsMessage::BroadcastMessage { .. } => { + warn!("Unexpected broadcast_message request from server"); + } + WsMessage::AgentMessage { + message_id, + from_agent_id, + scope, + server_received_at, + .. + } => { + info!( + "Received {:?} agent message {} from {} at {}", + scope, message_id, from_agent_id, server_received_at + ); + } + WsMessage::MessageAck { .. } => { + warn!("Unexpected message_ack from server"); + } + WsMessage::MessageStatus { + message_id, + status, + detail, + } => { + info!("Message status update {:?} {:?}: {}", message_id, status, detail); + } + WsMessage::Invalid { message_id, detail } => { + warn!("Invalid server message {:?}: {}", message_id, detail); + } WsMessage::Alive { .. } => { warn!("Unexpected alive message from server"); } @@ -1579,10 +1612,10 @@ impl WsClient { } // Notify callback if set let s = shared.borrow(); - if let Some(ref callback) = s.on_message_callback { - if let Some(function) = callback.dyn_ref::() { - let _ = function.call1(&JsValue::NULL, &JsValue::from_str(&data)); - } + if let Some(ref callback) = s.on_message_callback + && let Some(function) = callback.dyn_ref::() + { + let _ = function.call1(&JsValue::NULL, &JsValue::from_str(&data)); } } } @@ -1814,10 +1847,10 @@ impl WsClient { fn notify_state_change(&self) { let state = self.get_state(); let s = self.shared.borrow(); - if let Some(ref callback) = s.on_state_change_callback { - if let Some(function) = callback.dyn_ref::() { - let _ = function.call1(&JsValue::NULL, &JsValue::from_str(&state)); - } + if let Some(ref callback) = s.on_state_change_callback + && let Some(function) = callback.dyn_ref::() + { + let _ = function.call1(&JsValue::NULL, &JsValue::from_str(&state)); } } @@ -1932,15 +1965,40 @@ impl WsClient { fn cancel_reconnect(&self) { let mut s = self.shared.borrow_mut(); - if let Some(timeout_id) = s.reconnect_timeout_id.take() { - if let Some(window) = web_sys::window() { - window.clear_timeout_with_handle(timeout_id); - } + if let Some(timeout_id) = s.reconnect_timeout_id.take() + && let Some(window) = web_sys::window() + { + window.clear_timeout_with_handle(timeout_id); } } } impl WsClient { + pub fn request_list_agents(&self) -> Result<(), JsValue> { + let payload = serde_json::to_string(&WsMessage::ListAgents) + .map_err(|error| JsValue::from_str(&format!("Failed to serialize list_agents: {error}")))?; + self.send(&payload) + } + + pub fn broadcast_message(&self, message: serde_json::Value) -> Result<(), JsValue> { + let payload = serde_json::to_string(&WsMessage::BroadcastMessage { message }) + .map_err(|error| JsValue::from_str(&format!("Failed to serialize broadcast message: {error}")))?; + self.send(&payload) + } + + pub fn send_agent_message( + &self, + to_agent_id: impl Into, + message: serde_json::Value, + ) -> Result<(), JsValue> { + let payload = serde_json::to_string(&WsMessage::SendAgentMessage { + to_agent_id: to_agent_id.into(), + message, + }) + .map_err(|error| JsValue::from_str(&format!("Failed to serialize direct message: {error}")))?; + self.send(&payload) + } + pub fn send_client_event( &self, capability: impl Into, @@ -2004,3 +2062,51 @@ fn store_last_offline_at(timestamp: &str) -> Result<(), JsValue> { .ok_or_else(|| JsValue::from_str("No localStorage available"))?; storage.set_item(STORED_LAST_OFFLINE_AT_KEY, timestamp) } + +#[wasm_bindgen(js_name = set_textarea_value)] +pub fn set_textarea_value(element_id: &str, message: &str) -> Result<(), JsValue> { + if let Some(window) = web_sys::window() + && let Some(document) = window.document() + && let Some(output) = document.get_element_by_id(element_id) + { + js_sys::Reflect::set( + output.as_ref(), + &JsValue::from_str("value"), + &JsValue::from_str(message), + )?; + } + + Ok(()) +} + +#[wasm_bindgen(js_name = append_to_textarea)] +pub fn append_to_textarea(element_id: &str, message: &str) -> Result<(), JsValue> { + if let Some(window) = web_sys::window() + && let Some(document) = window.document() + && let Some(output) = document.get_element_by_id(element_id) + { + let current_value = js_sys::Reflect::get(output.as_ref(), &JsValue::from_str("value"))? + .as_string() + .unwrap_or_default(); + let next_value = if current_value.is_empty() || current_value.starts_with("Workflow module") { + message.to_string() + } else { + format!("{current_value}\n{message}") + }; + + js_sys::Reflect::set( + output.as_ref(), + &JsValue::from_str("value"), + &JsValue::from_str(&next_value), + )?; + + // Auto-scroll to bottom + js_sys::Reflect::set( + output.as_ref(), + &JsValue::from_str("scrollTop"), + &js_sys::Reflect::get(output.as_ref(), &JsValue::from_str("scrollHeight"))?, + )?; + } + + Ok(()) +}