Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ action-validator = "latest"
"cargo:wasm-pack" = "latest"
"chromedriver" = "146"
cmake = "latest"
codex = "latest"
dprint = "latest"
editorconfig-checker = "latest"
gemini-cli = "latest"
"github:block/goose" = "latest"
"github:wasm-bindgen/wasm-bindgen" = "0.2.114"
ollama = "latest"
Expand Down Expand Up @@ -102,8 +104,13 @@ description = "Build the face detection workflow WASM module"
dir = "services/ws-modules/face-detection"
run = "wasm-pack build . --target web"

[tasks.build]
depends = ["build-ws-face-detection-module", "build-ws-har1-module", "build-ws-wasm-agent"]
[tasks.build-ws-comm1-module]
description = "Build the comm1 workflow WASM module"
dir = "services/ws-modules/comm1"
run = "wasm-pack build . --target web"

[tasks.build-wasm]
depends = ["build-ws-comm1-module", "build-ws-face-detection-module", "build-ws-har1-module", "build-ws-wasm-agent"]
description = "Build all WebAssembly modules"

[tasks.test-ws-wasm-agent-firefox]
Expand Down
17 changes: 12 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
[workspace.package]
edition = "2024"
license = "Apache-2.0 and MIT"
repository = "https://github.com/edge-toolkit/core"
rust-version = "1.87.0"

[workspace]
members = [
"libs/edge-toolkit",
"services/ws-modules/face-detection",
"services/ws-modules/har1",
"services/ws-server",
"services/ws-wasm-agent",
"libs/edge-toolkit",
"services/ws-modules/comm1",
"services/ws-modules/face-detection",
"services/ws-modules/har1",
"services/ws-server",
"services/ws-wasm-agent",
]
resolver = "2"

