From cb89227af8b3ac6d0771fa84c5705fe8b604fc62 Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Tue, 7 Apr 2026 08:35:53 +0800 Subject: [PATCH] Add OpenTelemetry logging --- .mise.toml | 14 +++++ Cargo.toml | 10 ++++ README.md | 16 ++++++ config/o2.env | 3 ++ libs/edge-toolkit/Cargo.toml | 7 +++ libs/edge-toolkit/src/args.rs | 18 +++++++ libs/edge-toolkit/src/auth.rs | 30 +++++++++++ libs/edge-toolkit/src/config.rs | 60 +++++++++++++++++++++ libs/edge-toolkit/src/lib.rs | 4 ++ libs/edge-toolkit/src/ports.rs | 30 +++++++++++ libs/edge-toolkit/tests/args.rs | 20 +++++++ services/ws-server/Cargo.toml | 41 ++++++++++---- services/ws-server/src/config.rs | 11 ++++ services/ws-server/src/main.rs | 58 ++++++++++++-------- services/ws-server/src/otlp.rs | 93 ++++++++++++++++++++++++++++++++ utilities/onnx/Cargo.toml | 11 ++++ utilities/onnx/src/main.rs | 21 ++++++++ 17 files changed, 415 insertions(+), 32 deletions(-) create mode 100644 config/o2.env create mode 100644 libs/edge-toolkit/src/args.rs create mode 100644 libs/edge-toolkit/src/auth.rs create mode 100644 libs/edge-toolkit/src/config.rs create mode 100644 libs/edge-toolkit/src/ports.rs create mode 100644 libs/edge-toolkit/tests/args.rs create mode 100644 services/ws-server/src/config.rs create mode 100644 services/ws-server/src/otlp.rs create mode 100644 utilities/onnx/Cargo.toml create mode 100644 utilities/onnx/src/main.rs diff --git a/.mise.toml b/.mise.toml index 756f189..ff118a2 100644 --- a/.mise.toml +++ b/.mise.toml @@ -85,6 +85,10 @@ description = "Run the WebSocket server" dir = "services/ws-server" run = "cargo run" +[tasks.ws-server.env] +OTLP_AUTH_PASSWORD = "1234" +OTLP_AUTH_USERNAME = "root@example.com" + [tasks.build-ws-wasm-agent] description = "Build the WebSocket WASM client" dir = "services/ws-wasm-agent" @@ -189,3 +193,13 @@ run = "env CHROMEDRIVER=\"$(mise where chromedriver)/bin/chromedriver\" wasm-pac [tasks.ws-e2e-chrome] depends = ["test-ws-wasm-agent-chrome", "ws-server"] description = "Run both the ws-server and ws-wasm-agent using Chrome" + +[tasks.openobserve] +alias = "o2" +run = "docker run --rm -it --name openobserve -p 5080:5080 --env-file config/o2.env openobserve/openobserve:v0.70.3" + +[tasks.demo] +depends = ["openobserve", "ws-server"] + +[tasks.open-o2] +run = "open http://localhost:5080/" diff --git a/Cargo.toml b/Cargo.toml index 1b03869..5f2094d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,23 @@ members = [ "services/ws-modules/video1", "services/ws-server", "services/ws-wasm-agent", + "utilities/onnx", ] resolver = "2" [workspace.dependencies] +base64 = "0.22.1" chrono = { version = "0.4", features = ["serde"] } +clap = { version = "4.4", features = ["derive"] } et-web = { path = "libs/web" } +log = "0.4" +onnx-extractor = "0.3" +rstest = "0.26" +secrecy = { version = "0.10.3", features = ["serde"] } serde = { version = "1", features = ["derive"] } +serde-env = "0.3" +serde-inline-default = "1.0" +serde_default = "0.2" serde_json = "1" tracing = "0.1" uuid = { version = "1", features = ["v4", "v7"] } diff --git a/README.md b/README.md index e503851..6db1485 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,14 @@ and save it as `services/ws-server/static/models/human_activity_recognition.onnx ### Build WASM and run the WS server +In a separate terminal start OpenObserve (o2) and leave it running. + +```bash +mise run o2 +``` + +Then start the server + ```bash mise run build-wasm mise run ws-server @@ -45,6 +53,14 @@ The module list is dynamically populated from the modules in [services/ws-module Note: The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95. +In a separate terminal, open the OpenObserve UX using: + +```bash +mise run open-o2 +``` + +The server logs appear in the Logs section. + ## Grant This repository is part of a grant managed by the School of EECMS, Curtin University. diff --git a/config/o2.env b/config/o2.env new file mode 100644 index 0000000..8240b96 --- /dev/null +++ b/config/o2.env @@ -0,0 +1,3 @@ +RUST_LOG=warn +ZO_ROOT_USER_EMAIL=root@example.com +ZO_ROOT_USER_PASSWORD=1234 diff --git a/libs/edge-toolkit/Cargo.toml b/libs/edge-toolkit/Cargo.toml index 4d666f2..d6371b6 100644 --- a/libs/edge-toolkit/Cargo.toml +++ b/libs/edge-toolkit/Cargo.toml @@ -7,5 +7,12 @@ license.workspace = true repository.workspace = true [dependencies] +base64.workspace = true +secrecy.workspace = true serde.workspace = true +serde-env.workspace = true +serde_default.workspace = true serde_json.workspace = true + +[dev-dependencies] +rstest.workspace = true diff --git a/libs/edge-toolkit/src/args.rs b/libs/edge-toolkit/src/args.rs new file mode 100644 index 0000000..7fec7d8 --- /dev/null +++ b/libs/edge-toolkit/src/args.rs @@ -0,0 +1,18 @@ +/// Return the executable name that was invoked. +/// +/// Removes `.exe` suffix. +/// +/// # Panics +/// This function will panic if there is no command line argument 0 +/// which may happen if the invoking environment is not similar to a "std" environment. +#[must_use] +pub fn executable_name() -> String { + executable_name_inner(std::env::args().collect()) +} + +#[expect(clippy::unwrap_used)] +pub fn executable_name_inner(args: Vec) -> String { + let path = args.first().unwrap(); + let path = std::path::PathBuf::from(path); + path.file_stem().unwrap().to_string_lossy().to_string() +} diff --git a/libs/edge-toolkit/src/auth.rs b/libs/edge-toolkit/src/auth.rs new file mode 100644 index 0000000..2695717 --- /dev/null +++ b/libs/edge-toolkit/src/auth.rs @@ -0,0 +1,30 @@ +use base64::{Engine, engine::general_purpose::STANDARD as b64standard}; +use secrecy::{ExposeSecret, SecretString}; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize)] +/// Basic Authentication config. +pub struct BasicAuth { + /// Username. + pub username: String, + /// Password. + pub password: SecretString, +} + +impl BasicAuth { + /// Create a new `BasicAuth` instance. + #[must_use] + pub fn new(username: String, password: SecretString) -> Self { + Self { username, password } + } + + /// Add authorisation header to HashMap. + pub fn add_basic_auth_header(&self, headers: &mut std::collections::HashMap) { + let mut buf = String::default(); + b64standard.encode_string( + format!("{}:{}", self.username, self.password.expose_secret()).as_bytes(), + &mut buf, + ); + headers.insert("authorization".to_string(), format!("Basic {buf}")); + } +} diff --git a/libs/edge-toolkit/src/config.rs b/libs/edge-toolkit/src/config.rs new file mode 100644 index 0000000..0da8ad5 --- /dev/null +++ b/libs/edge-toolkit/src/config.rs @@ -0,0 +1,60 @@ +use serde::Deserialize; +use serde_default::DefaultFromSerde; + +use crate::args::executable_name; +use crate::auth::BasicAuth; +use crate::ports::Services; + +/// Localhost address 127.0.0.1 . +pub const LOCALHOST: &str = "127.0.0.1"; + +/// Default port for the otlp http collector. +#[must_use] +const fn default_otlp_collector_port() -> u16 { + Services::OtlpCollector.port() +} + +/// Default url for the otlp collector. This is the tracing endpoint path for OpenObserve trace collection. +#[must_use] +pub fn default_otlp_collector_url() -> String { + format!("http://{LOCALHOST}:{}/api/default/v1", default_otlp_collector_port()) +} + +/// Default service label name for use in OpenTelemetry. +/// +/// Removes "-server" suffix from the invoked executable name if present, +/// such as binary name `et-ws-server`. +#[must_use] +pub fn default_trace_service_label() -> String { + executable_name().replace("-server", "") +} + +/// OTLP message data protocol. +/// +/// Binary is more compact and efficient, while JSON is more human-readable and easier to debug. +#[expect(clippy::exhaustive_enums)] +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq)] +pub enum OtlpProtocol { + /// Binary messages. + #[default] + Binary, + /// JSON messages. + JSON, +} + +/// OpenTelemetry service config. +#[derive(Clone, Debug, DefaultFromSerde, Deserialize)] +#[non_exhaustive] +pub struct OtlpConfig { + /// OpenTelemetry collector URL. + #[serde(default = "default_otlp_collector_url")] + pub collector_url: String, + /// OpenTelemetry protocol. + #[serde(default)] + pub protocol: OtlpProtocol, + /// OpenTelemetry service label. + #[serde(default = "default_trace_service_label")] + pub service_label: String, + /// OpenTelemetry HTTP basic auth. + pub auth: Option, +} diff --git a/libs/edge-toolkit/src/lib.rs b/libs/edge-toolkit/src/lib.rs index 6757c99..5d8cb0b 100644 --- a/libs/edge-toolkit/src/lib.rs +++ b/libs/edge-toolkit/src/lib.rs @@ -1 +1,5 @@ +pub mod args; +pub mod auth; +pub mod config; +pub mod ports; pub mod ws; diff --git a/libs/edge-toolkit/src/ports.rs b/libs/edge-toolkit/src/ports.rs new file mode 100644 index 0000000..00178a8 --- /dev/null +++ b/libs/edge-toolkit/src/ports.rs @@ -0,0 +1,30 @@ +//! Port allocation. + +/// Define each services port. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +pub enum Services { + /// OTLP collector. + OtlpCollector, + + /// Insecure WebSocket server. + InsecureWebSocketServer, + /// Secure WebSocket server. + SecureWebSocketServer, +} + +use Services::{InsecureWebSocketServer, OtlpCollector, SecureWebSocketServer}; + +impl Services { + /// Get the allocation port for the service. + #[must_use] + pub const fn port(&self) -> u16 { + match self { + // OpenObserve specific http port + OtlpCollector => 5080, + + InsecureWebSocketServer => 8080, + SecureWebSocketServer => 8443, + } + } +} diff --git a/libs/edge-toolkit/tests/args.rs b/libs/edge-toolkit/tests/args.rs new file mode 100644 index 0000000..cc11d04 --- /dev/null +++ b/libs/edge-toolkit/tests/args.rs @@ -0,0 +1,20 @@ +#![cfg(test)] + +#[rstest::rstest] +#[case::normal(vec!["et-ws-server"])] +#[case::unix_path(vec!["/path/to/et-ws-server"])] +#[case::windows_exe(vec!["et-ws-server.exe"])] +fn executable_name(#[case] args: Vec<&str>) { + let args: Vec = args.into_iter().map(String::from).collect(); + assert_eq!( + edge_toolkit::args::executable_name_inner(args), + "et-ws-server".to_string() + ); +} + +#[cfg(windows)] +#[rstest::rstest] +#[case::windows_path(vec!["C:\\path\\to\\et-ws-server"])] +fn executable_name_windows(#[case] args: Vec<&str>) { + executable_name(args); +} diff --git a/services/ws-server/Cargo.toml b/services/ws-server/Cargo.toml index af753c5..7bdb425 100644 --- a/services/ws-server/Cargo.toml +++ b/services/ws-server/Cargo.toml @@ -8,28 +8,47 @@ repository.workspace = true [dependencies] actix = "0.13" actix-files = "0.6" +actix-rt = "2" actix-web = { version = "4", features = ["rustls-0_23"] } actix-web-actors = "4" +chrono.workspace = true +clap.workspace = true edge-toolkit = { path = "../../libs/edge-toolkit" } -# mime = "0.3" +futures-util = "0.3" +hostname = "0.4" +local-ip-address = "0.6" +log.workspace = true +onnx-extractor.workspace = true opentelemetry = "0.31" -opentelemetry-otlp = { version = "0.31", default-features = false, features = ["logs", "metrics", "trace"] } +opentelemetry-appender-tracing = "0.31" +opentelemetry-otlp = { version = "0.31", default-features = false, features = [ + "http-json", + "http-proto", + "logs", + "metrics", + "reqwest-blocking-client", + "trace", +] } opentelemetry_sdk = "0.31" +qr2term = "0.3" rcgen = "0.14" +regex = { version = "1.12", default-features = false } rustls = "0.23" +secrecy.workspace = true serde.workspace = true +serde-env.workspace = true +serde-inline-default.workspace = true +serde_default.workspace = true serde_json.workspace = true serde_yaml = "0.9" tokio = { version = "1", features = ["full"] } tracing.workspace = true +tracing-actix-web = { version = "0.7", default-features = false, features = [ + "emit_event_on_error", + "opentelemetry_0_31", + "uuid_v7", +] } +tracing-log = "0.2" +tracing-opentelemetry = "0.32" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -#opentelemetry-actix-web = "0.10" -chrono.workspace = true -clap = { version = "4.4", features = ["derive"] } -futures-util = "0.3" -local-ip-address = "0.6" -qr2term = "0.3" uuid.workspace = true - -[dependencies.actix-rt] -version = "2" diff --git a/services/ws-server/src/config.rs b/services/ws-server/src/config.rs new file mode 100644 index 0000000..954c1bc --- /dev/null +++ b/services/ws-server/src/config.rs @@ -0,0 +1,11 @@ +use edge_toolkit::config::OtlpConfig; +use serde::Deserialize; +use serde_default::DefaultFromSerde; + +/// Application environment variables and config. +#[derive(Clone, Debug, DefaultFromSerde, Deserialize)] +pub struct Config { + /// OpenTelemetry config. + #[serde(default)] + pub otlp: Option, +} diff --git a/services/ws-server/src/main.rs b/services/ws-server/src/main.rs index 700982f..6bfac5e 100644 --- a/services/ws-server/src/main.rs +++ b/services/ws-server/src/main.rs @@ -4,11 +4,14 @@ use actix_web::middleware::Logger; use actix_web::{App, HttpServer, web}; use clap::Parser; use et_ws_server::{AgentRegistry, browser_static_dir, configure_app, wasm_modules_dir, wasm_pkg_dir, workspace_root}; -use opentelemetry::global; -use opentelemetry_sdk::trace::SdkTracerProvider as TracerProvider; use tracing::{error, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +mod config; +mod otlp; + +use crate::config::Config; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -17,13 +20,6 @@ struct Args { agent_registry: PathBuf, } -// Initialize OpenTelemetry -fn init_tracing() -> opentelemetry_sdk::trace::SdkTracerProvider { - let provider = TracerProvider::builder().build(); - global::set_tracer_provider(provider.clone()); - provider -} - fn tls_config() -> std::io::Result { let certified = rcgen::generate_simple_self_signed(vec![ "localhost".to_string(), @@ -44,23 +40,40 @@ fn tls_config() -> std::io::Result { #[actix_web::main] async fn main() -> std::io::Result<()> { let args = Args::parse(); - let _provider = init_tracing(); - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info,ws_server=debug,actix_web=info".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); + let env = serde_env::from_env::().unwrap(); + + eprintln!("Starting with env vars {env:#?}"); + + if let Some(otlp_config) = &env.otlp { + info!("OpenTelemetry configuration detected, initializing tracing..."); + let _provider = crate::otlp::init(otlp_config); + } else { + info!("No OpenTelemetry configuration detected, using default tracing settings..."); + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,et_ws_server=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + } let tls_config = tls_config()?; let network_ip = local_ip_address::local_ip() .map(|ip| ip.to_string()) .unwrap_or_else(|_| "127.0.0.1".to_string()); - let https_url = format!("https://{}:8443", network_ip); - info!("Starting WebSocket server on http://{}:8080", network_ip); + let https_url = format!( + "https://{}:{}", + network_ip, + edge_toolkit::ports::Services::SecureWebSocketServer.port() + ); + info!( + "Starting WebSocket server on http://{}:{}", + network_ip, + edge_toolkit::ports::Services::InsecureWebSocketServer.port() + ); info!("Starting WebSocket server on {}", https_url); info!("Scan this QR code to open the browser interface:"); if let Err(e) = qr2term::print_qr(&https_url) { @@ -85,8 +98,11 @@ async fn main() -> std::io::Result<()> { .wrap(Logger::default()) .configure(|cfg| configure_app(cfg, registry, storage)) }) - .bind(("0.0.0.0", 8080))? - .bind_rustls_0_23(("0.0.0.0", 8443), tls_config)? + .bind(("0.0.0.0", edge_toolkit::ports::Services::InsecureWebSocketServer.port()))? + .bind_rustls_0_23( + ("0.0.0.0", edge_toolkit::ports::Services::SecureWebSocketServer.port()), + tls_config, + )? .run(); let handle = server.handle(); diff --git a/services/ws-server/src/otlp.rs b/services/ws-server/src/otlp.rs new file mode 100644 index 0000000..9a67fda --- /dev/null +++ b/services/ws-server/src/otlp.rs @@ -0,0 +1,93 @@ +use edge_toolkit::config::{OtlpConfig, OtlpProtocol}; +use opentelemetry::{KeyValue, trace::TracerProvider}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, WithExportConfig, WithHttpConfig}; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator}; +use tracing::subscriber::set_global_default; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt}; + +pub const RUST_LOG: &str = "RUST_LOG"; + +// Initialize OpenTelemetry. +pub fn init(config: &OtlpConfig) -> SdkTracerProvider { + tracing_log::LogTracer::init().unwrap(); + + let mut telemetry_collector_headers = std::collections::HashMap::new(); + if let Some(auth) = &config.auth { + auth.add_basic_auth_header(&mut telemetry_collector_headers); + } + + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + let trace_endpoint = format!("{}/traces", config.collector_url.clone()); + let log_endpoint = format!("{}/logs", config.collector_url.clone()); + + let protocol = match config.protocol { + OtlpProtocol::Binary => opentelemetry_otlp::Protocol::HttpBinary, + OtlpProtocol::JSON => opentelemetry_otlp::Protocol::HttpJson, + }; + + let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_protocol(protocol) + .with_endpoint(trace_endpoint) + .with_headers(telemetry_collector_headers.clone()) + .build() + .unwrap(); + + let mut service_descriptors = vec![KeyValue::new("service.version", env!("CARGO_PKG_VERSION").to_string())]; + if let Some(hostname) = hostname::get().ok().and_then(|h| h.into_string().ok()) { + service_descriptors.push(KeyValue::new("service.instance", hostname)); + } + + let resource = Resource::builder() + .with_service_name(config.service_label.clone()) + .with_attributes(service_descriptors) + .build(); + + let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_batch_exporter(otlp_exporter) + .with_resource(resource.clone()) + .build(); + + let otel_tracing_layer = OpenTelemetryLayer::new(provider.tracer(config.service_label.clone())); + + let log_directives = if let Ok(level) = std::env::var(RUST_LOG) { + log::info!("{RUST_LOG}={level}"); + level + } else { + log::info!("{RUST_LOG} defaulted to info"); + "info".to_string() + }; + let env_filter = EnvFilter::try_new(log_directives).unwrap(); + + let exporter = LogExporter::builder() + .with_http() + .with_protocol(protocol) + .with_endpoint(log_endpoint) + .with_headers(telemetry_collector_headers) + .build() + .unwrap(); + + let log_provider = SdkLoggerProvider::builder() + .with_batch_exporter(exporter) + .with_resource(resource) + .build(); + + let otel_layer = OpenTelemetryTracingBridge::new(&log_provider); + + let stdout_format = tracing_subscriber::fmt::format().compact(); + + let stdout_fmt_layer = tracing_subscriber::fmt::layer().event_format(stdout_format); + + let subscriber = Registry::default() + .with(env_filter) + .with(stdout_fmt_layer) + .with(otel_tracing_layer) + .with(otel_layer); + + set_global_default(subscriber).unwrap(); + provider +} diff --git a/utilities/onnx/Cargo.toml b/utilities/onnx/Cargo.toml new file mode 100644 index 0000000..6f0130c --- /dev/null +++ b/utilities/onnx/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "et-onnx" +description = "CLI to investigate ONNX models" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +clap.workspace = true +onnx-extractor.workspace = true diff --git a/utilities/onnx/src/main.rs b/utilities/onnx/src/main.rs new file mode 100644 index 0000000..878ffdc --- /dev/null +++ b/utilities/onnx/src/main.rs @@ -0,0 +1,21 @@ +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path to the ONNX model file. + #[arg(short, long)] + filename: PathBuf, +} + +fn main() { + //} -> std::io::Result<()> { + let args = Args::parse(); + + let model = onnx_extractor::OnnxModel::load_from_file(&args.filename.to_string_lossy()).unwrap(); + + model.print_summary(); + model.print_model_info(); +}