From 523fcc205ff454163ce0c11f203d2e008f2e1b3f Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:04:07 -0300 Subject: [PATCH 01/12] moq-lite: add optional Stats hooks; moq-native accept_with_stats --- js/lite/src/connection/connect.ts | 21 ++++++++++++++- rs/moq-lite/src/ietf/publisher.rs | 24 ++++++++++++++--- rs/moq-lite/src/ietf/session.rs | 13 +++++----- rs/moq-lite/src/ietf/subscriber.rs | 14 +++++++++- rs/moq-lite/src/lib.rs | 2 ++ rs/moq-lite/src/lite/publisher.rs | 30 ++++++++++++++++++---- rs/moq-lite/src/lite/session.rs | 14 +++++----- rs/moq-lite/src/lite/subscriber.rs | 8 +++++- rs/moq-lite/src/session.rs | 41 ++++++++++++++++++++++++------ rs/moq-lite/src/stats.rs | 30 ++++++++++++++++++++++ rs/moq-native/src/server.rs | 32 ++++++++++++++++++++--- 11 files changed, 192 insertions(+), 37 deletions(-) create mode 100644 rs/moq-lite/src/stats.rs diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index 3bf642d1e..f6bfcba17 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -3,6 +3,17 @@ import * as Ietf from "../ietf/index.ts"; import * as Lite from "../lite/index.ts"; import { Stream } from "../stream.ts"; import * as Hex from "../util/hex.ts"; + +// Connection type tracking for observability +let connectionTypeCallback: ((type: "webtransport" | "websocket") => void) | undefined; + +/** + * Register a callback to be notified of connection type. + * Used by observability to track WebTransport vs WebSocket usage. + */ +export function onConnectionType(callback: (type: "webtransport" | "websocket") => void) { + connectionTypeCallback = callback; +} import type { Established } from "./established.ts"; export interface WebSocketOptions { @@ -64,12 +75,20 @@ export async function connect(url: URL, props?: ConnectProps): Promise { session: S, origin: OriginConsumer, control: Control, + stats: Option>, // Drop in order to cancel the subscribe. 
subscribes: Lock>>, @@ -24,13 +26,20 @@ pub(super) struct Publisher { } impl Publisher { - pub fn new(session: S, origin: Option, control: Control, version: Version) -> Self { + pub fn new( + session: S, + origin: Option, + control: Control, + stats: Option>, + version: Version, + ) -> Self { // Default to a dummy origin that is immediately closed. let origin = origin.unwrap_or_else(|| Origin::produce().consumer); Self { session, origin, control, + stats, subscribes: Default::default(), version, } @@ -108,10 +117,11 @@ impl Publisher { let control = self.control.clone(); let request_id = msg.request_id; let subscribes = self.subscribes.clone(); + let stats = self.stats.clone(); let version = self.version; web_async::spawn(async move { - if let Err(err) = Self::run_track(session, track, request_id, rx, version).await { + if let Err(err) = Self::run_track(session, track, request_id, rx, stats, version).await { control .send(ietf::PublishDone { request_id, @@ -150,6 +160,7 @@ impl Publisher { mut track: TrackConsumer, request_id: RequestId, mut cancel: oneshot::Receiver<()>, + stats: Option>, version: Version, ) -> Result<(), Error> { // TODO use a BTreeMap serve the latest N groups by sequence. @@ -212,6 +223,7 @@ impl Publisher { msg, track.info.priority, group, + stats.clone(), version, )); @@ -241,6 +253,7 @@ impl Publisher { msg: ietf::GroupHeader, priority: u8, mut group: GroupConsumer, + stats: Option>, version: Version, ) -> Result<(), Error> { // TODO add a way to open in priority order. @@ -293,7 +306,12 @@ impl Publisher { }; match chunk? { - Some(mut chunk) => stream.write_all(&mut chunk).await?, + Some(mut chunk) => { + if let Some(stats) = &stats { + stats.add_tx_bytes(chunk.len() as u64); + } + stream.write_all(&mut chunk).await? 
+ } None => break, } } diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index 45d857ff9..5312a403c 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -1,8 +1,4 @@ -use crate::{ - Error, OriginConsumer, OriginProducer, - coding::{Reader, Stream}, - ietf::{self, Control, Message, RequestId, Version}, -}; +use crate::{coding::{Reader, Stream}, ietf::{self, Control, Message, RequestId, Version}, Error, OriginConsumer, OriginProducer, Stats}; use super::{Publisher, Subscriber}; @@ -13,6 +9,7 @@ pub(crate) async fn start( client: bool, publish: Option, subscribe: Option, + stats: Option>, version: Version, ) -> Result<(), Error> { web_async::spawn(async move { @@ -23,6 +20,7 @@ pub(crate) async fn start( client, publish, subscribe, + stats, version, ) .await @@ -52,12 +50,13 @@ async fn run( client: bool, publish: Option, subscribe: Option, + stats: Option>, version: Version, ) -> Result<(), Error> { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let control = Control::new(tx, request_id_max, client, version); - let publisher = Publisher::new(session.clone(), publish, control.clone(), version); - let subscriber = Subscriber::new(session.clone(), subscribe, control.clone(), version); + let publisher = Publisher::new(session.clone(), publish, control.clone(), stats.clone(), version); + let subscriber = Subscriber::new(session.clone(), subscribe, control.clone(), stats.clone(), version); tokio::select! 
{ res = subscriber.clone().run() => res, diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index b8ace558d..50b3f2847 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -9,6 +9,7 @@ use crate::{ coding::Reader, ietf::{self, Control, FetchHeader, FilterType, GroupFlags, GroupOrder, RequestId, Version}, model::BroadcastProducer, + Stats, }; use web_async::Lock; @@ -47,17 +48,25 @@ pub(super) struct Subscriber { origin: Option, state: Lock, control: Control, + stats: Option>, version: Version, } impl Subscriber { - pub fn new(session: S, origin: Option, control: Control, version: Version) -> Self { + pub fn new( + session: S, + origin: Option, + control: Control, + stats: Option>, + version: Version, + ) -> Self { Self { session, origin, state: Default::default(), control, + stats, version, } } @@ -391,6 +400,9 @@ impl Subscriber { while remain > 0 { let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?; + if let Some(stats) = &self.stats { + stats.add_rx_bytes(chunk.len() as u64); + } remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?; frame.write_chunk(chunk); } diff --git a/rs/moq-lite/src/lib.rs b/rs/moq-lite/src/lib.rs index 9494a1284..78d6eee72 100644 --- a/rs/moq-lite/src/lib.rs +++ b/rs/moq-lite/src/lib.rs @@ -39,6 +39,7 @@ mod model; mod path; mod session; mod setup; +mod stats; pub mod coding; pub mod ietf; @@ -48,3 +49,4 @@ pub use error::*; pub use model::*; pub use path::*; pub use session::*; +pub use stats::*; diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index d32b3aca8..b253a5589 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -10,22 +10,25 @@ use crate::{ priority::{PriorityHandle, PriorityQueue}, }, model::GroupConsumer, + Stats, }; pub(super) struct Publisher { session: S, origin: OriginConsumer, + stats: Option>, priority: PriorityQueue, version: Version, 
} impl Publisher { - pub fn new(session: S, origin: Option, version: Version) -> Self { + pub fn new(session: S, origin: Option, stats: Option>, version: Version) -> Self { // Default to a dummy origin that is immediately closed. let origin = origin.unwrap_or_else(|| Origin::produce().consumer); Self { session, origin, + stats, priority: Default::default(), version, } @@ -151,11 +154,13 @@ impl Publisher { let broadcast = self.origin.consume_broadcast(&subscribe.broadcast); let priority = self.priority.clone(); + let stats = self.stats.clone(); let version = self.version; let session = self.session.clone(); web_async::spawn(async move { - if let Err(err) = Self::run_subscribe(session, &mut stream, &subscribe, broadcast, priority, version).await + if let Err(err) = + Self::run_subscribe(session, &mut stream, &subscribe, broadcast, priority, stats, version).await { match &err { // TODO better classify WebTransport errors. @@ -181,6 +186,7 @@ impl Publisher { subscribe: &lite::Subscribe<'_>, consumer: Option, priority: PriorityQueue, + stats: Option>, version: Version, ) -> Result<(), Error> { let track = Track { @@ -200,7 +206,7 @@ impl Publisher { stream.writer.encode(&info).await?; tokio::select! { - res = Self::run_track(session, track, subscribe, priority, version) => res?, + res = Self::run_track(session, track, subscribe, priority, stats, version) => res?, res = stream.reader.closed() => res?, } @@ -213,6 +219,7 @@ impl Publisher { mut track: TrackConsumer, subscribe: &lite::Subscribe<'_>, priority: PriorityQueue, + stats: Option>, version: Version, ) -> Result<(), Error> { // TODO use a BTreeMap serve the latest N groups by sequence. @@ -268,7 +275,14 @@ impl Publisher { // Spawn a task to serve this group, ignoring any errors because they don't really matter. // TODO add some logging at least. 
- let handle = Box::pin(Self::serve_group(session.clone(), msg, priority, group, version)); + let handle = Box::pin(Self::serve_group( + session.clone(), + msg, + priority, + group, + stats.clone(), + version, + )); // Terminate the old group if it's still running. if let Some(old_sequence) = old_sequence.take() { @@ -296,6 +310,7 @@ impl Publisher { msg: lite::Group, mut priority: PriorityHandle, mut group: GroupConsumer, + stats: Option>, version: Version, ) -> Result<(), Error> { // TODO add a way to open in priority order. @@ -343,7 +358,12 @@ impl Publisher { }; match chunk? { - Some(mut chunk) => stream.write_all(&mut chunk).await?, + Some(mut chunk) => { + if let Some(stats) = &stats { + stats.add_tx_bytes(chunk.len() as u64); + } + stream.write_all(&mut chunk).await? + } None => break, } } diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index 230b6925e..fb5c49aea 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -1,10 +1,8 @@ +use std::sync::Arc; + use tokio::sync::oneshot; -use crate::{ - Error, OriginConsumer, OriginProducer, - coding::Stream, - lite::{SessionInfo, Version}, -}; +use crate::{coding::Stream, lite::{SessionInfo, Version}, Error, OriginConsumer, OriginProducer, Stats}; use super::{Publisher, Subscriber}; @@ -16,11 +14,13 @@ pub(crate) async fn start( publish: Option, // We will consume any remote broadcasts, inserting them into this origin. subscribe: Option, + // Optional application-level stats sink. + stats: Option>, // The version of the protocol to use. 
version: Version, ) -> Result<(), Error> { - let publisher = Publisher::new(session.clone(), publish, version); - let subscriber = Subscriber::new(session.clone(), subscribe, version); + let publisher = Publisher::new(session.clone(), publish, stats.clone(), version); + let subscriber = Subscriber::new(session.clone(), subscribe, stats.clone(), version); let init = oneshot::channel(); diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9054ca9bd..9574aa18f 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -9,6 +9,7 @@ use crate::{ coding::{Reader, Stream}, lite::{self, Version}, model::BroadcastProducer, + Stats, }; use tokio::sync::oneshot; @@ -21,16 +22,18 @@ pub(super) struct Subscriber { origin: Option, subscribes: Lock>, next_id: Arc, + stats: Option>, version: Version, } impl Subscriber { - pub fn new(session: S, origin: Option, version: Version) -> Self { + pub fn new(session: S, origin: Option, stats: Option>, version: Version) -> Self { Self { session, origin, subscribes: Default::default(), next_id: Default::default(), + stats, version, } } @@ -308,6 +311,9 @@ impl Subscriber { .read(MAX_CHUNK.min(remain as usize)) .await? .ok_or(Error::WrongSize)?; + if let Some(stats) = &self.stats { + stats.add_rx_bytes(chunk.len() as u64); + } remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?; frame.write_chunk(chunk); } diff --git a/rs/moq-lite/src/session.rs b/rs/moq-lite/src/session.rs index 0f1f6347b..849026b17 100644 --- a/rs/moq-lite/src/session.rs +++ b/rs/moq-lite/src/session.rs @@ -34,14 +34,15 @@ impl Session { } } - /// Perform the MoQ handshake as a client, negotiating the version. + /// Perform the MoQ handshake as a client, negotiating the version, with optional stats hooks. /// - /// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer]. - /// The connection remains active until the session is closed. 
- pub async fn connect( + /// This is equivalent to [`Session::connect`] but allows providing a [`crate::Stats`] sink + /// for application-level byte accounting (ignores transport retransmissions). + pub async fn connect_with_stats( session: S, publish: impl Into>, subscribe: impl Into>, + stats: Option>, ) -> Result { let mut stream = Stream::open(&session, setup::ServerKind::Ietf14).await?; @@ -67,7 +68,7 @@ impl Session { if let Ok(version) = lite::Version::try_from(server.version) { let stream = stream.with_version(version); - lite::start(session.clone(), stream, publish.into(), subscribe.into(), version).await?; + lite::start(session.clone(), stream, publish.into(), subscribe.into(), stats, version).await?; } else if let Ok(version) = ietf::Version::try_from(server.version) { // Decode the parameters to get the initial request ID. let parameters = ietf::Parameters::decode(&mut server.parameters, version)?; @@ -82,6 +83,7 @@ impl Session { true, publish.into(), subscribe.into(), + stats, version, ) .await?; @@ -95,14 +97,24 @@ impl Session { Ok(Self::new(session)) } - /// Perform the MoQ handshake as a server. + /// Perform the MoQ handshake as a client, negotiating the version. /// /// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer]. /// The connection remains active until the session is closed. - pub async fn accept( + pub async fn connect( session: S, publish: impl Into>, subscribe: impl Into>, + ) -> Result { + Self::connect_with_stats(session, publish, subscribe, None).await + } + + /// Perform the MoQ handshake as a server with optional stats hooks. 
+ pub async fn accept_with_stats( + session: S, + publish: impl Into>, + subscribe: impl Into>, + stats: Option>, ) -> Result { // Accept with an initial version; we'll switch to the negotiated version later let mut stream = Stream::accept(&session, ()).await?; @@ -135,7 +147,7 @@ impl Session { if let Ok(version) = lite::Version::try_from(version) { let stream = stream.with_version(version); - lite::start(session.clone(), stream, publish.into(), subscribe.into(), version).await?; + lite::start(session.clone(), stream, publish.into(), subscribe.into(), stats, version).await?; } else if let Ok(version) = ietf::Version::try_from(version) { // Decode the parameters to get the initial request ID. let parameters = ietf::Parameters::decode(&mut server.parameters, version)?; @@ -150,6 +162,7 @@ impl Session { false, publish.into(), subscribe.into(), + stats, version, ) .await?; @@ -163,6 +176,18 @@ impl Session { Ok(Self::new(session)) } + /// Perform the MoQ handshake as a server. + /// + /// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer]. + /// The connection remains active until the session is closed. + pub async fn accept( + session: S, + publish: impl Into>, + subscribe: impl Into>, + ) -> Result { + Self::accept_with_stats(session, publish, subscribe, None).await + } + /// Close the underlying transport session. pub fn close(self, err: Error) { self.session.close(err.to_code(), err.to_string().as_ref()); diff --git a/rs/moq-lite/src/stats.rs b/rs/moq-lite/src/stats.rs new file mode 100644 index 000000000..542c266d3 --- /dev/null +++ b/rs/moq-lite/src/stats.rs @@ -0,0 +1,30 @@ +//! Lightweight application-level statistics hooks. +//! +//! The primary motivation is to measure "usefulness" of a relay in a fanout topology +//! without relying on ambiguous "cache hit" semantics. +//! +//! In particular, callers may want to compute an application-level amplification ratio: +//! output_bitrate / input_bitrate +//! +//! 
This intentionally ignores transport-level effects such as retransmissions. + +/// A sink for application-level byte accounting. +/// +/// Implementations should be fast and non-blocking (e.g., atomics). +pub trait Stats: Send + Sync + 'static { + /// Record payload bytes received by the MoQ session (from the network). + fn add_rx_bytes(&self, bytes: u64); + + /// Record payload bytes sent by the MoQ session (to the network). + fn add_tx_bytes(&self, bytes: u64); +} + +/// Default stats sink that does nothing. +#[derive(Default)] +pub struct NoopStats; + +impl Stats for NoopStats { + fn add_rx_bytes(&self, _bytes: u64) {} + fn add_tx_bytes(&self, _bytes: u64) {} +} + diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 36e90f3c2..d1941852b 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -355,14 +355,38 @@ impl Request { publish: impl Into>, subscribe: impl Into>, ) -> anyhow::Result { + self.accept_with_stats(publish, subscribe, None).await + } + + /// Accept the session, performing rest of the MoQ handshake, with optional stats hooks. + /// + /// The stats sink is forwarded into `moq-lite` to account application-level payload bytes. + pub async fn accept_with_stats( + self, + publish: impl Into>, + subscribe: impl Into>, + stats: Option>, + ) -> anyhow::Result { + let publish = publish.into(); + let subscribe = subscribe.into(); + let session = match self { - Request::WebTransport(request) => Session::accept(request.ok().await?, publish, subscribe).await?, - Request::Quic(request) => Session::accept(request.ok(), publish, subscribe).await?, + Request::WebTransport(request) => { + moq_lite::Session::accept_with_stats(request.ok().await?, publish, subscribe, stats).await? + } + Request::Quic(request) => { + moq_lite::Session::accept_with_stats(request.ok(), publish, subscribe, stats).await? 
+ } #[cfg(feature = "iroh")] - Request::IrohWebTransport(request) => Session::accept(request.ok().await?, publish, subscribe).await?, + Request::IrohWebTransport(request) => { + moq_lite::Session::accept_with_stats(request.ok().await?, publish, subscribe, stats).await? + } #[cfg(feature = "iroh")] - Request::IrohQuic(request) => Session::accept(request.ok(), publish, subscribe).await?, + Request::IrohQuic(request) => { + moq_lite::Session::accept_with_stats(request.ok(), publish, subscribe, stats).await? + } }; + Ok(session) } From facf7885b4f86f797a184412f7e6f6b46c2b40d6 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:04:13 -0300 Subject: [PATCH 02/12] relay: basic session + app byte counters exposed at /metrics --- rs/moq-relay/src/cluster.rs | 4 ++ rs/moq-relay/src/connection.rs | 25 ++++++- rs/moq-relay/src/main.rs | 2 + rs/moq-relay/src/metrics.rs | 115 +++++++++++++++++++++++++++++++++ rs/moq-relay/src/web.rs | 70 +++++++++++++++++++- 5 files changed, 213 insertions(+), 3 deletions(-) create mode 100644 rs/moq-relay/src/metrics.rs diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 4637a75c8..6b534cf9a 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -67,6 +67,9 @@ pub struct Cluster { // Broadcasts announced by local clients and remote servers. pub combined: Arc>, + + /// Minimal metrics for the relay. 
+ pub metrics: crate::MetricsTracker, } impl Cluster { @@ -78,6 +81,7 @@ impl Cluster { primary: Arc::new(Origin::produce()), secondary: Arc::new(Origin::produce()), combined: Arc::new(Origin::produce()), + metrics: crate::MetricsTracker::new(), } } diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 12d1c4c0b..8bb94af6b 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -1,6 +1,7 @@ use crate::{Auth, Cluster}; use moq_native::Request; +use std::sync::Arc; pub struct Connection { pub id: u64, @@ -12,6 +13,23 @@ pub struct Connection { impl Connection { #[tracing::instrument("conn", skip_all, fields(id = self.id))] pub async fn run(self) -> anyhow::Result<()> { + // Track WebTransport sessions. + let metrics = self.cluster.metrics.clone(); + metrics.inc_active_sessions(crate::Transport::WebTransport); + struct SessionGuard { + metrics: crate::MetricsTracker, + transport: crate::Transport, + } + impl Drop for SessionGuard { + fn drop(&mut self) { + self.metrics.dec_active_sessions(self.transport); + } + } + let _guard = SessionGuard { + metrics: metrics.clone(), + transport: crate::Transport::WebTransport, + }; + let (path, token) = match self.request.url() { Some(url) => { // Extract the path and token from the URL. @@ -50,7 +68,12 @@ impl Connection { // NOTE: subscribe and publish seem backwards because of how relays work. // We publish the tracks the client is allowed to subscribe to. // We subscribe to the tracks the client is allowed to publish. - let session = self.request.accept(subscribe, publish).await?; + let stats: Arc = + Arc::new(crate::TransportStats::new(metrics, crate::Transport::WebTransport)); + let session = self + .request + .accept_with_stats(subscribe, publish, Some(stats)) + .await?; // Wait until the session is closed. 
session.closed().await.map_err(Into::into) diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index 0411ea82f..aa2283c5e 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -12,12 +12,14 @@ mod auth; mod cluster; mod config; mod connection; +mod metrics; mod web; pub use auth::*; pub use cluster::*; pub use config::*; pub use connection::*; +pub use metrics::*; pub use web::*; #[tokio::main] diff --git a/rs/moq-relay/src/metrics.rs b/rs/moq-relay/src/metrics.rs new file mode 100644 index 000000000..ce506c794 --- /dev/null +++ b/rs/moq-relay/src/metrics.rs @@ -0,0 +1,115 @@ +//! Minimal metrics for the relay demo. +//! +//! Intentionally tiny and low-cardinality: active sessions + application payload bytes. + +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +/// Transport used by a MoQ session. +#[derive(Clone, Copy, Debug)] +pub enum Transport { + WebTransport, + WebSocket, +} + +impl Transport { + pub fn as_str(&self) -> &'static str { + match self { + Self::WebTransport => "webtransport", + Self::WebSocket => "websocket", + } + } +} + +/// Thread-safe counters for basic relay metrics. 
+#[derive(Clone, Default)] +pub struct MetricsTracker { + active_sessions_webtransport: Arc, + active_sessions_websocket: Arc, + app_bytes_sent_webtransport: Arc, + app_bytes_sent_websocket: Arc, + app_bytes_received_webtransport: Arc, + app_bytes_received_websocket: Arc, +} + +impl MetricsTracker { + pub fn new() -> Self { + Self::default() + } + + pub fn inc_active_sessions(&self, transport: Transport) { + match transport { + Transport::WebTransport => self.active_sessions_webtransport.fetch_add(1, Ordering::Relaxed), + Transport::WebSocket => self.active_sessions_websocket.fetch_add(1, Ordering::Relaxed), + }; + } + + pub fn dec_active_sessions(&self, transport: Transport) { + match transport { + Transport::WebTransport => self.active_sessions_webtransport.fetch_sub(1, Ordering::Relaxed), + Transport::WebSocket => self.active_sessions_websocket.fetch_sub(1, Ordering::Relaxed), + }; + } + + pub fn record_app_bytes_sent(&self, transport: Transport, bytes: u64) { + match transport { + Transport::WebTransport => self.app_bytes_sent_webtransport.fetch_add(bytes, Ordering::Relaxed), + Transport::WebSocket => self.app_bytes_sent_websocket.fetch_add(bytes, Ordering::Relaxed), + }; + } + + pub fn record_app_bytes_received(&self, transport: Transport, bytes: u64) { + match transport { + Transport::WebTransport => self + .app_bytes_received_webtransport + .fetch_add(bytes, Ordering::Relaxed), + Transport::WebSocket => self.app_bytes_received_websocket.fetch_add(bytes, Ordering::Relaxed), + }; + } + + pub fn active_sessions(&self, transport: Transport) -> u64 { + match transport { + Transport::WebTransport => self.active_sessions_webtransport.load(Ordering::Relaxed), + Transport::WebSocket => self.active_sessions_websocket.load(Ordering::Relaxed), + } + } + + pub fn app_bytes_sent(&self, transport: Transport) -> u64 { + match transport { + Transport::WebTransport => self.app_bytes_sent_webtransport.load(Ordering::Relaxed), + Transport::WebSocket => 
self.app_bytes_sent_websocket.load(Ordering::Relaxed), + } + } + + pub fn app_bytes_received(&self, transport: Transport) -> u64 { + match transport { + Transport::WebTransport => self.app_bytes_received_webtransport.load(Ordering::Relaxed), + Transport::WebSocket => self.app_bytes_received_websocket.load(Ordering::Relaxed), + } + } +} + +/// `moq-lite` stats sink that attributes payload bytes by transport. +pub struct TransportStats { + metrics: MetricsTracker, + transport: Transport, +} + +impl TransportStats { + pub fn new(metrics: MetricsTracker, transport: Transport) -> Self { + Self { metrics, transport } + } +} + +impl moq_lite::Stats for TransportStats { + fn add_rx_bytes(&self, bytes: u64) { + self.metrics.record_app_bytes_received(self.transport, bytes); + } + + fn add_tx_bytes(&self, bytes: u64) { + self.metrics.record_app_bytes_sent(self.transport, bytes); + } +} + diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index 525a79baf..a5e2585a2 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -93,6 +93,7 @@ impl Web { pub async fn run(self) -> anyhow::Result<()> { let app = Router::new() .route("/certificate.sha256", get(serve_fingerprint)) + .route("/metrics", get(serve_metrics)) .route("/announced", get(serve_announced)) .route("/announced/{*prefix}", get(serve_announced)) .route("/fetch/{*path}", get(serve_fetch)); @@ -166,6 +167,54 @@ async fn serve_fingerprint(State(state): State>) -> String { .clone() } +async fn serve_metrics(State(state): State>) -> String { + // Minimal Prometheus text exposition. 
+ let m = &state.cluster.metrics; + + let wt = crate::Transport::WebTransport; + let ws = crate::Transport::WebSocket; + + let mut out = String::new(); + + out.push_str("# TYPE moq_relay_active_sessions gauge\n"); + out.push_str(&format!( + "moq_relay_active_sessions{{transport=\"{}\"}} {}\n", + wt.as_str(), + m.active_sessions(wt) + )); + out.push_str(&format!( + "moq_relay_active_sessions{{transport=\"{}\"}} {}\n", + ws.as_str(), + m.active_sessions(ws) + )); + + out.push_str("# TYPE moq_relay_app_bytes_sent_total counter\n"); + out.push_str(&format!( + "moq_relay_app_bytes_sent_total{{transport=\"{}\"}} {}\n", + wt.as_str(), + m.app_bytes_sent(wt) + )); + out.push_str(&format!( + "moq_relay_app_bytes_sent_total{{transport=\"{}\"}} {}\n", + ws.as_str(), + m.app_bytes_sent(ws) + )); + + out.push_str("# TYPE moq_relay_app_bytes_received_total counter\n"); + out.push_str(&format!( + "moq_relay_app_bytes_received_total{{transport=\"{}\"}} {}\n", + wt.as_str(), + m.app_bytes_received(wt) + )); + out.push_str(&format!( + "moq_relay_app_bytes_received_total{{transport=\"{}\"}} {}\n", + ws.as_str(), + m.app_bytes_received(ws) + )); + + out +} + async fn serve_ws( ws: WebSocketUpgrade, Path(path): Path, @@ -177,6 +226,7 @@ async fn serve_ws( let token = state.auth.verify(&path, params.jwt.as_deref())?; let publish = state.cluster.publisher(&token); let subscribe = state.cluster.subscriber(&token); + let metrics = state.cluster.metrics.clone(); if publish.is_none() && subscribe.is_none() { // Bad token, we can't publish or subscribe. 
@@ -196,7 +246,7 @@ async fn serve_ws( tungstenite::Error::ConnectionClosed }) .with(tungstenite_to_axum); - let _ = handle_socket(id, socket, publish, subscribe).await; + let _ = handle_socket(id, socket, publish, subscribe, metrics).await; })) } @@ -206,6 +256,7 @@ async fn handle_socket( socket: T, publish: Option, subscribe: Option, + metrics: crate::MetricsTracker, ) -> anyhow::Result<()> where T: futures::Stream> @@ -214,9 +265,24 @@ where + Unpin + 'static, { + metrics.inc_active_sessions(crate::Transport::WebSocket); + struct SessionGuard { + metrics: crate::MetricsTracker, + } + impl Drop for SessionGuard { + fn drop(&mut self) { + self.metrics.dec_active_sessions(crate::Transport::WebSocket); + } + } + let _guard = SessionGuard { + metrics: metrics.clone(), + }; + // Wrap the WebSocket in a WebTransport compatibility layer. let ws = web_transport_ws::Session::new(socket, true); - let session = moq_lite::Session::accept(ws, subscribe, publish).await?; + let stats: std::sync::Arc = + std::sync::Arc::new(crate::TransportStats::new(metrics, crate::Transport::WebSocket)); + let session = moq_lite::Session::accept_with_stats(ws, subscribe, publish, Some(stats)).await?; session.closed().await.map_err(Into::into) } From 436263d32d04c132205840da9f9ad110d64a36e9 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:04:20 -0300 Subject: [PATCH 03/12] hang: metrics-only OTLP (connections + startup time) --- js/hang/package.json | 3 + js/hang/src/observability/index.ts | 131 +++++++++++++++++++++++++++++ js/hang/src/watch/audio/source.ts | 13 +++ js/hang/src/watch/broadcast.ts | 11 +++ js/hang/src/watch/element.ts | 24 ++++++ js/hang/src/watch/video/source.ts | 28 +++++- 6 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 js/hang/src/observability/index.ts diff --git a/js/hang/package.json b/js/hang/package.json index c1e162c1b..813d55ba8 100644 --- a/js/hang/package.json +++ b/js/hang/package.json @@ -34,6 +34,9 @@ "@moq/lite": 
"workspace:^", "@moq/signals": "workspace:^", "@libav.js/variant-opus-af": "^6.8.8", + "@opentelemetry/api": "^1.8.0", + "@opentelemetry/sdk-metrics": "^1.25.0", + "@opentelemetry/exporter-metrics-otlp-http": "^0.52.0", "async-mutex": "^0.5.0", "comlink": "^4.4.2", "zod": "^4.1.5" diff --git a/js/hang/src/observability/index.ts b/js/hang/src/observability/index.ts new file mode 100644 index 000000000..bf78f2bec --- /dev/null +++ b/js/hang/src/observability/index.ts @@ -0,0 +1,131 @@ +/** + * Minimal OpenTelemetry metrics for MoQ client (browser). + * + * Kept intentionally small for reviewability: + * - `moq_client_connections_total{transport=...}` + * - `moq_client_startup_time_seconds{track_type=...}` + */ + +import { metrics } from "@opentelemetry/api"; +import type { Counter, Histogram, Meter } from "@opentelemetry/api"; +import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; +import { MeterProvider, PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; +import { Resource } from "@opentelemetry/resources"; + +let initialized = false; +let sessionId: string | undefined; + +export interface ObservabilityConfig { + /** OTLP endpoint URL (default: http://localhost:4318) */ + otlpEndpoint?: string; + /** Service name (default: moq-client) */ + serviceName?: string; + /** Enable observability (default: true if endpoint provided) */ + enabled?: boolean; + /** Per-player session id (defaults to a random UUID) */ + sessionId?: string; +} + +function createSessionId(): string { + try { + return globalThis.crypto?.randomUUID?.() ?? 
`sid-${Math.random().toString(16).slice(2)}-${Date.now()}`; + } catch { + return `sid-${Math.random().toString(16).slice(2)}-${Date.now()}`; + } +} + +export function initObservability(config: ObservabilityConfig = {}): void { + if (initialized) return; + + const endpoint = config.otlpEndpoint || "http://localhost:4318"; + const serviceName = config.serviceName || "moq-client"; + const enabled = config.enabled ?? !!config.otlpEndpoint; + sessionId = config.sessionId || createSessionId(); + + if (!enabled) return; + + const resource = new Resource({ + "service.name": serviceName, + "service.instance.id": sessionId, + "moq.player.session_id": sessionId, + }); + + const exporterHeaders = { "Content-Type": "application/json" }; + const metricExporter = new OTLPMetricExporter({ + url: `${endpoint}/v1/metrics`, + headers: exporterHeaders, + }); + + const reader = new PeriodicExportingMetricReader({ + exporter: metricExporter, + exportIntervalMillis: 10000, + }); + + const meterProvider = new MeterProvider({ + resource, + readers: [reader], + }); + + metrics.setGlobalMeterProvider(meterProvider); + const meter = metrics.getMeter(serviceName); + + clientMetricsInstance = new ClientMetrics(meter); + setupConnectionTracking(); + initialized = true; +} + +function setupConnectionTracking() { + // Dynamically import to avoid circular deps. 
+ import("@moq/lite") + .then((Moq) => { + if (Moq.Connection?.onConnectionType) { + Moq.Connection.onConnectionType((type: "webtransport" | "websocket") => { + getClientMetrics()?.recordConnection(type); + }); + } + }) + .catch(() => {}); +} + +export class ClientMetrics { + private connectionCounter?: Counter; + private startupTimeHistogram?: Histogram; + + constructor(meter?: Meter) { + if (meter) { + this.connectionCounter = meter.createCounter("moq_client_connections_total", { + description: "Total client connections by transport type", + }); + + this.startupTimeHistogram = meter.createHistogram("moq_client_startup_time_seconds", { + description: "Time to first audio/video frame in seconds", + unit: "s", + }); + } + } + + recordConnection(transportType: "webtransport" | "websocket"): void { + const attrs: Record = { transport: transportType }; + if (sessionId) attrs["moq.player.session_id"] = sessionId; + this.connectionCounter?.add(1, attrs); + } + + recordStartupTime(seconds: number, attributes?: Record): void { + const attrs: Record = { ...(attributes ?? 
{}) }; + if (sessionId) attrs["moq.player.session_id"] = sessionId; + this.startupTimeHistogram?.record(seconds, attrs); + } +} + +let clientMetricsInstance: ClientMetrics | undefined; + +export function getClientMetrics(): ClientMetrics | undefined { + if (!clientMetricsInstance) clientMetricsInstance = new ClientMetrics(); + return clientMetricsInstance; +} + +export function recordMetric(fn: (metrics: ClientMetrics) => void): void { + const m = getClientMetrics(); + if (m) fn(m); +} + diff --git a/js/hang/src/watch/audio/source.ts b/js/hang/src/watch/audio/source.ts index fad996840..b5d69602f 100644 --- a/js/hang/src/watch/audio/source.ts +++ b/js/hang/src/watch/audio/source.ts @@ -3,6 +3,7 @@ import type { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; +import { recordMetric } from "../../observability"; import * as Hex from "../../util/hex"; import * as libav from "../../util/libav"; import type * as Render from "./render"; @@ -163,6 +164,10 @@ export class Source { const active = effect.get(this.active); if (!active) return; + // Track time-to-first-audio + const trackStartTime = performance.now(); + let firstFrameDecoded = false; + const sub = broadcast.subscribe(active, catalog.priority); effect.cleanup(() => sub.close()); @@ -195,6 +200,14 @@ export class Source { const frame = await consumer.decode(); if (!frame) break; + // Record time-to-first-audio + if (!firstFrameDecoded) { + firstFrameDecoded = true; + const ttfaSeconds = (performance.now() - trackStartTime) / 1000; + recordMetric((m) => m.recordStartupTime(ttfaSeconds, { codec: config.codec, track_type: "audio" })); + console.log(`[Audio] Time-to-first-audio: ${(ttfaSeconds * 1000).toFixed(0)}ms`); + } + this.#stats.update((stats) => ({ bytesReceived: (stats?.bytesReceived ?? 
0) + frame.data.byteLength, })); diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index c8fe30a1a..62d10bf11 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -1,6 +1,7 @@ import type * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../catalog"; +import { recordMetric } from "../observability"; import { PRIORITY } from "../publish/priority"; import * as Audio from "./audio"; import { Chat, type ChatProps } from "./chat"; @@ -95,6 +96,9 @@ export class Broadcast { effect.cleanup(() => announced.close()); effect.spawn(async () => { + const startTime = performance.now(); + let firstFrame = true; + for (;;) { const update = await announced.next(); if (!update) break; @@ -105,6 +109,13 @@ export class Broadcast { continue; } + // Record startup time on first active broadcast (CMCD su) + if (firstFrame && update.active) { + const startupTime = (performance.now() - startTime) / 1000; // Convert to seconds + recordMetric((m) => m.recordStartupTime(startupTime, { track_type: "video" })); + firstFrame = false; + } + effect.set(this.#active, update.active, false); } }); diff --git a/js/hang/src/watch/element.ts b/js/hang/src/watch/element.ts index 47e45d831..66be06c98 100644 --- a/js/hang/src/watch/element.ts +++ b/js/hang/src/watch/element.ts @@ -4,6 +4,7 @@ import { Effect, Signal } from "@moq/signals"; import * as Audio from "./audio"; import { Broadcast } from "./broadcast"; import * as Video from "./video"; +import * as Observability from "../observability"; // TODO remove name; replaced with path const OBSERVED = ["url", "name", "path", "paused", "volume", "muted", "reload", "latency"] as const; @@ -18,6 +19,11 @@ const cleanup = new FinalizationRegistry((signals) => signals.close()); export default class HangWatch extends HTMLElement { static observedAttributes = OBSERVED; + // Per-player session identifier used for telemetry correlation. 
+ // In practice this is per page/tab unless multiple players exist in one page. + private readonly sessionId = + globalThis.crypto?.randomUUID?.() ?? `sid-${Math.random().toString(16).slice(2)}-${Date.now()}`; + // The connection to the moq-relay server. connection: Moq.Connection.Reload; @@ -69,12 +75,30 @@ export default class HangWatch extends HTMLElement { cleanup.register(this, this.signals); + // Auto-initialize observability when URL is set + // Derives OTel endpoint from relay URL (same host, port 4318) + this.signals.effect((effect) => { + const url = effect.get(this.url); + if (url) { + const otelEndpoint = `http://${url.hostname}:4318`; + Observability.initObservability({ otlpEndpoint: otelEndpoint, sessionId: this.sessionId }); + } + }); + this.connection = new Moq.Connection.Reload({ url: this.url, enabled: this.#enabled, }); this.signals.cleanup(() => this.connection.close()); + // Placeholder effect: observability is metrics-only, so no trace span is created for the playback session + this.signals.effect((effect) => { + const url = effect.get(this.url); + if (url) { + // Metrics-only observability; no tracing spans. 
+ } + }); + this.broadcast = new Broadcast({ connection: this.connection.established, path: this.path, diff --git a/js/hang/src/watch/video/source.ts b/js/hang/src/watch/video/source.ts index 406a600bb..50c8baf34 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -3,6 +3,7 @@ import type { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; +import { recordMetric } from "../../observability"; import { PRIORITY } from "../../publish/priority"; import * as Hex from "../../util/hex"; @@ -40,6 +41,8 @@ export interface VideoStats { frameCount: number; timestamp: number; bytesReceived: number; + framesDecoded: number; + framesDropped: number; } // Only count it as buffering if we had to sleep for 200ms or more before rendering the next frame. @@ -47,6 +50,9 @@ export interface VideoStats { // TODO Maybe we need to detect b-frames and make this dynamic? const MIN_SYNC_WAIT_MS = 200 as Time.Milli; +// Minimum time to wait for a frame before counting it as a rebuffer event (ms) +const REBUFFER_THRESHOLD_MS = 100; + // The maximum number of concurrent b-frames that we support. 
const MAX_BFRAMES = 10; @@ -192,6 +198,10 @@ export class Source { } #runTrack(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + // Track time-to-first-frame from subscription start + const trackStartTime = performance.now(); + let firstFrameRendered = false; + const sub = broadcast.subscribe(name, PRIORITY.video); // TODO use priority from catalog effect.cleanup(() => sub.close()); @@ -265,6 +275,10 @@ export class Source { sleep = this.#reference - ref + this.latency.peek(); } + // Record buffer length: sleep > 0 means we have data buffered ahead + // The sleep time represents how far ahead of playback position we are + // Note: Large sleep means we're AHEAD (have buffer), not rebuffering + // Rebuffering is detected in the consumer.decode() wait below if (sleep > MIN_SYNC_WAIT_MS) { this.syncStatus.set({ state: "wait", bufferDuration: sleep }); } @@ -290,6 +304,14 @@ export class Source { } } + // Record time-to-first-frame on the first rendered frame + if (!firstFrameRendered) { + firstFrameRendered = true; + const ttffSeconds = (performance.now() - trackStartTime) / 1000; + recordMetric((m) => m.recordStartupTime(ttffSeconds, { codec: config.codec, track_type: "video" })); + console.log(`[Video] Time-to-first-frame: ${(ttffSeconds * 1000).toFixed(0)}ms`); + } + this.frame.update((prev) => { prev?.close(); return frame; @@ -316,14 +338,16 @@ export class Source { timestamp: next.timestamp, }); + decoder.decode(chunk); + // Track both frame count and bytes received for stats in the UI this.#stats.update((current) => ({ frameCount: (current?.frameCount ?? 0) + 1, timestamp: next.timestamp, bytesReceived: (current?.bytesReceived ?? 0) + next.data.byteLength, + framesDecoded: (current?.framesDecoded ?? 0) + 1, + framesDropped: current?.framesDropped ?? 
0, })); - - decoder.decode(chunk); } }); } From 76302df56ffa7d9010330f221cedff94b0a00f7c Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:27:09 -0300 Subject: [PATCH 04/12] hang: drop @opentelemetry/resources dependency --- js/hang/src/observability/index.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/js/hang/src/observability/index.ts b/js/hang/src/observability/index.ts index bf78f2bec..801ab5b54 100644 --- a/js/hang/src/observability/index.ts +++ b/js/hang/src/observability/index.ts @@ -10,7 +10,6 @@ import { metrics } from "@opentelemetry/api"; import type { Counter, Histogram, Meter } from "@opentelemetry/api"; import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; import { MeterProvider, PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; -import { Resource } from "@opentelemetry/resources"; let initialized = false; let sessionId: string | undefined; @@ -44,12 +43,6 @@ export function initObservability(config: ObservabilityConfig = {}): void { if (!enabled) return; - const resource = new Resource({ - "service.name": serviceName, - "service.instance.id": sessionId, - "moq.player.session_id": sessionId, - }); - const exporterHeaders = { "Content-Type": "application/json" }; const metricExporter = new OTLPMetricExporter({ url: `${endpoint}/v1/metrics`, @@ -62,7 +55,6 @@ export function initObservability(config: ObservabilityConfig = {}): void { }); const meterProvider = new MeterProvider({ - resource, readers: [reader], }); From 0082adeae3091917f2aadd26a0acad88f303b192 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:45:32 -0300 Subject: [PATCH 05/12] chore: update bun.lock with new OpenTelemetry dependencies and protobufjs packages --- bun.lock | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/bun.lock b/bun.lock index b59c1a2fe..729caae4a 100644 --- a/bun.lock +++ b/bun.lock @@ -39,6 +39,9 @@ 
"@libav.js/variant-opus-af": "^6.8.8", "@moq/lite": "workspace:^", "@moq/signals": "workspace:^", + "@opentelemetry/api": "^1.8.0", + "@opentelemetry/exporter-metrics-otlp-http": "^0.52.0", + "@opentelemetry/sdk-metrics": "^1.25.0", "async-mutex": "^0.5.0", "comlink": "^4.4.2", "zod": "^4.1.5", @@ -409,12 +412,54 @@ "@nodelib/fs.walk": ["@nodelib/fs.walk@1.2.8", "", { "dependencies": { "@nodelib/fs.scandir": "2.1.5", "fastq": "^1.6.0" } }, "sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.0", "", {}, "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg=="], + + "@opentelemetry/api-logs": ["@opentelemetry/api-logs@0.52.1", "", { "dependencies": { "@opentelemetry/api": "^1.0.0" } }, "sha512-qnSqB2DQ9TPP96dl8cDubDvrUyWc0/sK81xHTK8eSUspzDM3bsewX903qclQFvVhgStjRWdC5bLb3kQqMkfV5A=="], + + "@opentelemetry/core": ["@opentelemetry/core@1.25.1", "", { "dependencies": { "@opentelemetry/semantic-conventions": "1.25.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-GeT/l6rBYWVQ4XArluLVB6WWQ8flHbdb6r2FCHC3smtdOAbrJBIv35tpV/yp9bmYUJf+xmZpu9DRTIeJVhFbEQ=="], + + "@opentelemetry/exporter-metrics-otlp-http": ["@opentelemetry/exporter-metrics-otlp-http@0.52.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/otlp-exporter-base": "0.52.1", "@opentelemetry/otlp-transformer": "0.52.1", "@opentelemetry/resources": "1.25.1", "@opentelemetry/sdk-metrics": "1.25.1" }, "peerDependencies": { "@opentelemetry/api": "^1.3.0" } }, "sha512-oAHPOy1sZi58bwqXaucd19F/v7+qE2EuVslQOEeLQT94CDuZJJ4tbWzx8DpYBTrOSzKqqrMtx9+PMxkrcbxOyQ=="], + + "@opentelemetry/otlp-exporter-base": ["@opentelemetry/otlp-exporter-base@0.52.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/otlp-transformer": "0.52.1" }, "peerDependencies": { "@opentelemetry/api": "^1.0.0" } }, 
"sha512-z175NXOtX5ihdlshtYBe5RpGeBoTXVCKPPLiQlD6FHvpM4Ch+p2B0yWKYSrBfLH24H9zjJiBdTrtD+hLlfnXEQ=="], + + "@opentelemetry/otlp-transformer": ["@opentelemetry/otlp-transformer@0.52.1", "", { "dependencies": { "@opentelemetry/api-logs": "0.52.1", "@opentelemetry/core": "1.25.1", "@opentelemetry/resources": "1.25.1", "@opentelemetry/sdk-logs": "0.52.1", "@opentelemetry/sdk-metrics": "1.25.1", "@opentelemetry/sdk-trace-base": "1.25.1", "protobufjs": "^7.3.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.3.0 <1.10.0" } }, "sha512-I88uCZSZZtVa0XniRqQWKbjAUm73I8tpEy/uJYPPYw5d7BRdVk0RfTBQw8kSUl01oVWEuqxLDa802222MYyWHg=="], + + "@opentelemetry/resources": ["@opentelemetry/resources@1.25.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/semantic-conventions": "1.25.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-pkZT+iFYIZsVn6+GzM0kSX+u3MSLCY9md+lIJOoKl/P+gJFfxJte/60Usdp8Ce4rOs8GduUpSPNe1ddGyDT1sQ=="], + + "@opentelemetry/sdk-logs": ["@opentelemetry/sdk-logs@0.52.1", "", { "dependencies": { "@opentelemetry/api-logs": "0.52.1", "@opentelemetry/core": "1.25.1", "@opentelemetry/resources": "1.25.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.4.0 <1.10.0" } }, "sha512-MBYh+WcPPsN8YpRHRmK1Hsca9pVlyyKd4BxOC4SsgHACnl/bPp4Cri9hWhVm5+2tiQ9Zf4qSc1Jshw9tOLGWQA=="], + + "@opentelemetry/sdk-metrics": ["@opentelemetry/sdk-metrics@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/resources": "1.30.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.3.0 <1.10.0" } }, "sha512-q9zcZ0Okl8jRgmy7eNW3Ku1XSgg3sDLa5evHZpCwjspw7E8Is4K/haRPDJrBcX3YSn/Y7gUvFnByNYEKQNbNog=="], + + "@opentelemetry/sdk-trace-base": ["@opentelemetry/sdk-trace-base@1.25.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/resources": "1.25.1", "@opentelemetry/semantic-conventions": "1.25.1" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, 
"sha512-C8k4hnEbc5FamuZQ92nTOp8X/diCY56XUTnMiv9UTuJitCzaNNHAVsdm5+HLCdI8SLQsLWIrG38tddMxLVoftw=="], + + "@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.25.1", "", {}, "sha512-ZDjMJJQRlyk8A1KZFCc+bCbsyrn1wTwdNt56F7twdfUfnHUZUq77/WfONCj8p72NZOyP7pNTdUWSTYC3GTbuuQ=="], + "@poppinss/colors": ["@poppinss/colors@4.1.6", "", { "dependencies": { "kleur": "^4.1.5" } }, "sha512-H9xkIdFswbS8n1d6vmRd8+c10t2Qe+rZITbbDHHkQixH5+2x1FDGmi/0K+WgWiqQFKPSlIYB7jlH6Kpfn6Fleg=="], "@poppinss/dumper": ["@poppinss/dumper@0.6.5", "", { "dependencies": { "@poppinss/colors": "^4.1.5", "@sindresorhus/is": "^7.0.2", "supports-color": "^10.0.0" } }, "sha512-NBdYIb90J7LfOI32dOewKI1r7wnkiH6m920puQ3qHUeZkxNkQiFnXVWoE6YtFSv6QOiPPf7ys6i+HWWecDz7sw=="], "@poppinss/exception": ["@poppinss/exception@1.2.3", "", {}, "sha512-dCED+QRChTVatE9ibtoaxc+WkdzOSjYTKi/+uacHWIsfodVfpsueo3+DKpgU5Px8qXjgmXkSvhXvSCz3fnP9lw=="], + "@protobufjs/aspromise": ["@protobufjs/aspromise@1.1.2", "", {}, "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ=="], + + "@protobufjs/base64": ["@protobufjs/base64@1.1.2", "", {}, "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg=="], + + "@protobufjs/codegen": ["@protobufjs/codegen@2.0.4", "", {}, "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg=="], + + "@protobufjs/eventemitter": ["@protobufjs/eventemitter@1.1.0", "", {}, "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q=="], + + "@protobufjs/fetch": ["@protobufjs/fetch@1.1.0", "", { "dependencies": { "@protobufjs/aspromise": "^1.1.1", "@protobufjs/inquire": "^1.1.0" } }, "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ=="], + + "@protobufjs/float": ["@protobufjs/float@1.0.2", "", {}, "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ=="], 
+ + "@protobufjs/inquire": ["@protobufjs/inquire@1.1.0", "", {}, "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q=="], + + "@protobufjs/path": ["@protobufjs/path@1.1.2", "", {}, "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA=="], + + "@protobufjs/pool": ["@protobufjs/pool@1.1.0", "", {}, "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw=="], + + "@protobufjs/utf8": ["@protobufjs/utf8@1.1.0", "", {}, "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw=="], + "@rollup/pluginutils": ["@rollup/pluginutils@5.3.0", "", { "dependencies": { "@types/estree": "^1.0.0", "estree-walker": "^2.0.2", "picomatch": "^4.0.2" }, "peerDependencies": { "rollup": "^1.20.0||^2.0.0||^3.0.0||^4.0.0" }, "optionalPeers": ["rollup"] }, "sha512-5EdhGZtnu3V88ces7s53hhfK5KSASnJZv8Lulpc04cWO3REESroJXg73DFsOmgbU2BhwV0E20bu2IDZb3VKW4Q=="], "@rollup/rollup-android-arm-eabi": ["@rollup/rollup-android-arm-eabi@4.55.1", "", { "os": "android", "cpu": "arm" }, "sha512-9R0DM/ykwfGIlNu6+2U09ga0WXeZ9MRC2Ter8jnz8415VbuIykVuc6bhdrbORFZANDmTDvq26mJrEVTl8TdnDg=="], @@ -987,8 +1032,12 @@ "lightningcss-win32-x64-msvc": ["lightningcss-win32-x64-msvc@1.30.2", "", { "os": "win32", "cpu": "x64" }, "sha512-5g1yc73p+iAkid5phb4oVFMB45417DkRevRbt/El/gKXJk4jid+vPFF/AXbxn05Aky8PapwzZrdJShv5C0avjw=="], + "lodash.merge": ["lodash.merge@4.6.2", "", {}, "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ=="], + "log-symbols": ["log-symbols@4.1.0", "", { "dependencies": { "chalk": "^4.1.0", "is-unicode-supported": "^0.1.0" } }, "sha512-8XPvpAA8uyhfteu8pIvQxpJZ7SYYdpUivZpGy6sFsBuKRY/7rQGavedeB8aK+Zkyq6upMFVL/9AW6vOYzfRyLg=="], + "long": ["long@5.3.2", "", {}, "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA=="], + "loupe": ["loupe@3.2.1", "", {}, 
"sha512-CdzqowRJCeLU72bHvWqwRBBlLcMEtIvGrlvef74kMnV2AolS9Y8xUv1I0U/MNAWMhBlKIoyuEgoJ0t/bbwHbLQ=="], "lower-case": ["lower-case@2.0.2", "", { "dependencies": { "tslib": "^2.0.3" } }, "sha512-7fm3l3NAF9WfN6W3JOmf5drwpVqX78JtoGJ3A6W0a6ZnldM41w2fV5D490psKFTpMds8TJse/eHLFFsNHHjHgg=="], @@ -1113,6 +1162,8 @@ "property-information": ["property-information@7.1.0", "", {}, "sha512-TwEZ+X+yCJmYfL7TPUOcvBZ4QfoT5YenQiJuX//0th53DE6w0xxLEtfK3iyryQFddXuvkIk51EEgrJQ0WJkOmQ=="], + "protobufjs": ["protobufjs@7.5.4", "", { "dependencies": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", "@protobufjs/codegen": "^2.0.4", "@protobufjs/eventemitter": "^1.1.0", "@protobufjs/fetch": "^1.1.0", "@protobufjs/float": "^1.0.2", "@protobufjs/inquire": "^1.1.0", "@protobufjs/path": "^1.1.2", "@protobufjs/pool": "^1.1.0", "@protobufjs/utf8": "^1.1.0", "@types/node": ">=13.7.0", "long": "^5.0.0" } }, "sha512-CvexbZtbov6jW2eXAvLukXjXUW1TzFaivC46BpWc/3BpcCysb5Vffu+B3XHMm8lVEuy2Mm4XGex8hBSg1yapPg=="], + "queue-microtask": ["queue-microtask@1.2.3", "", {}, "sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A=="], "quote-unquote": ["quote-unquote@1.0.0", "", {}, "sha512-twwRO/ilhlG/FIgYeKGFqyHhoEhqgnKVkcmqMKi2r524gz3ZbDTcyFt38E9xjJI2vT+KbRNHVbnJ/e0I25Azwg=="], @@ -1357,6 +1408,14 @@ "@moq/hang-ui/vite": ["vite@7.3.1", "", { "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", "picomatch": "^4.0.3", "postcss": "^8.5.6", "rollup": "^4.43.0", "tinyglobby": "^0.2.15" }, "optionalDependencies": { "fsevents": "~2.3.3" }, "peerDependencies": { "@types/node": "^20.19.0 || >=22.12.0", "jiti": ">=1.21.0", "less": "^4.0.0", "lightningcss": "^1.21.0", "sass": "^1.70.0", "sass-embedded": "^1.70.0", "stylus": ">=0.54.8", "sugarss": "^5.0.0", "terser": "^5.16.0", "tsx": "^4.8.1", "yaml": "^2.4.2" }, "optionalPeers": ["@types/node", "jiti", "less", "lightningcss", "sass", "sass-embedded", "stylus", "sugarss", "terser", "tsx", "yaml"], "bin": { "vite": 
"bin/vite.js" } }, "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA=="], + "@opentelemetry/exporter-metrics-otlp-http/@opentelemetry/sdk-metrics": ["@opentelemetry/sdk-metrics@1.25.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/resources": "1.25.1", "lodash.merge": "^4.6.2" }, "peerDependencies": { "@opentelemetry/api": ">=1.3.0 <1.10.0" } }, "sha512-9Mb7q5ioFL4E4dDrc4wC/A3NTHDat44v4I3p2pLPSxRvqUbDIQyMVr9uK+EU69+HWhlET1VaSrRzwdckWqY15Q=="], + + "@opentelemetry/otlp-transformer/@opentelemetry/sdk-metrics": ["@opentelemetry/sdk-metrics@1.25.1", "", { "dependencies": { "@opentelemetry/core": "1.25.1", "@opentelemetry/resources": "1.25.1", "lodash.merge": "^4.6.2" }, "peerDependencies": { "@opentelemetry/api": ">=1.3.0 <1.10.0" } }, "sha512-9Mb7q5ioFL4E4dDrc4wC/A3NTHDat44v4I3p2pLPSxRvqUbDIQyMVr9uK+EU69+HWhlET1VaSrRzwdckWqY15Q=="], + + "@opentelemetry/sdk-metrics/@opentelemetry/core": ["@opentelemetry/core@1.30.1", "", { "dependencies": { "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-OOCM2C/QIURhJMuKaekP3TRBxBKxG/TWWA0TL2J6nXUtDnuCtccy49LUJF8xPFXMX+0LMcxFpCo8M9cGY1W6rQ=="], + + "@opentelemetry/sdk-metrics/@opentelemetry/resources": ["@opentelemetry/resources@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-5UxZqiAgLYGFjS4s9qm5mBVo433u+dSPUFWVWXmLAD4wB65oMCoXaJP1KJa9DIYYMeHu3z4BZcStG3LC593cWA=="], + "@poppinss/dumper/supports-color": ["supports-color@10.2.2", "", {}, "sha512-SS+jx45GF1QjgEXQx4NJZV9ImqmO2NPz5FNsIHrsDjh2YsHnawpan7SNQ1o8NuhrbHZy9AZhIoCUiCeaW/C80g=="], "@rollup/pluginutils/estree-walker": ["estree-walker@2.0.2", "", {}, "sha512-Rfkk/Mp/DL7JVje3u18FxFujQlTNR2q6QfMSMB7AvCBx91NGj/ba3kCfza0f6dVDbw7YlRf/nDrn7pQrCCyQ/w=="], @@ -1447,6 +1506,10 @@ 
"@moq/hang-ui/vite/esbuild": ["esbuild@0.27.2", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.27.2", "@esbuild/android-arm": "0.27.2", "@esbuild/android-arm64": "0.27.2", "@esbuild/android-x64": "0.27.2", "@esbuild/darwin-arm64": "0.27.2", "@esbuild/darwin-x64": "0.27.2", "@esbuild/freebsd-arm64": "0.27.2", "@esbuild/freebsd-x64": "0.27.2", "@esbuild/linux-arm": "0.27.2", "@esbuild/linux-arm64": "0.27.2", "@esbuild/linux-ia32": "0.27.2", "@esbuild/linux-loong64": "0.27.2", "@esbuild/linux-mips64el": "0.27.2", "@esbuild/linux-ppc64": "0.27.2", "@esbuild/linux-riscv64": "0.27.2", "@esbuild/linux-s390x": "0.27.2", "@esbuild/linux-x64": "0.27.2", "@esbuild/netbsd-arm64": "0.27.2", "@esbuild/netbsd-x64": "0.27.2", "@esbuild/openbsd-arm64": "0.27.2", "@esbuild/openbsd-x64": "0.27.2", "@esbuild/openharmony-arm64": "0.27.2", "@esbuild/sunos-x64": "0.27.2", "@esbuild/win32-arm64": "0.27.2", "@esbuild/win32-ia32": "0.27.2", "@esbuild/win32-x64": "0.27.2" }, "bin": { "esbuild": "bin/esbuild" } }, "sha512-HyNQImnsOC7X9PMNaCIeAm4ISCQXs5a5YasTXVliKv4uuBo1dKrG0A+uQS8M5eXjVMnLg3WgXaKvprHlFJQffw=="], + "@opentelemetry/sdk-metrics/@opentelemetry/core/@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + + "@opentelemetry/sdk-metrics/@opentelemetry/resources/@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + "happy-dom/@types/node/undici-types": ["undici-types@6.21.0", "", {}, "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ=="], "module-lookup-amd/glob/minimatch": ["minimatch@3.1.2", "", { "dependencies": { "brace-expansion": "^1.1.7" } }, "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw=="], From 
3ec2d84ea090818e88222b523dbb9fc7495cf281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabi=C3=A1n=20Cancela?= <62720589+fcancela@users.noreply.github.com> Date: Fri, 16 Jan 2026 16:54:55 -0300 Subject: [PATCH 06/12] Update js/hang/src/observability/index.ts Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- js/hang/src/observability/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/hang/src/observability/index.ts b/js/hang/src/observability/index.ts index 801ab5b54..d9508ec16 100644 --- a/js/hang/src/observability/index.ts +++ b/js/hang/src/observability/index.ts @@ -76,7 +76,7 @@ function setupConnectionTracking() { }); } }) - .catch(() => {}); + .catch((error) => console.warn("Failed to set up connection tracking for observability:", error)); } export class ClientMetrics { From 0f617b0512f972cf383fdc61de81e0a19732a519 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:57:57 -0300 Subject: [PATCH 07/12] observability: optional prom+grafana+otel compose for demo metrics --- observability/README.md | 468 ++++++++++++++++++ observability/docker-compose.yml | 56 +++ .../provisioning/datasources/datasources.yml | 11 + observability/otel-collector-config.yaml | 31 ++ observability/prometheus.yml | 17 + 5 files changed, 583 insertions(+) create mode 100644 observability/README.md create mode 100644 observability/docker-compose.yml create mode 100644 observability/grafana/provisioning/datasources/datasources.yml create mode 100644 observability/otel-collector-config.yaml create mode 100644 observability/prometheus.yml diff --git a/observability/README.md b/observability/README.md new file mode 100644 index 000000000..518f22be2 --- /dev/null +++ b/observability/README.md @@ -0,0 +1,468 @@ +# Optional observability stack (Grafana + Prometheus + OTel Collector) + +This folder is **optional tooling** to help reviewers/operators quickly spin up the minimum infra to view metrics emitted 
by this PR: + +- Relay metrics: served at `http://localhost:4443/metrics` +- Browser metrics: exported via OTLP/HTTP to `http://localhost:4318/v1/metrics` + +## Start + +```bash +cd observability +docker compose up -d +``` + +- **Grafana**: `http://localhost:3050` (anonymous admin enabled) +- **Prometheus**: `http://localhost:9090` + +## Verify data is flowing + +### Prometheus targets +Open Prometheus targets page and confirm both are **UP**: + +- `otel-collector` (scrapes `otel-collector:8889`) +- `moq-relay` (scrapes `host.docker.internal:4443/metrics`) + +### Example queries +In Prometheus or Grafana Explore: + +- `moq_relay_active_sessions` +- `moq_relay_app_bytes_sent_total` +- `moq_relay_app_bytes_received_total` +- `moq_client_connections_total` +- `moq_client_startup_time_seconds_count` + +## Linux / WSL2 notes + +The Prometheus config scrapes the relay using `host.docker.internal:4443`. The compose file includes: + +- `extra_hosts: ["host.docker.internal:host-gateway"]` + +If your Docker setup doesn’t support `host-gateway`, edit `observability/prometheus.yml` and replace the target with your host IP. + +# MoQ Observability Stack + +Real-time monitoring and debugging for Media over QUIC (MoQ) streaming infrastructure. + +## Overview + +This observability stack provides end-to-end visibility into the MoQ streaming pipeline, from client players to relay servers. 
It collects metrics, traces, and logs to help you: + +- **Monitor** active viewers, streams, and connections in real-time +- **Debug** playback issues with per-session traces +- **Analyze** client experience (buffer health, startup time, quality switches) +- **Alert** on performance degradation or failures + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ DATA SOURCES │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ Browser │ │ MoQ Relay │ │ +│ │ Player │ │ (Rust) │ │ +│ │ │ │ │ │ +│ │ ┌──────────┐ │ │ ┌──────────┐ │ │ +│ │ │ OTel JS │ │ │ │ OTel SDK │ │ │ +│ │ │ SDK │ │ │ │ (Rust) │ │ │ +│ │ └────┬─────┘ │ │ └────┬─────┘ │ │ +│ └──────┼───────┘ └──────┼───────┘ │ +│ │ │ │ +│ │ OTLP/HTTP │ OTLP/gRPC │ +│ │ (metrics, traces) │ (metrics, traces) │ +│ │ │ │ +└──────────┼─────────────────────────────────────────────┼────────────────────────┘ + │ │ + ▼ ▼ +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ COLLECTION LAYER │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌────────────────────────┐ │ +│ │ OpenTelemetry │ │ +│ │ Collector │ │ +│ │ │ │ +│ │ ┌─────────────────┐ │ │ +│ │ │ OTLP Receivers │ │ ← Receives all telemetry │ +│ │ │ (gRPC + HTTP) │ │ │ +│ │ └────────┬────────┘ │ │ +│ │ │ │ │ +│ │ ┌────────▼────────┐ │ │ +│ │ │ Batch Processor │ │ ← Batches for efficiency │ +│ │ └────────┬────────┘ │ │ +│ │ │ │ │ +│ │ ┌────────▼────────┐ │ │ +│ │ │ Exporters │ │ ← Routes to backends │ +│ │ └─────────────────┘ │ │ +│ └───────────┬────────────┘ │ +│ │ │ +│ ┌─────────────────────┼─────────────────────┐ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ +│ │ Prometheus │ │ Tempo │ │ Loki │ │ +│ │ (Metrics) │ │ (Traces) │ │ (Logs) │ │ +│ └───────────────┘ └───────────────┘ └───────────────┘ │ +│ │ 
+└─────────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ VISUALIZATION LAYER │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────┐ │ +│ │ Grafana │ │ +│ │ │ │ +│ │ ┌───────────────┐ │ │ +│ │ │ Dashboards │ │ │ +│ │ │ - MoQ Overview│ │ │ +│ │ │ - MoQ Pipeline│ │ │ +│ │ │ - Node Stats │ │ │ +│ │ └───────────────┘ │ │ +│ │ │ │ +│ │ http://localhost:3050 │ +│ └─────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +## Data Flow + +### 1. Client (Browser) → OTel Collector → Prometheus/Tempo + +``` +Browser Player + │ + │ Records metrics: + │ - Buffer length (how much video is buffered) + │ - Startup time (time to first frame) + │ - Quality switches (adaptive bitrate changes) + │ - Connection type (WebTransport vs WebSocket) + │ + ▼ +OpenTelemetry JS SDK (in browser) + │ + │ Exports via OTLP/HTTP every 10 seconds + │ POST http://localhost:4318/v1/metrics + │ POST http://localhost:4318/v1/traces + │ + ▼ +OTel Collector (port 4318) + │ + ├──► Prometheus (port 9090) ──► Grafana dashboards + │ Metrics like: + │ moq_client_buffer_length_seconds + │ moq_client_startup_time_seconds + │ moq_client_connections_total{transport="websocket"} + │ + └──► Tempo (port 3200) ──► Grafana trace explorer + Traces for debugging individual sessions +``` + +### 2. 
Relay (Rust) → OTel Collector → Prometheus/Tempo + +``` +MoQ Relay Server + │ + │ Records metrics: + │ - Active streams/subscribers/connections + │ - Bytes sent/received (bandwidth) + │ - Connection lifecycle events + │ - QUIC stats (RTT, packet loss) - when using WebTransport + │ + ▼ +OpenTelemetry Rust SDK + │ + │ Exports via OTLP/gRPC every 10 seconds + │ grpc://localhost:4317 + │ + ▼ +OTel Collector (port 4317) + │ + ├──► Prometheus (port 9090) + │ Metrics like: + │ moq_relay_active_subscribers + │ moq_relay_active_streams + │ moq_relay_bytes_sent_total + │ + └──► Tempo (port 3200) + Traces for connection lifecycle +``` + +### 3. Docker Logs → Alloy → Loki + +``` +Docker Containers (relay, etc.) + │ + │ JSON structured logs with trace_id + │ + ▼ +Grafana Alloy (log collector) + │ + │ Scrapes Docker container logs + │ Parses JSON, extracts labels + │ + ▼ +Loki (port 3100) + │ + └──► Grafana log explorer + Can link logs ↔ traces via trace_id +``` + +## Quick Start + +### 1. Start the observability stack + +```bash +cd observability +docker compose up -d +``` + +### 2. Import dashboards + +```bash +./import-dashboards.sh +``` + +### 3. Start the MoQ relay (from project root) + +```bash +just dev +``` + +### 4. Access Grafana + +Open http://localhost:3050 (login: admin/admin) + +## Available Metrics + +Metrics are split into two layers to align with MoQ's "relay stays dumb" philosophy: + +### MoQ Layer (relay, media-agnostic) + +The relay operates on MoQ-native units (objects, groups) without understanding media semantics. 
+ +| Metric | Type | Description | +|--------|------|-------------| +| `moq_relay_active_subscribers` | Gauge | Current viewer count | +| `moq_relay_active_streams` | Gauge | Current stream count | +| `moq_relay_active_connections` | Gauge | Current connection count | +| `moq_relay_connections_total` | Counter | Total connections over time | +| `moq_relay_bytes_sent_total` | Counter | Total bytes transmitted | +| `moq_relay_bytes_received_total` | Counter | Total bytes received | +| `moq_relay_app_bytes_sent_total` | Counter | App-level payload bytes sent (use for amplification; excludes retransmits) | +| `moq_relay_app_bytes_received_total` | Counter | App-level payload bytes received (use for amplification; excludes retransmits) | +| `moq_relay_objects_sent_total` | Counter | Total MoQ objects transmitted | +| `moq_relay_objects_received_total` | Counter | Total MoQ objects received | +| `moq_relay_groups_sent_total` | Counter | Total MoQ groups transmitted | +| `moq_relay_groups_received_total` | Counter | Total MoQ groups received | +| `moq_relay_cache_hits_total` | Counter | Experimental: “served without upstream work” (definition TBD; fanout-sensitive) | +| `moq_relay_cache_misses_total` | Counter | Experimental: “required upstream work” (definition TBD; fanout-sensitive) | +| `moq_relay_dedup_upstream_saved_total` | Counter | Upstream work avoided via subscription dedup (fanout-relay “cache effectiveness”) | +| `moq_relay_fanout` | Histogram | Effective fanout (currently derived periodically, not group-accurate) | +| `moq_relay_queue_depth` | Gauge | Pending objects in delivery queue | +| `moq_relay_drops_total` | Counter | Objects dropped (backpressure) | +| `moq_relay_errors_total` | Counter | Connection errors | + +**Note on cache metrics in fanout relays:** In a Producer/Consumer fanout architecture, “cache hit rate” can be misleading unless it’s defined precisely (per-consumer delivery vs per-upstream work vs late-join retention). 
Prefer `moq_relay_dedup_upstream_saved_total` plus `moq_relay_app_bytes_{sent,received}_total` (amplification) until `cache_hits_total`/`cache_misses_total` are fully defined and wired. + +**Labels:** +- `relay_instance`: Relay identifier +- `namespace`: Stream namespace +- `region`: Deployment region + +### Hang Layer (media-aware, client-side) + +Media-specific metrics are collected in the browser by the hang player, not the relay. + +**Client Experience (CMCD-aligned):** + +| Metric | Type | Description | +|--------|------|-------------| +| `moq_client_buffer_length_seconds` | Histogram | Video buffer length in seconds | +| `moq_client_startup_time_seconds` | Histogram | Time to first frame | +| `moq_client_latency_seconds` | Histogram | Latency to live edge | +| `moq_client_bitrate_bps` | Histogram | Current playback bitrate | +| `moq_client_quality_switches_total` | Counter | Quality/bitrate switches | +| `moq_client_connections_total` | Counter | Connections by transport type | +| `moq_client_rebuffer_count_total` | Counter | Rebuffering events | + +**Decode/Render Metrics:** + +| Metric | Type | Description | +|--------|------|-------------| +| `moq_client_frames_decoded_total` | Counter | Successfully decoded frames | +| `moq_client_frames_dropped_total` | Counter | Dropped frames (congestion) | +| `moq_client_keyframe_interval_seconds` | Histogram | Time between keyframes | +| `moq_client_decode_time_seconds` | Histogram | Frame decode latency | +| `moq_client_av_sync_drift_seconds` | Histogram | Audio/video sync drift | + +**Labels:** +- `transport`: `webtransport` or `websocket` +- `codec`: e.g., `avc1.64001f` +- `track_type`: `video` or `audio` + +## Dashboards + +### MoQ Overview +The main dashboard showing: +- **Top row**: Key stats (viewers, streams, connections, transport distribution, startup time) +- **Client Experience**: Buffer health, startup time distribution, quality switches +- **Relay Performance**: Viewers over time, connection rate, 
bandwidth, objects/groups rate +- **Relay Effectiveness**: Cache hit rate, dedup savings, fanout distribution, queue depth +- **SLO Status**: Time-to-first-frame p95, end-to-end latency p95, stall ratio + +### MoQ Pipeline +Detailed technical metrics for debugging. + +### Node Exporter Full +System metrics (CPU, memory, disk, network) for the host. + +## Ports Reference + +| Service | Port | Purpose | +|---------|------|---------| +| Grafana | 3050 | Dashboards UI | +| Prometheus | 9090 | Metrics storage & queries | +| Tempo | 3200 | Trace storage | +| Loki | 3100 | Log storage | +| OTel Collector (gRPC) | 4317 | Relay telemetry ingestion | +| OTel Collector (HTTP) | 4318 | Browser telemetry ingestion | +| OTel Collector (Prometheus) | 8889 | Metrics export for scraping | +| Node Exporter | 9100 | System metrics | + +## Configuration Files + +| File | Purpose | +|------|---------| +| `docker-compose.yml` | All observability services | +| `otel-collector-config.yaml` | OTel Collector pipelines | +| `prometheus.yml` | Prometheus scrape config | +| `tempo-config.yaml` | Tempo trace storage | +| `alloy-config.alloy` | Log collection config | +| `grafana/provisioning/` | Datasources & dashboards | + +## Troubleshooting + +### No client metrics appearing + +1. Check browser console for `[Observability] Initialized` +2. Verify CORS: Browser should successfully POST to `localhost:4318` +3. Check OTel Collector logs: `docker logs observability-otel-collector-1` + +### No relay metrics appearing + +1. Check relay logs for OTel initialization +2. Verify Prometheus is scraping: http://localhost:9090/targets +3. Query directly: `curl 'http://localhost:9090/api/v1/query?query=moq_relay_active_streams'` + +### Dashboard shows "No data" + +1. Verify time range (top right) is recent +2. Check datasource connection in panel edit mode +3. 
Run `./import-dashboards.sh` to re-import + +### WebSocket fallback instead of WebTransport + +Check `moq_client_connections_total` by transport label: +```promql +sum by (transport) (moq_client_connections_total) +``` + +If all connections are `websocket`, WebTransport may not be supported or configured. + +## Design Decisions + +### Why no session_id in metrics? + +Session IDs are **high-cardinality** and would cause Prometheus to run out of memory with many users. Instead: + +| Signal | Per-Session | Use Case | +|--------|-------------|----------| +| Metrics | ❌ Aggregates | "How many users? Avg latency?" | +| Traces | ✅ Per-session | "Debug THIS user's issue" | +| Logs | ✅ Per-session | "What happened to session X?" | + +### Why OpenTelemetry? + +- **Vendor-neutral**: Switch backends without code changes +- **Standard protocol**: OTLP is widely supported +- **Single SDK**: Metrics, traces, logs in one library +- **Future-proof**: CNCF graduated project + +### Why this stack (Prometheus/Tempo/Loki/Grafana)? + +- **All Grafana ecosystem**: Seamless integration +- **Proven at scale**: Used by major companies +- **Open source**: No vendor lock-in +- **Rich querying**: PromQL, TraceQL, LogQL + +### Browser OTLP in Production + +In development, browsers send telemetry directly to `localhost:4318`. In production, browser OTLP must be sent to a reachable endpoint to avoid CORS and network reachability issues: + +| Option | Description | +|--------|-------------| +| **Same-origin proxy** | Add `/otel` path on the relay that proxies to the collector | +| **Dedicated ingress** | Deploy collector with proper CORS headers on a public endpoint | +| **Edge collector** | Run collector at CDN edge (Cloudflare Workers, etc.) 
| + +**Example nginx proxy configuration:** +```nginx +location /otel/ { + proxy_pass http://otel-collector:4318/; + proxy_set_header Host $host; + + # CORS headers for browser requests + add_header 'Access-Control-Allow-Origin' '*'; + add_header 'Access-Control-Allow-Methods' 'POST, OPTIONS'; + add_header 'Access-Control-Allow-Headers' 'Content-Type'; +} +``` + +**Client configuration for production:** +```typescript +initObservability({ + // Use same-origin path to avoid CORS issues + otlpEndpoint: `${window.location.origin}/otel`, + serviceName: "moq-client", +}); +``` + +### Trace/Log/qlog Correlation + +All telemetry signals (traces, logs, qlog) share a common `connection_id` for correlation: + +| Signal | Location | How to correlate | +|--------|----------|------------------| +| **Traces** | Tempo | Filter by `connection_id` span attribute | +| **Logs** | Loki | Filter by `connection_id` field in JSON logs | +| **qlog** | File system | qlog files are named `qlog/{connection_id}/trace.json` | + +**Correlation workflow:** + +1. Find an interesting trace in Grafana/Tempo +2. Copy the `connection_id` from the span attributes +3. Search logs in Loki: `{service="moq-relay"} | json | connection_id="conn-123"` +4. If deeper QUIC debugging is needed, check the `qlog_path` attribute in the trace to find the qlog file + +**Enabling qlog for QUIC forensics:** +```bash +# Enable qlog with 10% sampling +MOQ_QLOG_ENABLED=true MOQ_QLOG_SAMPLE_RATE=0.1 just dev +``` + +qlog files can be visualized with [qvis](https://qvis.quictools.info/) for detailed QUIC protocol analysis. + +## Next Steps + +To extend this observability setup: + +1. **Add alerting**: Define rules in `alerts.yml` +2. **Add broadcast labels**: Track per-stream metrics +3. **Enable QUIC stats**: When WebTransport is working +4. **Add geographic labels**: Track viewer distribution +5. **Set up dashboards for specific use cases**: Live events, VOD, etc. 
diff --git a/observability/docker-compose.yml b/observability/docker-compose.yml new file mode 100644 index 000000000..da1ccb876 --- /dev/null +++ b/observability/docker-compose.yml @@ -0,0 +1,56 @@ +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro + ports: + - "4318:4318" # OTLP HTTP (browser) + - "8889:8889" # Prometheus exporter (scraped by Prometheus) + networks: + - moq-observability + + prometheus: + image: prom/prometheus:latest + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--storage.tsdb.retention.time=15d" + - "--web.enable-lifecycle" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + ports: + - "9090:9090" + networks: + - moq-observability + depends_on: + - otel-collector + extra_hosts: + # Enables host access on Linux / Docker Engine (including WSL2). 
+ - "host.docker.internal:host-gateway" + + grafana: + image: grafana/grafana:latest + environment: + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_AUTH_DISABLE_LOGIN_FORM=false + volumes: + - grafana-data:/var/lib/grafana + - ./grafana/provisioning:/etc/grafana/provisioning:ro + ports: + - "3050:3000" + networks: + - moq-observability + depends_on: + - prometheus + +networks: + moq-observability: + driver: bridge + +volumes: + prometheus-data: + grafana-data: + diff --git a/observability/grafana/provisioning/datasources/datasources.yml b/observability/grafana/provisioning/datasources/datasources.yml new file mode 100644 index 000000000..af9f7e709 --- /dev/null +++ b/observability/grafana/provisioning/datasources/datasources.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + uid: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: true + diff --git a/observability/otel-collector-config.yaml b/observability/otel-collector-config.yaml new file mode 100644 index 000000000..6ed7ae623 --- /dev/null +++ b/observability/otel-collector-config.yaml @@ -0,0 +1,31 @@ +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + cors: + allowed_origins: + - "*" + allowed_headers: + - "*" + max_age: 7200 + +processors: + batch: + timeout: 10s + send_batch_size: 1024 + +exporters: + prometheus: + endpoint: 0.0.0.0:8889 + +service: + telemetry: + logs: + level: info + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] + diff --git a/observability/prometheus.yml b/observability/prometheus.yml new file mode 100644 index 000000000..036fb60f4 --- /dev/null +++ b/observability/prometheus.yml @@ -0,0 +1,17 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + # Scrape the OTel Collector Prometheus exporter (browser OTLP/HTTP metrics land here). 
+ - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:8889'] + + # Scrape the relay's /metrics endpoint from the host. + # `host.docker.internal` is provided via docker-compose extra_hosts. + - job_name: 'moq-relay' + metrics_path: /metrics + static_configs: + - targets: ['host.docker.internal:4443'] + From 5acf87202b6ffdfaf469d485e685d0e7e79482d2 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 16:59:10 -0300 Subject: [PATCH 08/12] just: add observability helper recipes --- justfile | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/justfile b/justfile index 391f3c9e4..7f1f6075c 100644 --- a/justfile +++ b/justfile @@ -413,3 +413,15 @@ pub-console: # Serve the documentation locally. doc: cd doc && bun run dev + +# Start the optional observability stack (Prometheus, Grafana, OTel Collector). +observability: + cd observability && docker compose up -d + +# Stop the optional observability stack. +observability-stop: + cd observability && docker compose down + +# Follow observability stack logs. +observability-logs: + cd observability && docker compose logs -f From cca8aaf3a128c9209ab3dcec7e6f76426677d546 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 17:00:58 -0300 Subject: [PATCH 09/12] just: make observability run stack + dev --- justfile | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/justfile b/justfile index 7f1f6075c..6b7d534b8 100644 --- a/justfile +++ b/justfile @@ -414,14 +414,9 @@ pub-console: doc: cd doc && bun run dev -# Start the optional observability stack (Prometheus, Grafana, OTel Collector). +# Start the optional observability stack, then run the full dev demo. observability: - cd observability && docker compose up -d - -# Stop the optional observability stack. -observability-stop: - cd observability && docker compose down - -# Follow observability stack logs. 
-observability-logs: - cd observability && docker compose logs -f + @echo ">>> Starting observability stack..." + (cd observability && docker compose up -d) + @echo ">>> Starting demo (just dev)..." + just dev From c20b708a23f50723614e107f04bd38797561214f Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 17:11:10 -0300 Subject: [PATCH 10/12] chore: update dependencies for OpenTelemetry and clean up observability code --- deno.lock | 3 +++ js/hang/src/observability/index.ts | 4 ++-- js/hang/src/watch/broadcast.ts | 11 ----------- js/hang/src/watch/element.ts | 2 +- js/hang/src/watch/video/source.ts | 15 +++++++++------ js/lite/src/connection/connect.ts | 2 +- 6 files changed, 16 insertions(+), 21 deletions(-) diff --git a/deno.lock b/deno.lock index 01f2f162d..76a9c5210 100644 --- a/deno.lock +++ b/deno.lock @@ -37,6 +37,9 @@ "dependencies": [ "npm:@kixelated/libavjs-webcodecs-polyfill@~0.5.5", "npm:@libav.js/variant-opus-af@^6.8.8", + "npm:@opentelemetry/api@^1.8.0", + "npm:@opentelemetry/exporter-metrics-otlp-http@0.52", + "npm:@opentelemetry/sdk-metrics@^1.25.0", "npm:@types/audioworklet@^0.0.77", "npm:@types/web@^0.0.241", "npm:async-mutex@0.5", diff --git a/js/hang/src/observability/index.ts b/js/hang/src/observability/index.ts index d9508ec16..f65b13cce 100644 --- a/js/hang/src/observability/index.ts +++ b/js/hang/src/observability/index.ts @@ -6,8 +6,8 @@ * - `moq_client_startup_time_seconds{track_type=...}` */ -import { metrics } from "@opentelemetry/api"; import type { Counter, Histogram, Meter } from "@opentelemetry/api"; +import { metrics } from "@opentelemetry/api"; import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; import { MeterProvider, PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; @@ -76,7 +76,7 @@ function setupConnectionTracking() { }); } }) - .catch((error) => console.warn("Failed to set up connection tracking for observability:", error)); + .catch((error) => console.warn("Failed to 
set up connection tracking for observability:", error)); } export class ClientMetrics { diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index 62d10bf11..c8fe30a1a 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -1,7 +1,6 @@ import type * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../catalog"; -import { recordMetric } from "../observability"; import { PRIORITY } from "../publish/priority"; import * as Audio from "./audio"; import { Chat, type ChatProps } from "./chat"; @@ -96,9 +95,6 @@ export class Broadcast { effect.cleanup(() => announced.close()); effect.spawn(async () => { - const startTime = performance.now(); - let firstFrame = true; - for (;;) { const update = await announced.next(); if (!update) break; @@ -109,13 +105,6 @@ export class Broadcast { continue; } - // Record startup time on first active broadcast (CMCD su) - if (firstFrame && update.active) { - const startupTime = (performance.now() - startTime) / 1000; // Convert to seconds - recordMetric((m) => m.recordStartupTime(startupTime, { track_type: "video" })); - firstFrame = false; - } - effect.set(this.#active, update.active, false); } }); diff --git a/js/hang/src/watch/element.ts b/js/hang/src/watch/element.ts index 66be06c98..ee53572e4 100644 --- a/js/hang/src/watch/element.ts +++ b/js/hang/src/watch/element.ts @@ -1,10 +1,10 @@ import type { Time } from "@moq/lite"; import * as Moq from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; +import * as Observability from "../observability"; import * as Audio from "./audio"; import { Broadcast } from "./broadcast"; import * as Video from "./video"; -import * as Observability from "../observability"; // TODO remove name; replaced with path const OBSERVED = ["url", "name", "path", "paused", "volume", "muted", "reload", "latency"] as const; diff --git a/js/hang/src/watch/video/source.ts 
b/js/hang/src/watch/video/source.ts index 50c8baf34..615eb6ebd 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -42,7 +42,6 @@ export interface VideoStats { timestamp: number; bytesReceived: number; framesDecoded: number; - framesDropped: number; } // Only count it as buffering if we had to sleep for 200ms or more before rendering the next frame. @@ -50,9 +49,6 @@ export interface VideoStats { // TODO Maybe we need to detect b-frames and make this dynamic? const MIN_SYNC_WAIT_MS = 200 as Time.Milli; -// Minimum time to wait for a frame before counting it as a rebuffer event (ms) -const REBUFFER_THRESHOLD_MS = 100; - // The maximum number of concurrent b-frames that we support. const MAX_BFRAMES = 10; @@ -243,6 +239,14 @@ export class Source { const decoder = new VideoDecoder({ output: async (frame: VideoFrame) => { + // Count actual decoded frames here (not per encoded chunk). + this.#stats.update((current) => ({ + frameCount: current?.frameCount ?? 0, + timestamp: current?.timestamp ?? 0, + bytesReceived: current?.bytesReceived ?? 0, + framesDecoded: (current?.framesDecoded ?? 0) + 1, + })); + // Insert into a queue so we can perform ordered sleeps. // If this were to block, I believe WritableStream is still ordered. try { @@ -345,8 +349,7 @@ export class Source { frameCount: (current?.frameCount ?? 0) + 1, timestamp: next.timestamp, bytesReceived: (current?.bytesReceived ?? 0) + next.data.byteLength, - framesDecoded: (current?.framesDecoded ?? 0) + 1, - framesDropped: current?.framesDropped ?? 0, + framesDecoded: current?.framesDecoded ?? 
0, })); } }); diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index f6bfcba17..69aee5c18 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -1,4 +1,5 @@ import WebTransportWs from "@moq/web-transport-ws"; +import type { Established } from "./established.ts"; import * as Ietf from "../ietf/index.ts"; import * as Lite from "../lite/index.ts"; import { Stream } from "../stream.ts"; @@ -14,7 +15,6 @@ let connectionTypeCallback: ((type: "webtransport" | "websocket") => void) | und export function onConnectionType(callback: (type: "webtransport" | "websocket") => void) { connectionTypeCallback = callback; } -import type { Established } from "./established.ts"; export interface WebSocketOptions { // If true (default), enable the WebSocket fallback. From 0f68bb22d5fc6de4ab0095c48fe533cf447dac64 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 17:19:19 -0300 Subject: [PATCH 11/12] fix: remove unnecessary newline in observability index and reorder import in connection file --- js/hang/src/observability/index.ts | 1 - js/lite/src/connection/connect.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/js/hang/src/observability/index.ts b/js/hang/src/observability/index.ts index f65b13cce..f09fd7287 100644 --- a/js/hang/src/observability/index.ts +++ b/js/hang/src/observability/index.ts @@ -120,4 +120,3 @@ export function recordMetric(fn: (metrics: ClientMetrics) => void): void { const m = getClientMetrics(); if (m) fn(m); } - diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index 69aee5c18..5cd4f39f8 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -1,9 +1,9 @@ import WebTransportWs from "@moq/web-transport-ws"; -import type { Established } from "./established.ts"; import * as Ietf from "../ietf/index.ts"; import * as Lite from "../lite/index.ts"; import { Stream } from "../stream.ts"; import * as 
Hex from "../util/hex.ts"; +import type { Established } from "./established.ts"; // Connection type tracking for observability let connectionTypeCallback: ((type: "webtransport" | "websocket") => void) | undefined; From 2f7a19fa4623862fd0140fde4faa31a1147d43e1 Mon Sep 17 00:00:00 2001 From: fcancela Date: Fri, 16 Jan 2026 17:19:26 -0300 Subject: [PATCH 12/12] refactor: clean up session and stats imports, format function calls for better readability --- rs/moq-lite/src/ietf/publisher.rs | 3 +-- rs/moq-lite/src/ietf/session.rs | 8 +++++++- rs/moq-lite/src/ietf/subscriber.rs | 3 +-- rs/moq-lite/src/lite/publisher.rs | 3 +-- rs/moq-lite/src/lite/session.rs | 6 +++++- rs/moq-lite/src/lite/subscriber.rs | 3 +-- rs/moq-lite/src/session.rs | 20 ++++++++++++++++++-- rs/moq-lite/src/stats.rs | 1 - rs/moq-native/src/server.rs | 4 +--- rs/moq-relay/src/connection.rs | 5 +---- rs/moq-relay/src/metrics.rs | 5 +---- 11 files changed, 37 insertions(+), 24 deletions(-) diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 190e2639c..9991de734 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -5,11 +5,10 @@ use web_async::{FuturesExt, Lock}; use web_transport_trait::SendStream; use crate::{ - Error, Origin, OriginConsumer, Track, TrackConsumer, + Error, Origin, OriginConsumer, Stats, Track, TrackConsumer, coding::Writer, ietf::{self, Control, FetchHeader, FetchType, FilterType, GroupOrder, Location, RequestId, Version}, model::GroupConsumer, - Stats, }; #[derive(Clone)] diff --git a/rs/moq-lite/src/ietf/session.rs b/rs/moq-lite/src/ietf/session.rs index 5312a403c..04ce97c0e 100644 --- a/rs/moq-lite/src/ietf/session.rs +++ b/rs/moq-lite/src/ietf/session.rs @@ -1,7 +1,12 @@ -use crate::{coding::{Reader, Stream}, ietf::{self, Control, Message, RequestId, Version}, Error, OriginConsumer, OriginProducer, Stats}; +use crate::{ + Error, OriginConsumer, OriginProducer, Stats, + coding::{Reader, Stream}, + 
ietf::{self, Control, Message, RequestId, Version}, +}; use super::{Publisher, Subscriber}; +#[allow(clippy::too_many_arguments)] pub(crate) async fn start( session: S, setup: Stream, @@ -43,6 +48,7 @@ pub(crate) async fn start( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn run( session: S, setup: Stream, diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index 50b3f2847..fb1c7c180 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -4,12 +4,11 @@ use std::{ }; use crate::{ - Broadcast, Error, Frame, FrameProducer, Group, GroupProducer, OriginProducer, Path, PathOwned, Track, + Broadcast, Error, Frame, FrameProducer, Group, GroupProducer, OriginProducer, Path, PathOwned, Stats, Track, TrackProducer, coding::Reader, ietf::{self, Control, FetchHeader, FilterType, GroupFlags, GroupOrder, RequestId, Version}, model::BroadcastProducer, - Stats, }; use web_async::Lock; diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index b253a5589..7e6191365 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use web_async::FuturesExt; use crate::{ - AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, Track, TrackConsumer, + AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, Stats, Track, TrackConsumer, coding::{Stream, Writer}, lite::{ self, Version, priority::{PriorityHandle, PriorityQueue}, }, model::GroupConsumer, - Stats, }; pub(super) struct Publisher { diff --git a/rs/moq-lite/src/lite/session.rs b/rs/moq-lite/src/lite/session.rs index fb5c49aea..c8be8fef2 100644 --- a/rs/moq-lite/src/lite/session.rs +++ b/rs/moq-lite/src/lite/session.rs @@ -2,7 +2,11 @@ use std::sync::Arc; use tokio::sync::oneshot; -use crate::{coding::Stream, lite::{SessionInfo, Version}, Error, OriginConsumer, OriginProducer, Stats}; +use crate::{ + Error, OriginConsumer, OriginProducer, Stats, + 
coding::Stream, + lite::{SessionInfo, Version}, +}; use super::{Publisher, Subscriber}; diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9574aa18f..4d807fd86 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -4,12 +4,11 @@ use std::{ }; use crate::{ - AsPath, Broadcast, Error, Frame, FrameProducer, Group, GroupProducer, OriginProducer, Path, PathOwned, + AsPath, Broadcast, Error, Frame, FrameProducer, Group, GroupProducer, OriginProducer, Path, PathOwned, Stats, TrackProducer, coding::{Reader, Stream}, lite::{self, Version}, model::BroadcastProducer, - Stats, }; use tokio::sync::oneshot; diff --git a/rs/moq-lite/src/session.rs b/rs/moq-lite/src/session.rs index 849026b17..add8a1eac 100644 --- a/rs/moq-lite/src/session.rs +++ b/rs/moq-lite/src/session.rs @@ -68,7 +68,15 @@ impl Session { if let Ok(version) = lite::Version::try_from(server.version) { let stream = stream.with_version(version); - lite::start(session.clone(), stream, publish.into(), subscribe.into(), stats, version).await?; + lite::start( + session.clone(), + stream, + publish.into(), + subscribe.into(), + stats, + version, + ) + .await?; } else if let Ok(version) = ietf::Version::try_from(server.version) { // Decode the parameters to get the initial request ID. let parameters = ietf::Parameters::decode(&mut server.parameters, version)?; @@ -147,7 +155,15 @@ impl Session { if let Ok(version) = lite::Version::try_from(version) { let stream = stream.with_version(version); - lite::start(session.clone(), stream, publish.into(), subscribe.into(), stats, version).await?; + lite::start( + session.clone(), + stream, + publish.into(), + subscribe.into(), + stats, + version, + ) + .await?; } else if let Ok(version) = ietf::Version::try_from(version) { // Decode the parameters to get the initial request ID. 
let parameters = ietf::Parameters::decode(&mut server.parameters, version)?; diff --git a/rs/moq-lite/src/stats.rs b/rs/moq-lite/src/stats.rs index 542c266d3..eecfd9806 100644 --- a/rs/moq-lite/src/stats.rs +++ b/rs/moq-lite/src/stats.rs @@ -27,4 +27,3 @@ impl Stats for NoopStats { fn add_rx_bytes(&self, _bytes: u64) {} fn add_tx_bytes(&self, _bytes: u64) {} } - diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index d1941852b..d1b667c82 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -382,9 +382,7 @@ impl Request { moq_lite::Session::accept_with_stats(request.ok().await?, publish, subscribe, stats).await? } #[cfg(feature = "iroh")] - Request::IrohQuic(request) => { - moq_lite::Session::accept_with_stats(request.ok(), publish, subscribe, stats).await? - } + Request::IrohQuic(request) => moq_lite::Session::accept_with_stats(request.ok(), publish, subscribe, stats).await?, }; Ok(session) diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 8bb94af6b..b2d6cd815 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -70,10 +70,7 @@ impl Connection { // We subscribe to the tracks the client is allowed to publish. let stats: Arc = Arc::new(crate::TransportStats::new(metrics, crate::Transport::WebTransport)); - let session = self - .request - .accept_with_stats(subscribe, publish, Some(stats)) - .await?; + let session = self.request.accept_with_stats(subscribe, publish, Some(stats)).await?; // Wait until the session is closed. 
session.closed().await.map_err(Into::into) diff --git a/rs/moq-relay/src/metrics.rs b/rs/moq-relay/src/metrics.rs index ce506c794..b8d2181cc 100644 --- a/rs/moq-relay/src/metrics.rs +++ b/rs/moq-relay/src/metrics.rs @@ -62,9 +62,7 @@ impl MetricsTracker { pub fn record_app_bytes_received(&self, transport: Transport, bytes: u64) { match transport { - Transport::WebTransport => self - .app_bytes_received_webtransport - .fetch_add(bytes, Ordering::Relaxed), + Transport::WebTransport => self.app_bytes_received_webtransport.fetch_add(bytes, Ordering::Relaxed), Transport::WebSocket => self.app_bytes_received_websocket.fetch_add(bytes, Ordering::Relaxed), }; } @@ -112,4 +110,3 @@ impl moq_lite::Stats for TransportStats { self.metrics.record_app_bytes_sent(self.transport, bytes); } } -