From 4e592449df035351ec3dde06d48ff2f92c73d495 Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Fri, 17 Apr 2026 13:35:52 +0800 Subject: [PATCH] Auto-detect modules --- README.md | 2 + services/ws-modules/comm1/Cargo.toml | 2 + services/ws-modules/comm1/src/lib.rs | 10 + services/ws-modules/data1/Cargo.toml | 2 + services/ws-modules/data1/src/lib.rs | 11 + services/ws-modules/face-detection/Cargo.toml | 2 + services/ws-modules/face-detection/src/lib.rs | 10 + services/ws-modules/har1/Cargo.toml | 2 + services/ws-modules/har1/src/lib.rs | 10 + services/ws-server/src/lib.rs | 910 +++++++++++++++++ services/ws-server/src/main.rs | 925 +----------------- services/ws-server/static/app.js | 102 +- services/ws-server/static/index.html | 7 +- services/ws-server/tests/api_modules.rs | 26 + 14 files changed, 1083 insertions(+), 938 deletions(-) create mode 100644 services/ws-server/src/lib.rs create mode 100644 services/ws-server/tests/api_modules.rs diff --git a/README.md b/README.md index 1422a31..e503851 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,8 @@ Scan the QR-Code with a smart-phone camera and open the URL. Select the module to run in the drop-down, then click "Run module" button. +The module list is dynamically populated from the modules in [services/ws-modules](services/ws-modules). + Note: The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95. ## Grant diff --git a/services/ws-modules/comm1/Cargo.toml b/services/ws-modules/comm1/Cargo.toml index 5385cfe..fa625a1 100644 --- a/services/ws-modules/comm1/Cargo.toml +++ b/services/ws-modules/comm1/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "comm 1" name = "et-ws-comm1" version = "0.1.0" edition.workspace = true @@ -13,6 +14,7 @@ edge-toolkit = { path = "../../../libs/edge-toolkit" } et-ws-wasm-agent = { path = "../../ws-wasm-agent" } js-sys = "0.3" serde.workspace = true +serde-wasm-bindgen = "0.6" serde_json.workspace = true tracing.workspace = true tracing-wasm = "0.2" diff --git a/services/ws-modules/comm1/src/lib.rs b/services/ws-modules/comm1/src/lib.rs index 773355e..905d98a 100644 --- a/services/ws-modules/comm1/src/lib.rs +++ b/services/ws-modules/comm1/src/lib.rs @@ -18,6 +18,16 @@ pub fn init() { info!("comm1 workflow module initialized"); } +#[wasm_bindgen] +pub fn metadata() -> JsValue { + serde_wasm_bindgen::to_value(&json!({ + "name": env!("CARGO_PKG_NAME"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "version": env!("CARGO_PKG_VERSION"), + })) + .unwrap_or(JsValue::NULL) +} + #[wasm_bindgen] pub async fn run() -> Result<(), JsValue> { log("comm1: entered run()")?; diff --git a/services/ws-modules/data1/Cargo.toml b/services/ws-modules/data1/Cargo.toml index f44a5e1..a573d2c 100644 --- a/services/ws-modules/data1/Cargo.toml +++ b/services/ws-modules/data1/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "data 1" name = "et-ws-data1" version = "0.1.0" edition.workspace = true @@ -13,6 +14,7 @@ edge-toolkit = { path = "../../../libs/edge-toolkit" } et-ws-wasm-agent = { path = "../../ws-wasm-agent" } js-sys = "0.3" serde.workspace = true +serde-wasm-bindgen = "0.6" serde_json.workspace = true tracing.workspace = true tracing-wasm = "0.2" diff --git a/services/ws-modules/data1/src/lib.rs b/services/ws-modules/data1/src/lib.rs index c96086b..c892c5f 100644 --- a/services/ws-modules/data1/src/lib.rs +++ b/services/ws-modules/data1/src/lib.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use edge_toolkit::ws::WsMessage; use et_ws_wasm_agent::{WsClient, WsClientConfig, append_to_textarea}; use js_sys::{Promise, Reflect}; +use serde_json::json; use tracing::info; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -15,6 +16,16 @@ pub fn init() { info!("data1 workflow module initialized"); } +#[wasm_bindgen] +pub fn metadata() -> JsValue { + serde_wasm_bindgen::to_value(&json!({ + "name": env!("CARGO_PKG_NAME"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "version": env!("CARGO_PKG_VERSION"), + })) + .unwrap_or(JsValue::NULL) +} + #[wasm_bindgen] pub async fn run() -> Result<(), JsValue> { let msg = "data1: entered run()"; diff --git a/services/ws-modules/face-detection/Cargo.toml b/services/ws-modules/face-detection/Cargo.toml index e4e2ce4..4a6deae 100644 --- a/services/ws-modules/face-detection/Cargo.toml +++ b/services/ws-modules/face-detection/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "face detection" name = "et-ws-face-detection" version = "0.1.0" edition.workspace = true @@ -12,6 +13,7 @@ crate-type = ["cdylib", "rlib"] et-ws-wasm-agent = { path = "../../ws-wasm-agent" } js-sys = "0.3" serde.workspace = true +serde-wasm-bindgen = "0.6" serde_json.workspace = true tracing.workspace = true tracing-wasm = "0.2" diff --git a/services/ws-modules/face-detection/src/lib.rs b/services/ws-modules/face-detection/src/lib.rs index 0516faa..aec6cbf 100644 --- a/services/ws-modules/face-detection/src/lib.rs +++ b/services/ws-modules/face-detection/src/lib.rs @@ -65,6 +65,16 @@ pub fn init() { info!("face detection workflow module initialized"); } +#[wasm_bindgen] +pub fn metadata() -> JsValue { + serde_wasm_bindgen::to_value(&json!({ + "name": env!("CARGO_PKG_NAME"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "version": env!("CARGO_PKG_VERSION"), + })) + .unwrap_or(JsValue::NULL) +} + #[wasm_bindgen] pub fn is_running() -> bool { FACE_RUNTIME.with(|runtime| runtime.borrow().is_some()) diff --git a/services/ws-modules/har1/Cargo.toml b/services/ws-modules/har1/Cargo.toml index 9d32d00..a4e55ad 100644 --- a/services/ws-modules/har1/Cargo.toml +++ b/services/ws-modules/har1/Cargo.toml @@ -1,4 +1,5 @@ [package] +description = "human activity recognition 1" name = "et-ws-har1" version = "0.1.0" edition.workspace = true @@ -12,6 +13,7 @@ crate-type = ["cdylib", "rlib"] et-ws-wasm-agent = { path = "../../ws-wasm-agent" } js-sys = "0.3" serde.workspace = true +serde-wasm-bindgen = "0.6" serde_json.workspace = true tracing.workspace = true tracing-wasm = "0.2" diff --git a/services/ws-modules/har1/src/lib.rs b/services/ws-modules/har1/src/lib.rs index 36d2f64..f016dc4 100644 --- a/services/ws-modules/har1/src/lib.rs +++ b/services/ws-modules/har1/src/lib.rs @@ -22,6 +22,16 @@ pub fn init() { info!("har1 workflow module initialized"); } +#[wasm_bindgen] +pub fn metadata() -> JsValue { + serde_wasm_bindgen::to_value(&json!({ + "name": env!("CARGO_PKG_NAME"), + "description": env!("CARGO_PKG_DESCRIPTION"), + "version": env!("CARGO_PKG_VERSION"), + })) + .unwrap_or(JsValue::NULL) +} + #[wasm_bindgen] pub async fn run() -> Result<(), JsValue> { set_har_status("har1: entered run()")?; diff --git a/services/ws-server/src/lib.rs b/services/ws-server/src/lib.rs new file mode 100644 index 0000000..54c18a9 --- /dev/null +++ b/services/ws-server/src/lib.rs @@ -0,0 +1,910 @@ +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler}; +use actix_files::{Files, NamedFile}; +use actix_web::{Error, HttpRequest, HttpResponse, web}; +use actix_web_actors::ws; +use chrono::Utc; +use edge_toolkit::ws::{ + AgentConnectionState, AgentSummary, ConnectStatus, MessageDeliveryStatus, MessageScope, WsMessage, +}; +use futures_util::StreamExt; +use opentelemetry::{ + global, + trace::{Span, Tracer}, +}; +use serde::{Deserialize, Serialize}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +/// Maximum time the server allows a websocket connection to remain idle before closing it. +/// This should remain comfortably higher than the client's `Alive` message interval. +pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); +/// How often the server checks whether a websocket connection has exceeded `CONNECTION_TIMEOUT`. +/// This is only the check cadence, not the allowed idle duration. +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Message)] +#[rtype(result = "()")] +pub struct ServerEnvelope { + pub message: WsMessage, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PendingDirectMessage { + pub message_id: String, + pub from_agent_id: String, + pub server_received_at: String, + pub message: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentRecord { + pub state: AgentConnectionState, + pub last_known_ip: Option, + #[serde(skip)] + pub session: Option>, + #[serde(default)] + pub pending_direct_messages: BTreeMap, +} + +#[derive(Clone, Default)] +pub struct AgentRegistry { + pub agents: Arc>>, +} + +pub enum DirectSendResult { + Delivered { + pending: PendingDirectMessage, + recipient_addr: Addr, + }, + Queued { + pending: PendingDirectMessage, + }, +} + +pub enum AckResult { + Acknowledged { + message_id: String, + sender_addr: Option>, + sender_agent_id: String, + recipient_agent_id: String, + }, + Invalid { + detail: String, + }, +} + +impl AgentRegistry { + pub fn save(&self, path: &Path) -> std::io::Result<()> { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + let yaml = serde_yaml::to_string(&*agents).map_err(std::io::Error::other)?; + std::fs::write(path, yaml)?; + info!("Agent registry saved to {:?}", path); + Ok(()) + } + + pub fn load(path: &Path) -> std::io::Result { + if !path.exists() { + warn!("Registry file {:?} does not exist, starting with empty registry", path); + return Ok(Self::default()); + } + let yaml = std::fs::read_to_string(path)?; + let agents: BTreeMap = serde_yaml::from_str(&yaml).map_err(std::io::Error::other)?; + info!("Loaded {} agents from registry {:?}", agents.len(), path); + Ok(Self { + agents: Arc::new(Mutex::new(agents)), + }) + } + + pub fn connect_agent( + &self, + requested_id: Option, + client_ip: &str, + session: Addr, + ) -> (String, ConnectStatus) { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + + if let Some(requested_id) = requested_id + && let Some(record) = agents.get_mut(&requested_id) + { + record.state = AgentConnectionState::Connected; + record.last_known_ip = Some(client_ip.to_string()); + record.session = Some(session); + return (requested_id, ConnectStatus::Reconnected); + } + + let assigned_id = Uuid::now_v7().to_string(); + agents.insert( + assigned_id.clone(), + AgentRecord { + state: AgentConnectionState::Connected, + last_known_ip: Some(client_ip.to_string()), + session: Some(session), + pending_direct_messages: BTreeMap::new(), + }, + ); + (assigned_id, ConnectStatus::Assigned) + } + + pub fn mark_disconnected(&self, agent_id: &str) { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + if let Some(record) = agents.get_mut(agent_id) { + record.state = AgentConnectionState::Disconnected; + record.session = None; + } + } + + pub fn list_agents(&self) -> Vec { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + let mut summaries = agents + .iter() + .map(|(agent_id, record)| AgentSummary { + agent_id: agent_id.clone(), + state: record.state.clone(), + last_known_ip: record.last_known_ip.clone(), + }) + .collect::>(); + summaries.sort_by(|left, right| left.agent_id.cmp(&right.agent_id)); + summaries + } + + pub fn queue_or_deliver_direct( + &self, + from_agent_id: &str, + to_agent_id: &str, + server_received_at: String, + message: serde_json::Value, + ) -> DirectSendResult { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + let recipient = agents + .get_mut(to_agent_id) + .expect("queue_or_deliver_direct called for unknown target agent"); + + let pending = PendingDirectMessage { + message_id: Uuid::now_v7().to_string(), + from_agent_id: from_agent_id.to_string(), + server_received_at, + message, + }; + recipient + .pending_direct_messages + .insert(from_agent_id.to_string(), pending); + + if let Some(recipient_addr) = recipient.session.clone() { + DirectSendResult::Delivered { + pending: recipient + .pending_direct_messages + .get(from_agent_id) + .expect("pending direct message was just inserted") + .clone(), + recipient_addr, + } + } else { + DirectSendResult::Queued { + pending: recipient + .pending_direct_messages + .get(from_agent_id) + .expect("pending direct message was just inserted") + .clone(), + } + } + } + + pub fn pending_messages_for(&self, agent_id: &str) -> Vec { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + agents + .get(agent_id) + .map(|record| { + let mut pending = record.pending_direct_messages.values().cloned().collect::>(); + pending.sort_by(|left, right| left.message_id.cmp(&right.message_id)); + pending + }) + .unwrap_or_default() + } + + pub fn acknowledge_message(&self, recipient_agent_id: &str, message_id: &str) -> AckResult { + let mut agents = self.agents.lock().expect("agent registry lock poisoned"); + let Some(recipient) = agents.get_mut(recipient_agent_id) else { + return AckResult::Invalid { + detail: format!("unknown acknowledging agent {}", recipient_agent_id), + }; + }; + + let Some(sender_agent_id) = recipient + .pending_direct_messages + .iter() + .find_map(|(sender_agent_id, pending)| (pending.message_id == message_id).then(|| sender_agent_id.clone())) + else { + return AckResult::Invalid { + detail: "no pending message to acknowledge".to_string(), + }; + }; + + let pending = recipient + .pending_direct_messages + .remove(&sender_agent_id) + .expect("pending direct message disappeared during acknowledgement"); + let sender_addr = agents.get(&sender_agent_id).and_then(|record| record.session.clone()); + + AckResult::Acknowledged { + message_id: pending.message_id, + sender_addr, + sender_agent_id, + recipient_agent_id: recipient_agent_id.to_string(), + } + } + + pub fn connected_recipient_addrs(&self, excluding_agent_id: &str) -> Vec<(String, Addr)> { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + agents + .iter() + .filter_map(|(agent_id, record)| { + if agent_id == excluding_agent_id { + return None; + } + record.session.clone().map(|addr| (agent_id.clone(), addr)) + }) + .collect() + } +} + +/// WebSocket actor for handling connections. +pub struct WebSocketActor { + pub agent_id: Option, + pub last_activity: Instant, + pub client_ip: String, + pub registry: AgentRegistry, +} + +impl WebSocketActor { + pub fn new(registry: AgentRegistry, client_ip: String) -> Self { + info!("New WebSocket actor created for client IP {}", client_ip); + Self { + agent_id: None, + last_activity: Instant::now(), + client_ip, + registry, + } + } + + pub fn current_agent_id(&self) -> &str { + self.agent_id.as_deref().unwrap_or("unassigned") + } + + pub fn assigned_agent_id(&self) -> Option<&str> { + self.agent_id.as_deref() + } + + pub fn mark_activity(&mut self) { + self.last_activity = Instant::now(); + } + + pub fn start_heartbeat(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + let idle_for = Instant::now().saturating_duration_since(act.last_activity); + if idle_for > CONNECTION_TIMEOUT { + warn!( + "WebSocket connection timed out for client {} after {:?} of inactivity", + act.current_agent_id(), + idle_for + ); + ctx.close(Some(ws::CloseReason { + code: ws::CloseCode::Policy, + description: Some(format!( + "connection timed out after {:?} of inactivity", + CONNECTION_TIMEOUT + )), + })); + ctx.stop(); + } + }); + } + + fn assign_or_reconnect_agent( + &mut self, + requested_id: Option, + session: Addr, + ) -> (String, ConnectStatus) { + let (assigned_id, status) = self.registry.connect_agent(requested_id, &self.client_ip, session); + self.agent_id = Some(assigned_id.clone()); + (assigned_id, status) + } + + fn send_json(ctx: &mut ws::WebsocketContext, response: &WsMessage) { + match serde_json::to_string(response) { + Ok(json) => { + ctx.text(json); + let tracer = global::tracer("ws-server"); + let mut sent_span = tracer.start("ws.message.sent"); + sent_span.end(); + } + Err(error) => { + error!("Failed to serialize websocket response: {}", error); + } + } + } + + fn send_status( + ctx: &mut ws::WebsocketContext, + message_id: Option, + status: MessageDeliveryStatus, + detail: impl Into, + ) { + Self::send_json( + ctx, + &WsMessage::MessageStatus { + message_id, + status, + detail: detail.into(), + }, + ); + } + + fn send_invalid(ctx: &mut ws::WebsocketContext, message_id: Option, detail: impl Into) { + Self::send_json( + ctx, + &WsMessage::Invalid { + message_id, + detail: detail.into(), + }, + ); + } + + fn deliver_pending_messages(&self, ctx: &mut ws::WebsocketContext) { + let Some(agent_id) = self.assigned_agent_id() else { + return; + }; + let pending_messages = self.registry.pending_messages_for(agent_id); + if pending_messages.is_empty() { + return; + } + for pending in pending_messages { + info!( + "Delivering pending message {} to agent {} from {}", + pending.message_id, agent_id, pending.from_agent_id + ); + let message = WsMessage::AgentMessage { + message_id: pending.message_id, + from_agent_id: pending.from_agent_id, + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, + }; + Self::send_json(ctx, &message); + } + } +} + +impl Actor for WebSocketActor { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.start_heartbeat(ctx); + info!( + "WebSocket connection established for client IP {} with agent {}", + self.client_ip, + self.current_agent_id() + ); + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.connect"); + span.end(); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + if let Some(agent_id) = self.agent_id.as_deref() { + self.registry.mark_disconnected(agent_id); + info!("Agent {} disconnected; last known IP {}", agent_id, self.client_ip); + } else { + info!( + "WebSocket connection closed before agent assignment for client IP {}", + self.client_ip + ); + } + } +} + +impl Handler for WebSocketActor { + type Result = (); + + fn handle(&mut self, msg: ServerEnvelope, ctx: &mut Self::Context) -> Self::Result { + Self::send_json(ctx, &msg.message); + } +} + +impl StreamHandler> for WebSocketActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(ping)) => { + self.mark_activity(); + ctx.pong(&ping); + } + Ok(ws::Message::Pong(_)) => { + self.mark_activity(); + } + Ok(ws::Message::Text(text)) => { + self.mark_activity(); + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.message.received"); + info!("Received message from client {}: {:?}", self.current_agent_id(), text); + + if let Ok(msg) = serde_json::from_str::(&text) { + match msg { + WsMessage::Connect { agent_id } => { + let requested_id = agent_id.clone(); + info!( + "Connect message: requested_agent_id={:?} client_ip={}", + requested_id, self.client_ip + ); + let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id, ctx.address()); + info!( + "Agent {} status {:?}connected from IP {}", + assigned_id, status, self.client_ip + ); + Self::send_json( + ctx, + &WsMessage::ConnectAck { + agent_id: assigned_id, + status: status.clone(), + }, + ); + info!( + "WebSocket connection ready for client {} with status {:?}", + self.current_agent_id(), + status + ); + self.deliver_pending_messages(ctx); + } + WsMessage::Alive { timestamp } => { + info!("Alive message from client {} at {}", self.current_agent_id(), timestamp); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("Alive message received at {}", Utc::now().to_rfc3339()), + }, + ); + } + WsMessage::ListAgents => { + let agents = self.registry.list_agents(); + info!( + "Agent {} requested list_agents; returning {} agents", + self.current_agent_id(), + agents.len() + ); + Self::send_json(ctx, &WsMessage::ListAgentsResponse { agents }); + } + WsMessage::SendAgentMessage { to_agent_id, message } => { + let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before sending messages"); + span.end(); + return; + }; + + if from_agent_id == to_agent_id { + Self::send_invalid(ctx, None, "agent cannot send a direct message to itself"); + span.end(); + return; + } + + if !self + .registry + .list_agents() + .iter() + .any(|agent| agent.agent_id == to_agent_id) + { + Self::send_invalid(ctx, None, format!("unknown target agent {}", to_agent_id)); + span.end(); + return; + } + + let server_received_at = Utc::now().to_rfc3339(); + match self.registry.queue_or_deliver_direct( + &from_agent_id, + &to_agent_id, + server_received_at.clone(), + message, + ) { + DirectSendResult::Delivered { + pending, + recipient_addr, + } => { + let message_id = pending.message_id.clone(); + info!( + "Direct message {} delivered from {} to {}", + message_id, from_agent_id, to_agent_id + ); + recipient_addr.do_send(ServerEnvelope { + message: WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id: from_agent_id.clone(), + scope: MessageScope::Direct, + server_received_at: pending.server_received_at, + message: pending.message, + }, + }); + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Delivered, + format!("message delivered to agent {}", to_agent_id), + ); + } + DirectSendResult::Queued { pending } => { + let message_id = pending.message_id; + info!( + "Direct message {} queued from {} to disconnected agent {}", + message_id, from_agent_id, to_agent_id + ); + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Queued, + format!("message queued for agent {}", to_agent_id), + ); + } + } + } + WsMessage::BroadcastMessage { message } => { + let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before broadcasting messages"); + span.end(); + return; + }; + + let recipients = self.registry.connected_recipient_addrs(&from_agent_id); + let message_id = Uuid::now_v7().to_string(); + let server_received_at = Utc::now().to_rfc3339(); + for (recipient_id, recipient_addr) in &recipients { + info!( + "Broadcast message {} from {} to {}", + message_id, from_agent_id, recipient_id + ); + recipient_addr.do_send(ServerEnvelope { + message: WsMessage::AgentMessage { + message_id: message_id.clone(), + from_agent_id: from_agent_id.clone(), + scope: MessageScope::Broadcast, + server_received_at: server_received_at.clone(), + message: message.clone(), + }, + }); + } + Self::send_status( + ctx, + Some(message_id), + MessageDeliveryStatus::Broadcast, + format!("broadcast sent to {} connected agents", recipients.len()), + ); + } + WsMessage::MessageAck { message_id } => { + let Some(recipient_agent_id) = self.assigned_agent_id().map(str::to_string) else { + Self::send_invalid(ctx, None, "agent must connect before acknowledging messages"); + span.end(); + return; + }; + + match self.registry.acknowledge_message(&recipient_agent_id, &message_id) { + AckResult::Acknowledged { + message_id, + sender_addr, + sender_agent_id, + recipient_agent_id, + } => { + info!( + "Agent {} acknowledged direct message {} from {}", + recipient_agent_id, message_id, sender_agent_id + ); + Self::send_status( + ctx, + Some(message_id.clone()), + MessageDeliveryStatus::Acknowledged, + "message acknowledged", + ); + if let Some(sender_addr) = sender_addr { + sender_addr.do_send(ServerEnvelope { + message: WsMessage::MessageStatus { + message_id: Some(message_id), + status: MessageDeliveryStatus::Acknowledged, + detail: format!("agent {} acknowledged receipt", recipient_agent_id), + }, + }); + } + } + AckResult::Invalid { detail } => { + warn!("Invalid ack from {} for {}: {}", recipient_agent_id, message_id, detail); + Self::send_invalid(ctx, Some(message_id), detail); + } + } + } + WsMessage::ClientEvent { + capability, + action, + details, + } => { + if capability == "video_cv" && action == "inference" { + let detected_class = details + .get("detected_class") + .and_then(|value| value.as_str()) + .unwrap_or("unknown"); + let confidence = details + .get("confidence") + .and_then(|value| value.as_f64()) + .unwrap_or_default(); + let processed_at = details + .get("processed_at") + .and_then(|value| value.as_str()) + .unwrap_or("unknown"); + info!( + "Video inference received from {}: class={} confidence={:.4} processed_at={}", + self.current_agent_id(), + detected_class, + confidence, + processed_at + ); + } + info!( + "Client event from {}: capability={} action={} details={}", + self.current_agent_id(), + capability, + action, + details + ); + } + WsMessage::StoreFile { filename } => { + let Some(agent_id) = self.assigned_agent_id() else { + Self::send_invalid(ctx, None, "agent must connect before storing files"); + span.end(); + return; + }; + let url = format!("/storage/{}/{}", agent_id, filename); + info!("Agent {} requested storage URL for {}: {}", agent_id, filename, url); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("PUT to {}", url), + }, + ); + } + WsMessage::FetchFile { agent_id, filename } => { + let url = format!("/storage/{}/{}", agent_id, filename); + info!( + "Agent {} requested fetch URL for {}/{}", + self.current_agent_id(), + agent_id, + filename + ); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("GET from {}", url), + }, + ); + } + WsMessage::ConnectAck { .. } + | WsMessage::ListAgentsResponse { .. } + | WsMessage::AgentMessage { .. } + | WsMessage::MessageStatus { .. } + | WsMessage::Invalid { .. } + | WsMessage::Response { .. } => { + warn!( + "Unexpected server-originated message from client {}", + self.current_agent_id() + ); + } + } + } else { + warn!( + "Received unrecognized message from client {}: {}", + self.current_agent_id(), + text + ); + } + span.end(); + } + Ok(ws::Message::Close(reason)) => { + self.mark_activity(); + info!( + "WebSocket close request from client: {} reason: {:?}", + self.current_agent_id(), + reason + ); + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.disconnect"); + span.end(); + ctx.close(reason); + ctx.stop(); + } + Ok(ws::Message::Binary(_)) | Ok(ws::Message::Continuation(_)) | Ok(ws::Message::Nop) => { + self.mark_activity(); + } + Err(e) => { + error!("WebSocket error for client {}: {:?}", self.current_agent_id(), e); + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.error"); + span.end(); + } + } + } +} + +/// WebSocket endpoint handler. +pub async fn ws_handler( + req: HttpRequest, + stream: web::Payload, + registry: web::Data, +) -> Result { + let tracer = global::tracer("ws-server"); + let mut span = tracer.start("ws.connect"); + + let client_ip = req + .peer_addr() + .map(|addr| addr.ip().to_string()) + .or_else(|| { + req.connection_info() + .realip_remote_addr() + .and_then(|addr| addr.split(':').next().map(str::to_string)) + }) + .unwrap_or_else(|| "unknown".to_string()); + + let actor = WebSocketActor::new(registry.get_ref().clone(), client_ip); + let result = ws::start(actor, &req, stream); + + span.end(); + result +} + +pub fn workspace_root() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../..") + .canonicalize() + .expect("workspace root should exist") +} + +pub fn wasm_pkg_dir() -> PathBuf { + workspace_root().join("services/ws-wasm-agent/pkg") +} + +pub fn wasm_modules_dir() -> PathBuf { + workspace_root().join("services/ws-modules") +} + +pub fn browser_static_dir() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join("static") +} + +pub async fn browser_index() -> Result { + let path = browser_static_dir().join("index.html"); + info!("Serving browser interface page: {:?}", path); + + NamedFile::open(path).map_err(|e| { + error!("Failed to open browser interface page: {}", e); + actix_web::error::ErrorNotFound(e) + }) +} + +pub async fn no_content() -> HttpResponse { + HttpResponse::NoContent().finish() +} + +/// Static file handler — serves a named binary file from the ws-server static directory. +/// Example: GET /files/firmware.bin → services/ws-server/static/firmware.bin +pub async fn file_handler(req: HttpRequest) -> Result { + // Extract the filename segment from the URL path. + let filename: PathBuf = req + .match_info() + .query("filename") + .parse() + .map_err(|_| actix_web::error::ErrorBadRequest("invalid filename"))?; + + // Prevent directory traversal: reject any path containing a separator. + if filename.components().count() != 1 { + return Err(actix_web::error::ErrorBadRequest("invalid filename")); + } + + let path = browser_static_dir().join(&filename); + + info!("Serving static file: {:?}", path); + + let file = NamedFile::open(&path).map_err(|e| { + error!("Failed to open static file {:?}: {}", path, e); + actix_web::error::ErrorNotFound(e) + })?; + + // Treat every file as an opaque binary stream so browsers don't + // try to render or sniff the content type. + Ok(file + .use_etag(true) + .use_last_modified(true) + .set_content_type(actix_web::mime::APPLICATION_OCTET_STREAM)) +} + +pub async fn health() -> HttpResponse { + HttpResponse::Ok().json(serde_json::json!({ + "status": "healthy", + "service": "ws-server" + })) +} + +pub async fn list_modules() -> Result { + let modules_dir = wasm_modules_dir(); + let mut modules = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(modules_dir) { + for entry in entries.flatten() { + if let Ok(file_type) = entry.file_type() + && file_type.is_dir() + && let Some(name) = entry.file_name().to_str() + { + let pkg_dir = entry.path().join("pkg"); + if pkg_dir.exists() && pkg_dir.is_dir() { + modules.push(name.to_string()); + } + } + } + } + modules.sort(); + Ok(HttpResponse::Ok().json(modules)) +} + +pub async fn agent_put_file( + req: HttpRequest, + mut payload: web::Payload, + registry: web::Data, +) -> Result { + let agent_id: String = req.match_info().query("agent_id").parse().unwrap(); + let filename: PathBuf = req + .match_info() + .query("filename") + .parse() + .map_err(|_| actix_web::error::ErrorBadRequest("invalid filename"))?; + + { + let agents = registry.agents.lock().expect("lock poisoned"); + if !agents.contains_key(&agent_id) { + return Err(actix_web::error::ErrorNotFound("agent not found")); + } + } + + if filename.components().count() != 1 { + return Err(actix_web::error::ErrorBadRequest("invalid filename")); + } + + let storage_dir = workspace_root().join("services/ws-server/storage"); + let agent_dir = storage_dir.join(&agent_id); + std::fs::create_dir_all(&agent_dir)?; + + let path = agent_dir.join(&filename); + info!("Agent {} storing file: {:?}", agent_id, path); + + let mut file = tokio::fs::File::create(path).await?; + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + tokio::io::copy(&mut &chunk[..], &mut file).await?; + } + + Ok(HttpResponse::Ok().finish()) +} + +pub fn configure_app(cfg: &mut web::ServiceConfig, agent_registry: web::Data, storage_dir: PathBuf) { + cfg.app_data(agent_registry) + .route("/", web::get().to(browser_index)) + .route("/index.html", web::get().to(browser_index)) + .route("/favicon.ico", web::get().to(no_content)) + .route("/health", web::get().to(health)) + .route("/api/modules", web::get().to(list_modules)) + .route("/ws", web::get().to(ws_handler)) + .route("/files/{filename}", web::get().to(file_handler)) + .route("/storage/{agent_id}/{filename}", web::put().to(agent_put_file)) + .service( + Files::new("/storage", storage_dir) + .show_files_listing() + .use_etag(true) + .use_last_modified(true), + ) + .service(Files::new("/modules", wasm_modules_dir()).prefer_utf8(true)) + .service(Files::new("/pkg", wasm_pkg_dir()).prefer_utf8(true)) + .service(Files::new("/static", browser_static_dir()).prefer_utf8(true)); +} diff --git a/services/ws-server/src/main.rs b/services/ws-server/src/main.rs index 0342ab7..700982f 100644 --- a/services/ws-server/src/main.rs +++ b/services/ws-server/src/main.rs @@ -1,33 +1,21 @@ -use std::collections::BTreeMap; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::path::PathBuf; -use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, StreamHandler}; -use actix_files::{Files, NamedFile}; -use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer, web}; -use actix_web_actors::ws; -use chrono::Utc; -use edge_toolkit::ws::{ - AgentConnectionState, AgentSummary, ConnectStatus, MessageDeliveryStatus, MessageScope, WsMessage, -}; -use opentelemetry::{ - global, - trace::{Span, Tracer}, -}; +use actix_web::middleware::Logger; +use actix_web::{App, HttpServer, web}; +use clap::Parser; +use et_ws_server::{AgentRegistry, browser_static_dir, configure_app, wasm_modules_dir, wasm_pkg_dir, workspace_root}; +use opentelemetry::global; use opentelemetry_sdk::trace::SdkTracerProvider as TracerProvider; -use rustls::ServerConfig; -use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; -use tracing::{error, info, warn}; +use tracing::{error, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use uuid::Uuid; -/// Maximum time the server allows a websocket connection to remain idle before closing it. -/// This should remain comfortably higher than the client's `Alive` message interval. -const CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); -/// How often the server checks whether a websocket connection has exceeded `CONNECTION_TIMEOUT`. -/// This is only the check cadence, not the allowed idle duration. -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path to agent registry YAML file + #[arg(short, long, default_value = "registry.yaml")] + agent_registry: PathBuf, +} // Initialize OpenTelemetry fn init_tracing() -> opentelemetry_sdk::trace::SdkTracerProvider { @@ -36,811 +24,7 @@ fn init_tracing() -> opentelemetry_sdk::trace::SdkTracerProvider { provider } -#[derive(Message)] -#[rtype(result = "()")] -struct ServerEnvelope { - message: WsMessage, -} - -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct PendingDirectMessage { - message_id: String, - from_agent_id: String, - server_received_at: String, - message: serde_json::Value, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct AgentRecord { - state: AgentConnectionState, - last_known_ip: Option, - #[serde(skip)] - session: Option>, - #[serde(default)] - pending_direct_messages: BTreeMap, -} - -#[derive(Clone, Default)] -struct AgentRegistry { - agents: Arc>>, -} - -enum DirectSendResult { - Delivered { - pending: PendingDirectMessage, - recipient_addr: Addr, - }, - Queued { - pending: PendingDirectMessage, - }, -} - -enum AckResult { - Acknowledged { - message_id: String, - sender_addr: Option>, - sender_agent_id: String, - recipient_agent_id: String, - }, - Invalid { - detail: String, - }, -} - -impl AgentRegistry { - fn save(&self, path: &Path) -> std::io::Result<()> { - let agents = self.agents.lock().expect("agent registry lock poisoned"); - let yaml = serde_yaml::to_string(&*agents).map_err(std::io::Error::other)?; - std::fs::write(path, yaml)?; - info!("Agent registry saved to {:?}", path); - Ok(()) - } - - fn load(path: &Path) -> std::io::Result { - if !path.exists() { - warn!("Registry file {:?} does not exist, starting with empty registry", path); - return Ok(Self::default()); - } - let yaml = std::fs::read_to_string(path)?; - let agents: BTreeMap = serde_yaml::from_str(&yaml).map_err(std::io::Error::other)?; - info!("Loaded {} agents from registry {:?}", agents.len(), path); - Ok(Self { - agents: Arc::new(Mutex::new(agents)), - }) - } - - fn connect_agent( - &self, - requested_id: Option, - client_ip: &str, - session: Addr, - ) -> (String, ConnectStatus) { - let mut agents = self.agents.lock().expect("agent registry lock poisoned"); - - if let Some(requested_id) = requested_id - && let Some(record) = agents.get_mut(&requested_id) - { - record.state = AgentConnectionState::Connected; - record.last_known_ip = Some(client_ip.to_string()); - record.session = Some(session); - return (requested_id, ConnectStatus::Reconnected); - } - - let assigned_id = Uuid::now_v7().to_string(); - agents.insert( - assigned_id.clone(), - AgentRecord { - state: AgentConnectionState::Connected, - last_known_ip: Some(client_ip.to_string()), - session: Some(session), - pending_direct_messages: BTreeMap::new(), - }, - ); - (assigned_id, ConnectStatus::Assigned) - } - - fn mark_disconnected(&self, agent_id: &str) { - let mut agents = self.agents.lock().expect("agent registry lock poisoned"); - if let Some(record) = agents.get_mut(agent_id) { - record.state = AgentConnectionState::Disconnected; - record.session = None; - } - } - - fn list_agents(&self) -> Vec { - let agents = self.agents.lock().expect("agent registry lock poisoned"); - let mut summaries = agents - .iter() - .map(|(agent_id, record)| AgentSummary { - agent_id: agent_id.clone(), - state: record.state.clone(), - last_known_ip: record.last_known_ip.clone(), - }) - .collect::>(); - summaries.sort_by(|left, right| left.agent_id.cmp(&right.agent_id)); - summaries - } - - fn queue_or_deliver_direct( - &self, - from_agent_id: &str, - to_agent_id: &str, - server_received_at: String, - message: serde_json::Value, - ) -> DirectSendResult { - let mut agents = self.agents.lock().expect("agent registry lock poisoned"); - let recipient = agents - .get_mut(to_agent_id) - .expect("queue_or_deliver_direct called for unknown target agent"); - - let pending = PendingDirectMessage { - message_id: Uuid::now_v7().to_string(), - from_agent_id: from_agent_id.to_string(), - server_received_at, - message, - }; - recipient - .pending_direct_messages - .insert(from_agent_id.to_string(), pending); - - if let Some(recipient_addr) = recipient.session.clone() { - DirectSendResult::Delivered { - pending: recipient - .pending_direct_messages - .get(from_agent_id) - .expect("pending direct message was just inserted") - .clone(), - recipient_addr, - } - } else { - DirectSendResult::Queued { - pending: recipient - .pending_direct_messages - .get(from_agent_id) - .expect("pending direct message was just inserted") - .clone(), - } - } - } - - fn pending_messages_for(&self, agent_id: &str) -> Vec { - let agents = self.agents.lock().expect("agent registry lock poisoned"); - agents - .get(agent_id) - .map(|record| { - let mut pending = record.pending_direct_messages.values().cloned().collect::>(); - pending.sort_by(|left, right| left.message_id.cmp(&right.message_id)); - pending - }) - .unwrap_or_default() - } - - fn acknowledge_message(&self, recipient_agent_id: &str, message_id: &str) -> AckResult { - let mut agents = self.agents.lock().expect("agent registry lock poisoned"); - let Some(recipient) = agents.get_mut(recipient_agent_id) else { - return AckResult::Invalid { - detail: format!("unknown acknowledging agent {}", recipient_agent_id), - }; - }; - - let Some(sender_agent_id) = recipient - .pending_direct_messages - .iter() - .find_map(|(sender_agent_id, pending)| (pending.message_id == message_id).then(|| sender_agent_id.clone())) - else { - return AckResult::Invalid { - detail: "no pending message to acknowledge".to_string(), - }; - }; - - let pending = recipient - .pending_direct_messages - .remove(&sender_agent_id) - .expect("pending direct message disappeared during acknowledgement"); - let sender_addr = agents.get(&sender_agent_id).and_then(|record| record.session.clone()); - - AckResult::Acknowledged { - message_id: pending.message_id, - sender_addr, - sender_agent_id, - recipient_agent_id: recipient_agent_id.to_string(), - } - } - - fn connected_recipient_addrs(&self, excluding_agent_id: &str) -> Vec<(String, Addr)> { - let agents = self.agents.lock().expect("agent registry lock poisoned"); - agents - .iter() - .filter_map(|(agent_id, record)| { - if agent_id == excluding_agent_id { - return None; - } - record.session.clone().map(|addr| (agent_id.clone(), addr)) - }) - .collect() - } -} - -// WebSocket actor for handling connections -struct WebSocketActor { - agent_id: Option, - last_activity: Instant, - client_ip: String, - registry: AgentRegistry, -} - -impl WebSocketActor { - fn new(registry: AgentRegistry, client_ip: String) -> Self { - info!("New WebSocket actor created for client IP {}", client_ip); - Self { - agent_id: None, - last_activity: Instant::now(), - client_ip, - registry, - } - } - - fn current_agent_id(&self) -> &str { - self.agent_id.as_deref().unwrap_or("unassigned") - } - - fn assigned_agent_id(&self) -> Option<&str> { - self.agent_id.as_deref() - } - - fn mark_activity(&mut self) { - self.last_activity = Instant::now(); - } - - fn start_heartbeat(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - let idle_for = Instant::now().saturating_duration_since(act.last_activity); - if idle_for > CONNECTION_TIMEOUT { - warn!( - "WebSocket connection timed out for client {} after {:?} of inactivity", - act.current_agent_id(), - idle_for - ); - ctx.close(Some(ws::CloseReason { - code: ws::CloseCode::Policy, - description: Some(format!( - "connection timed out after {:?} of inactivity", - CONNECTION_TIMEOUT - )), - })); - ctx.stop(); - } - }); - } - - fn assign_or_reconnect_agent( - &mut self, - requested_id: Option, - session: Addr, - ) -> (String, ConnectStatus) { - let (assigned_id, status) = self.registry.connect_agent(requested_id, &self.client_ip, session); - self.agent_id = Some(assigned_id.clone()); - (assigned_id, status) - } - - fn send_json(ctx: &mut ws::WebsocketContext, response: &WsMessage) { - match serde_json::to_string(response) { - Ok(json) => { - ctx.text(json); - let tracer = global::tracer("ws-server"); - let mut sent_span = tracer.start("ws.message.sent"); - sent_span.end(); - } - Err(error) => { - error!("Failed to serialize websocket response: {}", error); - } - } - } - - fn send_status( - ctx: &mut ws::WebsocketContext, - message_id: Option, - status: MessageDeliveryStatus, - detail: impl Into, - ) { - Self::send_json( - ctx, - &WsMessage::MessageStatus { - message_id, - status, - detail: detail.into(), - }, - ); - } - - fn send_invalid(ctx: &mut ws::WebsocketContext, message_id: Option, detail: impl Into) { - Self::send_json( - ctx, - &WsMessage::Invalid { - message_id, - detail: detail.into(), - }, - ); - } - - fn deliver_pending_messages(&self, ctx: &mut ws::WebsocketContext) { - let Some(agent_id) = self.assigned_agent_id() else { - return; - }; - let pending_messages = self.registry.pending_messages_for(agent_id); - if pending_messages.is_empty() { - return; - } - for pending in pending_messages { - info!( - "Delivering pending message {} to agent {} from {}", - pending.message_id, agent_id, pending.from_agent_id - ); - let message = WsMessage::AgentMessage { - message_id: pending.message_id, - from_agent_id: pending.from_agent_id, - scope: MessageScope::Direct, - server_received_at: pending.server_received_at, - message: pending.message, - }; - Self::send_json(ctx, &message); - } - } -} - -impl Actor for WebSocketActor { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.start_heartbeat(ctx); - info!( - "WebSocket connection established for client IP {} with agent {}", - self.client_ip, - self.current_agent_id() - ); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.connect"); - span.end(); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - if let Some(agent_id) = self.agent_id.as_deref() { - self.registry.mark_disconnected(agent_id); - info!("Agent {} disconnected; last known IP {}", agent_id, self.client_ip); - } else { - info!( - "WebSocket connection closed before agent assignment for client IP {}", - self.client_ip - ); - } - } -} - -impl Handler for WebSocketActor { - type Result = (); - - fn handle(&mut self, msg: ServerEnvelope, ctx: &mut Self::Context) -> Self::Result { - Self::send_json(ctx, &msg.message); - } -} - -impl StreamHandler> for WebSocketActor { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - match msg { - Ok(ws::Message::Ping(ping)) => { - self.mark_activity(); - ctx.pong(&ping); - } - Ok(ws::Message::Pong(_)) => { - self.mark_activity(); - } - Ok(ws::Message::Text(text)) => { - self.mark_activity(); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.message.received"); - info!("Received message from client {}: {:?}", self.current_agent_id(), text); - - if let Ok(msg) = serde_json::from_str::(&text) { - match msg { - WsMessage::Connect { agent_id } => { - let requested_id = agent_id.clone(); - info!( - "Connect message: requested_agent_id={:?} client_ip={}", - requested_id, self.client_ip - ); - let (assigned_id, status) = self.assign_or_reconnect_agent(agent_id, ctx.address()); - info!( - "Agent {} status {:?}connected from IP {}", - assigned_id, status, self.client_ip - ); - Self::send_json( - ctx, - &WsMessage::ConnectAck { - agent_id: assigned_id, - status: status.clone(), - }, - ); - info!( - "WebSocket connection ready for client {} with status {:?}", - self.current_agent_id(), - status - ); - self.deliver_pending_messages(ctx); - } - WsMessage::Alive { timestamp } => { - info!("Alive message from client {} at {}", self.current_agent_id(), timestamp); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("Alive message received at {}", Utc::now().to_rfc3339()), - }, - ); - } - WsMessage::ListAgents => { - let agents = self.registry.list_agents(); - info!( - "Agent {} requested list_agents; returning {} agents", - self.current_agent_id(), - agents.len() - ); - Self::send_json(ctx, &WsMessage::ListAgentsResponse { agents }); - } - WsMessage::SendAgentMessage { to_agent_id, message } => { - let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before sending messages"); - span.end(); - return; - }; - - if from_agent_id == to_agent_id { - Self::send_invalid(ctx, None, "agent cannot send a direct message to itself"); - span.end(); - return; - } - - if !self - .registry - .list_agents() - .iter() - .any(|agent| agent.agent_id == to_agent_id) - { - Self::send_invalid(ctx, None, format!("unknown target agent {}", to_agent_id)); - span.end(); - return; - } - - let server_received_at = Utc::now().to_rfc3339(); - match self.registry.queue_or_deliver_direct( - &from_agent_id, - &to_agent_id, - server_received_at.clone(), - message, - ) { - DirectSendResult::Delivered { - pending, - recipient_addr, - } => { - let message_id = pending.message_id.clone(); - info!( - "Direct message {} delivered from {} to {}", - message_id, from_agent_id, to_agent_id - ); - recipient_addr.do_send(ServerEnvelope { - message: WsMessage::AgentMessage { - message_id: message_id.clone(), - from_agent_id: from_agent_id.clone(), - scope: MessageScope::Direct, - server_received_at: pending.server_received_at, - message: pending.message, - }, - }); - Self::send_status( - ctx, - Some(message_id), - MessageDeliveryStatus::Delivered, - format!("message delivered to agent {}", to_agent_id), - ); - } - DirectSendResult::Queued { pending } => { - let message_id = pending.message_id; - info!( - "Direct message {} queued from {} to disconnected agent {}", - message_id, from_agent_id, to_agent_id - ); - Self::send_status( - ctx, - Some(message_id), - MessageDeliveryStatus::Queued, - format!("message queued for agent {}", to_agent_id), - ); - } - } - } - WsMessage::BroadcastMessage { message } => { - let Some(from_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before broadcasting messages"); - span.end(); - return; - }; - - let recipients = self.registry.connected_recipient_addrs(&from_agent_id); - let message_id = Uuid::now_v7().to_string(); - let server_received_at = Utc::now().to_rfc3339(); - for (recipient_id, recipient_addr) in &recipients { - info!( - "Broadcast message {} from {} to {}", - message_id, from_agent_id, recipient_id - ); - recipient_addr.do_send(ServerEnvelope { - message: WsMessage::AgentMessage { - message_id: message_id.clone(), - from_agent_id: from_agent_id.clone(), - scope: MessageScope::Broadcast, - server_received_at: server_received_at.clone(), - message: message.clone(), - }, - }); - } - Self::send_status( - ctx, - Some(message_id), - MessageDeliveryStatus::Broadcast, - format!("broadcast sent to {} connected agents", recipients.len()), - ); - } - WsMessage::MessageAck { message_id } => { - let Some(recipient_agent_id) = self.assigned_agent_id().map(str::to_string) else { - Self::send_invalid(ctx, None, "agent must connect before acknowledging messages"); - span.end(); - return; - }; - - match self.registry.acknowledge_message(&recipient_agent_id, &message_id) { - AckResult::Acknowledged { - message_id, - sender_addr, - sender_agent_id, - recipient_agent_id, - } => { - info!( - "Agent {} acknowledged direct message {} from {}", - recipient_agent_id, message_id, sender_agent_id - ); - Self::send_status( - ctx, - Some(message_id.clone()), - MessageDeliveryStatus::Acknowledged, - "message acknowledged", - ); - if let Some(sender_addr) = sender_addr { - sender_addr.do_send(ServerEnvelope { - message: WsMessage::MessageStatus { - message_id: Some(message_id), - status: MessageDeliveryStatus::Acknowledged, - detail: format!("agent {} acknowledged receipt", recipient_agent_id), - }, - }); - } - } - AckResult::Invalid { detail } => { - warn!("Invalid ack from {} for {}: {}", recipient_agent_id, message_id, detail); - Self::send_invalid(ctx, Some(message_id), detail); - } - } - } - WsMessage::ClientEvent { - capability, - action, - details, - } => { - if capability == "video_cv" && action == "inference" { - let detected_class = details - .get("detected_class") - .and_then(|value| value.as_str()) - .unwrap_or("unknown"); - let confidence = details - .get("confidence") - .and_then(|value| value.as_f64()) - .unwrap_or_default(); - let processed_at = details - .get("processed_at") - .and_then(|value| value.as_str()) - .unwrap_or("unknown"); - info!( - "Video inference received from {}: class={} confidence={:.4} processed_at={}", - self.current_agent_id(), - detected_class, - confidence, - processed_at - ); - } - info!( - "Client event from {}: capability={} action={} details={}", - self.current_agent_id(), - capability, - action, - details - ); - } - WsMessage::StoreFile { filename } => { - let Some(agent_id) = self.assigned_agent_id() else { - Self::send_invalid(ctx, None, "agent must connect before storing files"); - span.end(); - return; - }; - let url = format!("/storage/{}/{}", agent_id, filename); - info!("Agent {} requested storage URL for {}: {}", agent_id, filename, url); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("PUT to {}", url), - }, - ); - } - WsMessage::FetchFile { agent_id, filename } => { - let url = format!("/storage/{}/{}", agent_id, filename); - info!( - "Agent {} requested fetch URL for {}/{}", - self.current_agent_id(), - agent_id, - filename - ); - Self::send_json( - ctx, - &WsMessage::Response { - message: format!("GET from {}", url), - }, - ); - } - WsMessage::ConnectAck { .. } - | WsMessage::ListAgentsResponse { .. } - | WsMessage::AgentMessage { .. } - | WsMessage::MessageStatus { .. } - | WsMessage::Invalid { .. } - | WsMessage::Response { .. } => { - warn!( - "Unexpected server-originated message from client {}", - self.current_agent_id() - ); - } - } - } else { - warn!( - "Received unrecognized message from client {}: {}", - self.current_agent_id(), - text - ); - } - span.end(); - } - Ok(ws::Message::Close(reason)) => { - self.mark_activity(); - info!( - "WebSocket close request from client: {} reason: {:?}", - self.current_agent_id(), - reason - ); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.disconnect"); - span.end(); - ctx.close(reason); - ctx.stop(); - } - Ok(ws::Message::Binary(_)) | Ok(ws::Message::Continuation(_)) | Ok(ws::Message::Nop) => { - self.mark_activity(); - } - Err(e) => { - error!("WebSocket error for client {}: {:?}", self.current_agent_id(), e); - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.error"); - span.end(); - } - } - } -} - -// WebSocket endpoint handler -async fn ws_handler( - req: HttpRequest, - stream: web::Payload, - registry: web::Data, -) -> Result { - let tracer = global::tracer("ws-server"); - let mut span = tracer.start("ws.connect"); - - let client_ip = req - .peer_addr() - .map(|addr| addr.ip().to_string()) - .or_else(|| { - req.connection_info() - .realip_remote_addr() - .and_then(|addr| addr.split(':').next().map(str::to_string)) - }) - .unwrap_or_else(|| "unknown".to_string()); - - let actor = WebSocketActor::new(registry.get_ref().clone(), client_ip); - let result = ws::start(actor, &req, stream); - - span.end(); - result -} - -fn workspace_root() -> PathBuf { - Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../..") - .canonicalize() - .expect("workspace root should exist") -} - -fn wasm_pkg_dir() -> PathBuf { - workspace_root().join("services/ws-wasm-agent/pkg") -} - -fn wasm_modules_dir() -> PathBuf { - workspace_root().join("services/ws-modules") -} - -fn browser_static_dir() -> PathBuf { - Path::new(env!("CARGO_MANIFEST_DIR")).join("static") -} - -async fn browser_index() -> Result { - let path = browser_static_dir().join("index.html"); - info!("Serving browser interface page: {:?}", path); - - NamedFile::open(path).map_err(|e| { - error!("Failed to open browser interface page: {}", e); - actix_web::error::ErrorNotFound(e) - }) -} - -async fn no_content() -> HttpResponse { - HttpResponse::NoContent().finish() -} - -// Static file handler — serves a named binary file from the ws-server static directory. -// Example: GET /files/firmware.bin → services/ws-server/static/firmware.bin -async fn file_handler(req: HttpRequest) -> Result { - // Extract the filename segment from the URL path. - let filename: PathBuf = req - .match_info() - .query("filename") - .parse() - .map_err(|_| actix_web::error::ErrorBadRequest("invalid filename"))?; - - // Prevent directory traversal: reject any path containing a separator. - if filename.components().count() != 1 { - return Err(actix_web::error::ErrorBadRequest("invalid filename")); - } - - let path = browser_static_dir().join(&filename); - - info!("Serving static file: {:?}", path); - - let file = NamedFile::open(&path).map_err(|e| { - error!("Failed to open static file {:?}: {}", path, e); - actix_web::error::ErrorNotFound(e) - })?; - - // Treat every file as an opaque binary stream so browsers don't - // try to render or sniff the content type. - Ok(file - .use_etag(true) - .use_last_modified(true) - .set_content_type(actix_web::mime::APPLICATION_OCTET_STREAM)) -} - -// Health check endpoint -async fn health() -> HttpResponse { - HttpResponse::Ok().json(serde_json::json!({ - "status": "healthy", - "service": "ws-server" - })) -} - -fn tls_config() -> std::io::Result { +fn tls_config() -> std::io::Result { let certified = rcgen::generate_simple_self_signed(vec![ "localhost".to_string(), "127.0.0.1".to_string(), @@ -848,27 +32,15 @@ fn tls_config() -> std::io::Result { ]) .map_err(|e| std::io::Error::other(format!("failed to generate dev certificate: {e}")))?; - let cert_der: CertificateDer<'static> = certified.cert.der().clone(); - let key_der = PrivatePkcs8KeyDer::from(certified.signing_key.serialize_der()); + let cert_der: rustls::pki_types::CertificateDer<'static> = certified.cert.der().clone(); + let key_der = rustls::pki_types::PrivatePkcs8KeyDer::from(certified.signing_key.serialize_der()); - ServerConfig::builder() + rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(vec![cert_der], key_der.into()) .map_err(|e| std::io::Error::other(format!("failed to configure TLS: {e}"))) } -use clap::Parser; - -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - /// Path to agent registry YAML file - #[arg(short, long, default_value = "registry.yaml")] - agent_registry: PathBuf, -} - -use actix_web::middleware::Logger; - #[actix_web::main] async fn main() -> std::io::Result<()> { let args = Args::parse(); @@ -907,25 +79,11 @@ async fn main() -> std::io::Result<()> { std::fs::create_dir_all(&storage_dir)?; let server = HttpServer::new(move || { + let registry = agent_registry.clone(); + let storage = storage_dir.clone(); App::new() .wrap(Logger::default()) - .app_data(agent_registry.clone()) - .route("/", web::get().to(browser_index)) - .route("/index.html", web::get().to(browser_index)) - .route("/favicon.ico", web::get().to(no_content)) - .route("/health", web::get().to(health)) - .route("/ws", web::get().to(ws_handler)) - .route("/files/{filename}", web::get().to(file_handler)) - .route("/storage/{agent_id}/{filename}", web::put().to(agent_put_file)) - .service( - Files::new("/storage", &storage_dir) - .show_files_listing() - .use_etag(true) - .use_last_modified(true), - ) - .service(Files::new("/modules", wasm_modules_dir()).prefer_utf8(true)) - .service(Files::new("/pkg", wasm_pkg_dir()).prefer_utf8(true)) - .service(Files::new("/static", browser_static_dir()).prefer_utf8(true)) + .configure(|cfg| configure_app(cfg, registry, storage)) }) .bind(("0.0.0.0", 8080))? .bind_rustls_0_23(("0.0.0.0", 8443), tls_config)? @@ -943,44 +101,3 @@ async fn main() -> std::io::Result<()> { server.await } - -async fn agent_put_file( - req: HttpRequest, - mut payload: web::Payload, - registry: web::Data, -) -> Result { - let agent_id: String = req.match_info().query("agent_id").parse().unwrap(); - let filename: PathBuf = req - .match_info() - .query("filename") - .parse() - .map_err(|_| actix_web::error::ErrorBadRequest("invalid filename"))?; - - // Validate agent exists - { - let agents = registry.agents.lock().expect("lock poisoned"); - if !agents.contains_key(&agent_id) { - return Err(actix_web::error::ErrorNotFound("agent not found")); - } - } - - if filename.components().count() != 1 { - return Err(actix_web::error::ErrorBadRequest("invalid filename")); - } - - let storage_dir = workspace_root().join("services/ws-server/storage"); - let agent_dir = storage_dir.join(&agent_id); - std::fs::create_dir_all(&agent_dir)?; - - let path = agent_dir.join(&filename); - info!("Agent {} storing file: {:?}", agent_id, path); - - use futures_util::StreamExt; - let mut file = tokio::fs::File::create(path).await?; - while let Some(chunk) = payload.next().await { - let chunk = chunk?; - tokio::io::copy(&mut &chunk[..], &mut file).await?; - } - - Ok(HttpResponse::Ok().finish()) -} diff --git a/services/ws-server/static/app.js b/services/ws-server/static/app.js index 12625a8..7108695 100644 --- a/services/ws-server/static/app.js +++ b/services/ws-server/static/app.js @@ -13,6 +13,8 @@ import init, { WsClientConfig, } from "/pkg/et_ws_wasm_agent.js"; +console.log("app.js: module loading started"); + const logEl = document.getElementById("log"); const moduleSelect = document.getElementById("module-select"); const runModuleButton = document.getElementById("run-module-button"); @@ -33,6 +35,7 @@ const sensorOutputEl = document.getElementById("sensor-output"); const videoOutputEl = document.getElementById("ml-debug-output"); const videoPreview = document.getElementById("video-preview"); const videoOutputCanvas = document.getElementById("video-output-canvas"); + let microphone = null; let videoCapture = null; let bluetoothDevice = null; @@ -71,27 +74,54 @@ const describeError = (error) => ( error instanceof Error ? error.message : String(error) ); -const WORKFLOW_MODULES = { - har1: { - label: "har1", - moduleUrl: "/modules/har1/pkg/et_ws_har1.js", - wasmUrl: "/modules/har1/pkg/et_ws_har1_bg.wasm", - }, - "face-detection": { - label: "face detection", - moduleUrl: "/modules/face-detection/pkg/et_ws_face_detection.js", - wasmUrl: "/modules/face-detection/pkg/et_ws_face_detection_bg.wasm", - }, - comm1: { - label: "comm1", - moduleUrl: "/modules/comm1/pkg/et_ws_comm1.js", - wasmUrl: "/modules/comm1/pkg/et_ws_comm1_bg.wasm", - }, - data1: { - label: "data1", - moduleUrl: "/modules/data1/pkg/et_ws_data1.js", - wasmUrl: "/modules/data1/pkg/et_ws_data1_bg.wasm", - }, +const WORKFLOW_MODULES = new Map(); + +const populateModuleDropdown = async () => { + append("Discovering modules via /api/modules..."); + const resp = await fetch("/api/modules"); + if (!resp.ok) { + append(`Failed to fetch module list from server: ${resp.status} ${resp.statusText}`); + return; + } + const moduleNames = await resp.json(); + append(`Found ${moduleNames.length} potential modules: ${moduleNames.join(", ")}`); + + // Clear current options + moduleSelect.innerHTML = ""; + + for (const name of moduleNames) { + try { + const moduleKey = name; + const moduleUrl = `/modules/${name}/pkg/et_ws_${name.replace(/-/g, "_")}.js`; + const wasmUrl = `/modules/${name}/pkg/et_ws_${name.replace(/-/g, "_")}_bg.wasm`; + + append(`Loading metadata for ${name}...`); + const loadedModule = await import(`${moduleUrl}?v=${Date.now()}`); + await loadedModule.default(wasmUrl); + + let metadata = { name, description: "", version: "" }; + if (typeof loadedModule.metadata === "function") { + metadata = loadedModule.metadata(); + } + + WORKFLOW_MODULES.set(moduleKey, { + label: metadata.description || metadata.name || name, + moduleUrl, + wasmUrl, + loaded: loadedModule, + }); + + const option = document.createElement("option"); + option.value = moduleKey; + option.textContent = WORKFLOW_MODULES.get(moduleKey).label; + moduleSelect.appendChild(option); + + append(`Successfully discovered module: ${name} (${metadata.version})`); + } catch (error) { + append(`Error discovering module ${name}: ${describeError(error)}`); + console.error(`discovery error for ${name}:`, error); + } + } }; const updateAgentCard = (status, agentId = currentAgentId) => { @@ -118,15 +148,17 @@ const writeStoredAgentId = (agentId) => { }; const loadWorkflowModule = async (moduleKey) => { - const moduleConfig = WORKFLOW_MODULES[moduleKey]; + const moduleConfig = WORKFLOW_MODULES.get(moduleKey); if (!moduleConfig) { throw new Error(`unknown workflow module: ${moduleKey}`); } - if (loadedWorkflowModules.has(moduleKey)) { - return loadedWorkflowModules.get(moduleKey); + if (moduleConfig.loaded) { + return moduleConfig.loaded; } + // This part is mostly handled by populateModuleDropdown now + // but kept for robustness if called separately. const cacheBust = Date.now(); const moduleUrl = `${moduleConfig.moduleUrl}?v=${cacheBust}`; const wasmUrl = `${moduleConfig.wasmUrl}?v=${cacheBust}`; @@ -134,13 +166,13 @@ const loadWorkflowModule = async (moduleKey) => { const loadedModule = await import(moduleUrl); append(`${moduleConfig.label} module: initializing ${wasmUrl}`); await loadedModule.default(wasmUrl); - loadedWorkflowModules.set(moduleKey, loadedModule); + moduleConfig.loaded = loadedModule; return loadedModule; }; const runSelectedWorkflowModule = async () => { const moduleKey = moduleSelect.value; - const moduleConfig = WORKFLOW_MODULES[moduleKey]; + const moduleConfig = WORKFLOW_MODULES.get(moduleKey); if (!moduleConfig) { throw new Error(`unknown workflow module: ${moduleKey}`); } @@ -627,6 +659,13 @@ const normalizeBox = (boxValues, format = "xyxy") => { return normalized; }; +const softmax = (logits) => { + const maxLogit = Math.max(...logits); + const scores = logits.map((l) => Math.exp(l - maxLogit)); + const sumScores = scores.reduce((a, b) => a + b, 0); + return scores.map((s) => s / sumScores); +}; + const findDetectionTensor = (entries, patterns, predicate = () => true) => { return entries.find(([name, tensor]) => { const normalizedName = String(name).toLowerCase(); @@ -1104,6 +1143,13 @@ updateAgentCard( ); try { + try { + await populateModuleDropdown(); + } catch (error) { + append(`Module discovery failed: ${describeError(error)}`); + console.error("populateModuleDropdown error:", error); + } + await init(); initTracing(); @@ -1415,7 +1461,7 @@ try { }); runModuleButton.addEventListener("click", async () => { - const selectedModule = WORKFLOW_MODULES[moduleSelect.value]; + const selectedModule = WORKFLOW_MODULES.get(moduleSelect.value); runModuleButton.disabled = true; moduleSelect.disabled = true; runModuleButton.textContent = selectedModule @@ -1442,7 +1488,7 @@ try { window.client = client; window.sendAlive = () => client.send_alive(); window.runWorkflowModule = (moduleKey) => { - if (moduleKey && WORKFLOW_MODULES[moduleKey]) { + if (moduleKey && WORKFLOW_MODULES.has(moduleKey)) { moduleSelect.value = moduleKey; } return runSelectedWorkflowModule(); diff --git a/services/ws-server/static/index.html b/services/ws-server/static/index.html index fbfa4c1..d2a936d 100644 --- a/services/ws-server/static/index.html +++ b/services/ws-server/static/index.html @@ -122,12 +122,7 @@

WASM web agent

- +

diff --git a/services/ws-server/tests/api_modules.rs b/services/ws-server/tests/api_modules.rs new file mode 100644 index 0000000..c1ca9fe --- /dev/null +++ b/services/ws-server/tests/api_modules.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; + +use actix_web::{App, test, web}; +use et_ws_server::{AgentRegistry, configure_app}; + +#[actix_rt::test] +async fn test_list_modules() { + // We need to use the real modules dir or a mock one. + // list_modules uses wasm_modules_dir() which is hardcoded in lib.rs to workspace_root().join("services/ws-modules") + + let agent_registry = web::Data::new(AgentRegistry::default()); + let storage_dir = PathBuf::from("/tmp/et-ws-test-storage"); + + let app = + test::init_service(App::new().configure(|cfg| configure_app(cfg, agent_registry.clone(), storage_dir.clone()))) + .await; + + let req = test::TestRequest::get().uri("/api/modules").to_request(); + let resp: Vec = test::call_and_read_body_json(&app, req).await; + + // We expect at least the modules we know exist + assert!(resp.contains(&"comm1".to_string())); + assert!(resp.contains(&"data1".to_string())); + assert!(resp.contains(&"har1".to_string())); + assert!(resp.contains(&"face-detection".to_string())); +}