Expand Down
12 changes: 4 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,21 @@ and save it as `services/ws-server/static/models/human_activity_recognition.onnx
2. Save it in `services/ws-server/static/models/`
3. Rename the file to `video_cv.onnx`.

### Build and run the agent
### Build WASM and run the WS server

```bash
mise run build-ws-wasm-agent
mise run build-ws-har1-module
mise run build-ws-face-detection-module
mise run build-wasm
mise run ws-server
```

The WASM build disables WebAssembly reference types so it can still load on older browsers such as Chrome 95.
The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95.

Find the IP address of your laptop in the local network,
which will normally be something like 192.168.1.x.

Then on your phone, open Chrome and type in https://192.168.1.x:8433/

Click "har demo".

For webcam inference, click "face demo".
Select the module to run in the drop down, then click "Run module" button.

## Grant

Expand Down
6 changes: 3 additions & 3 deletions libs/edge-toolkit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
name = "edge-toolkit"
description = "A collection of utilities and common code for Edge Toolkit services"
version = "0.1.0"
edition = "2024"
license = "Apache-2.0 or MIT"
repository = "https://github.com/edge-toolkit/core"
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
serde.workspace = true
Expand Down
60 changes: 60 additions & 0 deletions libs/edge-toolkit/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,36 @@ pub enum ConnectStatus {
Reconnected,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MessageDeliveryStatus {
Delivered,
Queued,
Acknowledged,
Broadcast,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum MessageScope {
Direct,
Broadcast,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentConnectionState {
Connected,
Disconnected,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentSummary {
pub agent_id: String,
pub state: AgentConnectionState,
pub last_known_ip: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsMessage {
Expand All @@ -20,6 +50,36 @@ pub enum WsMessage {
Alive {
timestamp: String,
},
ListAgents,
ListAgentsResponse {
agents: Vec<AgentSummary>,
},
SendAgentMessage {
to_agent_id: String,
message: serde_json::Value,
},
BroadcastMessage {
message: serde_json::Value,
},
AgentMessage {
message_id: String,
from_agent_id: String,
scope: MessageScope,
server_received_at: String,
message: serde_json::Value,
},
MessageAck {
message_id: String,
},
MessageStatus {
message_id: Option<String>,
status: MessageDeliveryStatus,
detail: String,
},
Invalid {
message_id: Option<String>,
detail: String,
},
ClientEvent {
capability: String,
action: String,
Expand Down
24 changes: 24 additions & 0 deletions services/ws-modules/comm1/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "et-ws-comm1"
version = "0.1.0"
edition.workspace = true
license.workspace = true
repository.workspace = true

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
edge-toolkit = { path = "../../../libs/edge-toolkit" }
et-ws-wasm-agent = { path = "../../ws-wasm-agent" }
js-sys = "0.3"
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-wasm = "0.2"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Window", "console"] }

[dev-dependencies]
wasm-bindgen-test = "0.3"
207 changes: 207 additions & 0 deletions services/ws-modules/comm1/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use std::cell::RefCell;
use std::rc::Rc;

use edge_toolkit::ws::{AgentConnectionState, AgentSummary, WsMessage};
use et_ws_wasm_agent::{WsClient, WsClientConfig, append_to_textarea};
use js_sys::{Promise, Reflect};
use serde_json::json;
use tracing::info;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;

const LIST_AGENTS_POLL_MS: i32 = 1_000;
const MESSAGE_PAUSE_MS: i32 = 3_000;

#[wasm_bindgen(start)]
pub fn init() {
tracing_wasm::set_as_global_default();
info!("comm1 workflow module initialized");
}

#[wasm_bindgen]
pub async fn run() -> Result<(), JsValue> {
log("comm1: entered run()")?;

let ws_url = websocket_url()?;
log(&format!("comm1: resolved websocket URL: {ws_url}"))?;

let mut client = WsClient::new(WsClientConfig::new(ws_url));
let self_agent_id = Rc::new(RefCell::new(String::new()));
let other_connected_agents: Rc<RefCell<Vec<AgentSummary>>> = Rc::new(RefCell::new(Vec::new()));

let on_message = Closure::wrap(Box::new({
let self_agent_id = self_agent_id.clone();
let other_connected_agents = other_connected_agents.clone();
move |value: JsValue| {
let Some(data) = value.as_string() else {
return;
};

let Ok(message) = serde_json::from_str::<WsMessage>(&data) else {
return;
};

match message {
WsMessage::ListAgentsResponse { agents } => {
let own_id = self_agent_id.borrow().clone();
let others = agents
.into_iter()
.filter(|agent| {
agent.state == AgentConnectionState::Connected
&& !own_id.is_empty()
&& agent.agent_id != own_id
})
.collect::<Vec<_>>();
*other_connected_agents.borrow_mut() = others;
}
WsMessage::AgentMessage {
message_id,
from_agent_id,
scope,
server_received_at,
message,
} => {
let summary =
serde_json::to_string(&message).unwrap_or_else(|_| String::from("<unprintable message>"));
let line = format!(
"comm1: received {:?} message {} from {} at {}: {}",
scope, message_id, from_agent_id, server_received_at, summary
);
web_sys::console::log_1(&JsValue::from_str(&line));
let _ = set_module_status(&line);
}
WsMessage::MessageStatus {
message_id,
status,
detail,
} => {
let line = format!("comm1: message status update {:?} {:?}: {}", message_id, status, detail);
web_sys::console::log_1(&JsValue::from_str(&line));
let _ = set_module_status(&line);
}
WsMessage::Invalid { message_id, detail } => {
let line = format!("comm1: invalid server response {:?}: {}", message_id, detail);
web_sys::console::warn_1(&JsValue::from_str(&line));
let _ = set_module_status(&line);
}
_ => {}
}
}
}) as Box<dyn FnMut(JsValue)>);
client.set_on_message(on_message.as_ref().clone());

client.connect()?;
wait_for_connected(&client).await?;
let agent_id = wait_for_agent_id(&client).await?;
*self_agent_id.borrow_mut() = agent_id.clone();
let msg = format!("comm1: websocket connected with agent_id={agent_id}");
log(&msg)?;
set_module_status(&msg)?;

let target_agent = loop {
client.request_list_agents()?;
sleep_ms(LIST_AGENTS_POLL_MS).await?;

let agents = other_connected_agents.borrow().clone();
if let Some(agent) = agents.first() {
break agent.clone();
}
};

let msg = format!(
"comm1: found connected peer agent {}; sending broadcast",
target_agent.agent_id
);
log(&msg)?;
set_module_status(&msg)?;
client.broadcast_message(json!({
"module": "comm1",
"step": "broadcast",
"from_agent_id": agent_id,
"message": "comm1 broadcast to all other connected agents"
}))?;

sleep_ms(MESSAGE_PAUSE_MS).await?;

let msg = format!("comm1: sending direct message to {}", target_agent.agent_id);
log(&msg)?;
set_module_status(&msg)?;
client.send_agent_message(
target_agent.agent_id.clone(),
json!({
"module": "comm1",
"step": "direct",
"from_agent_id": agent_id,
"message": "comm1 direct message"
}),
)?;

sleep_ms(MESSAGE_PAUSE_MS).await?;
client.disconnect();
let msg = "comm1: workflow complete";
log(msg)?;
set_module_status(msg)?;
Ok(())
}

fn log(message: &str) -> Result<(), JsValue> {
let line = format!("[comm1] {message}");
web_sys::console::log_1(&JsValue::from_str(&line));
Ok(())
}

fn set_module_status(message: &str) -> Result<(), JsValue> {
append_to_textarea("module-output", message)
}

async fn wait_for_connected(client: &WsClient) -> Result<(), JsValue> {
for _ in 0..100 {
if client.get_state() == "connected" {
return Ok(());
}
sleep_ms(100).await?;
}

Err(JsValue::from_str("Timed out waiting for websocket connection"))
}

async fn wait_for_agent_id(client: &WsClient) -> Result<String, JsValue> {
for _ in 0..100 {
let agent_id = client.get_client_id();
if !agent_id.is_empty() {
return Ok(agent_id);
}
sleep_ms(100).await?;
}

Err(JsValue::from_str("Timed out waiting for assigned agent_id"))
}

async fn sleep_ms(duration_ms: i32) -> Result<(), JsValue> {
let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?;
let promise = Promise::new(&mut |resolve, reject| {
let callback = Closure::once_into_js(move || {
let _ = resolve.call0(&JsValue::NULL);
});

if let Err(error) =
window.set_timeout_with_callback_and_timeout_and_arguments_0(callback.unchecked_ref(), duration_ms)
{
let _ = reject.call1(&JsValue::NULL, &error);
}
});
JsFuture::from(promise).await.map(|_| ())
}

fn websocket_url() -> Result<String, JsValue> {
let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?;
let location = Reflect::get(window.as_ref(), &JsValue::from_str("location"))?;
let protocol = Reflect::get(&location, &JsValue::from_str("protocol"))?
.as_string()
.ok_or_else(|| JsValue::from_str("window.location.protocol is unavailable"))?;
let host = Reflect::get(&location, &JsValue::from_str("host"))?
.as_string()
.ok_or_else(|| JsValue::from_str("window.location.host is unavailable"))?;
let ws_protocol = if protocol == "https:" { "wss:" } else { "ws:" };
Ok(format!("{ws_protocol}//{host}/ws"))
}
Loading
Loading