From 257b9f29bcbbb10807e10ed33a839676c043fe3c Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 30 Jan 2026 22:43:34 -0300 Subject: [PATCH 1/5] Add buffer visualization and improve audio/video handling - Add buffer visualization tool with timeline display - Improve audio decoder with better sync handling - Enhance MSE support for video playback - Add ring buffer improvements and tests - Update latency slider and watch controls Co-Authored-By: Claude Opus 4.5 --- js/hang-demo/src/index.html | 2 +- js/hang-demo/src/mse.html | 2 +- .../components/stats/providers/video.ts | 33 +- .../src/watch/components/BufferControl.tsx | 158 ++++++++ .../src/watch/components/LatencySlider.tsx | 9 +- .../src/watch/components/WatchControls.tsx | 4 +- js/hang-ui/src/watch/context.tsx | 18 +- js/hang-ui/src/watch/hooks/use-watch-ui.ts | 4 +- js/hang-ui/src/watch/styles/index.css | 127 +++++++ js/hang/src/container/legacy.ts | 59 ++- js/hang/src/watch/audio/decoder.ts | 123 +++++- js/hang/src/watch/audio/mse.ts | 24 +- js/hang/src/watch/audio/render-worklet.ts | 18 +- js/hang/src/watch/audio/render.ts | 15 +- js/hang/src/watch/audio/ring-buffer.test.ts | 208 +++++++++-- js/hang/src/watch/audio/ring-buffer.ts | 48 ++- js/hang/src/watch/backend.ts | 53 ++- js/hang/src/watch/element.ts | 12 +- js/hang/src/watch/mse.ts | 45 --- js/hang/src/watch/video/backend.ts | 8 +- js/hang/src/watch/video/decoder.ts | 353 ++++++++++++------ js/hang/src/watch/video/mse.ts | 73 +++- js/signals/src/index.ts | 3 +- 23 files changed, 1112 insertions(+), 287 deletions(-) create mode 100644 js/hang-ui/src/watch/components/BufferControl.tsx diff --git a/js/hang-demo/src/index.html b/js/hang-demo/src/index.html index 468351a19..180b6728c 100644 --- a/js/hang-demo/src/index.html +++ b/js/hang-demo/src/index.html @@ -28,7 +28,7 @@ NOTE: Cloudflare doesn't support reload yet. --> - + diff --git a/js/hang-demo/src/mse.html b/js/hang-demo/src/mse.html index e47e6b4dd..5b493599f 100644 --- a/js/hang-demo/src/mse.html +++ b/js/hang-demo/src/mse.html @@ -20,7 +20,7 @@ - + diff --git a/js/hang-ui/src/shared/components/stats/providers/video.ts b/js/hang-ui/src/shared/components/stats/providers/video.ts index 2664cfa95..c4e3193ce 100644 --- a/js/hang-ui/src/shared/components/stats/providers/video.ts +++ b/js/hang-ui/src/shared/components/stats/providers/video.ts @@ -14,8 +14,6 @@ export class VideoProvider extends BaseProvider { /** Bound callback for display updates */ /** Previous frame count for FPS calculation */ private previousFrameCount = 0; - /** Previous timestamp for FPS calculation */ - private previousTimestamp = 0; /** Previous bytes received for bitrate calculation */ private previousBytesReceived = 0; /** Previous timestamp for accurate elapsed time calculation in bitrate */ @@ -50,14 +48,15 @@ export class VideoProvider extends BaseProvider { const stats = this.props.video.stats.peek(); const now = performance.now(); + const elapsedMs = now - this.previousWhen; + // Calculate FPS from frame count delta and timestamp delta let fps: number | undefined; - if (stats && this.previousTimestamp > 0) { + if (stats && this.previousFrameCount > 0) { const frameCountDelta = stats.frameCount - this.previousFrameCount; - const timestampDeltaUs = stats.timestamp - this.previousTimestamp; - if (timestampDeltaUs > 0 && frameCountDelta > 0) { - const elapsedSeconds = timestampDeltaUs / 1_000_000; + if (elapsedMs > 0 && frameCountDelta > 0) { + const elapsedSeconds = elapsedMs / 1_000; fps = frameCountDelta / elapsedSeconds; } } @@ -66,18 +65,15 @@ export class VideoProvider extends BaseProvider { if (stats && this.previousBytesReceived > 0) { const bytesDelta = stats.bytesReceived - this.previousBytesReceived; // Only calculate bitrate if there's actual data change - if (bytesDelta > 0) { - const elapsedMs = now - this.previousWhen; - if (elapsedMs > 0) { - const bitsPerSecond = bytesDelta * 8 * (1000 / elapsedMs); - - if (bitsPerSecond >= 1_000_000) { - bitrate = `${(bitsPerSecond / 1_000_000).toFixed(1)}Mbps`; - } else if (bitsPerSecond >= 1_000) { - bitrate = `${(bitsPerSecond / 1_000).toFixed(0)}kbps`; - } else { - bitrate = `${bitsPerSecond.toFixed(0)}bps`; - } + if (bytesDelta > 0 && elapsedMs > 0) { + const bitsPerSecond = bytesDelta * 8 * (1000 / elapsedMs); + + if (bitsPerSecond >= 1_000_000) { + bitrate = `${(bitsPerSecond / 1_000_000).toFixed(1)}Mbps`; + } else if (bitsPerSecond >= 1_000) { + bitrate = `${(bitsPerSecond / 1_000).toFixed(0)}kbps`; + } else { + bitrate = `${bitsPerSecond.toFixed(0)}bps`; } } } @@ -85,7 +81,6 @@ export class VideoProvider extends BaseProvider { // Always update previous values for next calculation, even on first call if (stats) { this.previousFrameCount = stats.frameCount; - this.previousTimestamp = stats.timestamp; this.previousBytesReceived = stats.bytesReceived; this.previousWhen = now; } diff --git a/js/hang-ui/src/watch/components/BufferControl.tsx b/js/hang-ui/src/watch/components/BufferControl.tsx new file mode 100644 index 000000000..ba8aa3722 --- /dev/null +++ b/js/hang-ui/src/watch/components/BufferControl.tsx @@ -0,0 +1,158 @@ +import { Moq } from "@moq/hang"; +import type { BufferedRange } from "@moq/hang/watch"; +import { createMemo, createSignal, For, Show } from "solid-js"; +import useWatchUIContext from "../hooks/use-watch-ui"; + +const MIN_RANGE = 0 as Moq.Time.Milli; +const RANGE_STEP = 100 as Moq.Time.Milli; + +type BufferControlProps = { + /** Maximum buffer range in milliseconds (default: 5000ms = 5s) */ + max?: Moq.Time.Milli; +}; + +export default function BufferControl(props: BufferControlProps) { + const context = useWatchUIContext(); + const maxRange = (): Moq.Time.Milli => props.max ?? (5000 as Moq.Time.Milli); + const [isDragging, setIsDragging] = createSignal(false); + + // Compute range style and overflow info relative to current timestamp + const computeRange = (range: BufferedRange, timestamp: Moq.Time.Milli, color: string) => { + const startMs = (range.start - timestamp) as Moq.Time.Milli; + const endMs = (range.end - timestamp) as Moq.Time.Milli; + const visibleStartMs = Math.max(0, startMs) as Moq.Time.Milli; + const visibleEndMs = Math.min(endMs, maxRange()) as Moq.Time.Milli; + const leftPct = (visibleStartMs / maxRange()) * 100; + const widthPct = Math.max(0.5, ((visibleEndMs - visibleStartMs) / maxRange()) * 100); + const isOverflow = endMs > maxRange(); + const overflowSec = isOverflow + ? Moq.Time.Milli.toSecond((endMs - visibleStartMs) as Moq.Time.Milli).toFixed(1) + : null; + return { + style: `left: ${leftPct}%; width: ${widthPct}%; background: ${color};`, + isOverflow, + overflowSec, + }; + }; + + // Determine color based on gap detection and buffering state + const rangeColor = (index: number, isBuffering: boolean) => { + if (isBuffering) return "#f87171"; // red + if (index > 0) return "#facc15"; // yellow + return "#4ade80"; // green + }; + + const bufferTargetPct = createMemo(() => (context.jitter() / maxRange()) * 100); + + // Handle mouse interaction to set buffer via clicking/dragging on the visualization + let containerRef: HTMLDivElement | undefined; + + const LABEL_WIDTH = 48; // px reserved for track labels + + const updateBufferFromMouseX = (clientX: number) => { + if (!containerRef) return; + const rect = containerRef.getBoundingClientRect(); + const trackWidth = rect.width - LABEL_WIDTH; + const x = Math.max(0, Math.min(clientX - rect.left - LABEL_WIDTH, trackWidth)); + const ms = (x / trackWidth) * maxRange(); + const snapped = (Math.round(ms / RANGE_STEP) * RANGE_STEP) as Moq.Time.Milli; + const clamped = Math.max(MIN_RANGE, Math.min(maxRange(), snapped)) as Moq.Time.Milli; + context.setJitter(clamped); + }; + + const onMouseDown = (e: MouseEvent) => { + setIsDragging(true); + updateBufferFromMouseX(e.clientX); + document.addEventListener("mousemove", onMouseMove); + document.addEventListener("mouseup", onMouseUp); + }; + + const onMouseMove = (e: MouseEvent) => { + if (isDragging()) { + updateBufferFromMouseX(e.clientX); + } + }; + + const onMouseUp = () => { + setIsDragging(false); + document.removeEventListener("mousemove", onMouseMove); + document.removeEventListener("mouseup", onMouseUp); + }; + + return ( +
+ {/* Buffer Visualization - interactive, click/drag to set buffer */} +
+ {/* Playhead (left edge = current time) */} +
+ + {/* Video buffer track */} +
+ Video + + {(range, i) => { + const info = () => { + const timestamp = context.timestamp(); + if (timestamp === undefined) return null; + return computeRange(range, timestamp, rangeColor(i(), context.buffering())); + }; + return ( + + {(rangeInfo) => ( +
+ + {rangeInfo().overflowSec}s + +
+ )} +
+ ); + }} +
+
+ + {/* Audio buffer track */} +
+ Audio + + {(range, i) => { + const info = () => { + const timestamp = context.timestamp(); + if (timestamp === undefined) return null; + return computeRange(range, timestamp, rangeColor(i(), context.buffering())); + }; + return ( + + {(rangeInfo) => ( +
+ + {rangeInfo().overflowSec}s + +
+ )} +
+ ); + }} +
+
+ + {/* Buffer target line (draggable) - wrapped in track-area container */} +
+
+ {`${Math.round(context.jitter())}ms`} +
+
+
+
+ ); +} diff --git a/js/hang-ui/src/watch/components/LatencySlider.tsx b/js/hang-ui/src/watch/components/LatencySlider.tsx index 93075e924..b9301e0a2 100644 --- a/js/hang-ui/src/watch/components/LatencySlider.tsx +++ b/js/hang-ui/src/watch/components/LatencySlider.tsx @@ -1,14 +1,15 @@ +import type { Moq } from "@moq/hang"; import useWatchUIContext from "../hooks/use-watch-ui"; -const MIN_RANGE = 0; -const MAX_RANGE = 5_000; -const RANGE_STEP = 100; +const MIN_RANGE = 0 as Moq.Time.Milli; +const MAX_RANGE = 5_000 as Moq.Time.Milli; +const RANGE_STEP = 100 as Moq.Time.Milli; export default function LatencySlider() { const context = useWatchUIContext(); const onInputChange = (event: Event) => { const target = event.currentTarget as HTMLInputElement; - const latency = parseFloat(target.value); + const latency = parseFloat(target.value) as Moq.Time.Milli; context.setJitter(latency); }; diff --git a/js/hang-ui/src/watch/components/WatchControls.tsx b/js/hang-ui/src/watch/components/WatchControls.tsx index b718454d8..d77281be1 100644 --- a/js/hang-ui/src/watch/components/WatchControls.tsx +++ b/js/hang-ui/src/watch/components/WatchControls.tsx @@ -1,5 +1,5 @@ +import BufferControl from "./BufferControl"; import FullscreenButton from "./FullscreenButton"; -import LatencySlider from "./LatencySlider"; import PlayPauseButton from "./PlayPauseButton"; import QualitySelector from "./QualitySelector"; import StatsButton from "./StatsButton"; @@ -17,7 +17,7 @@ export default function WatchControls() {
- +
diff --git a/js/hang-ui/src/watch/context.tsx b/js/hang-ui/src/watch/context.tsx index a81f17128..76fe7a43f 100644 --- a/js/hang-ui/src/watch/context.tsx +++ b/js/hang-ui/src/watch/context.tsx @@ -18,7 +18,7 @@ export type Rendition = { height?: number; }; -type WatchUIContextValues = { +export type WatchUIContextValues = { hangWatch: HangWatch; watchStatus: () => WatchStatus; isPlaying: () => boolean; @@ -28,8 +28,8 @@ type WatchUIContextValues = { togglePlayback: () => void; toggleMuted: () => void; buffering: () => boolean; - jitter: () => number; - setJitter: (value: number) => void; + jitter: () => Moq.Time.Milli; + setJitter: (value: Moq.Time.Milli) => void; availableRenditions: () => Rendition[]; activeRendition: () => string | undefined; setActiveRendition: (name: string | undefined) => void; @@ -37,7 +37,7 @@ type WatchUIContextValues = { setIsStatsPanelVisible: (visible: boolean) => void; isFullscreen: () => boolean; toggleFullscreen: () => void; - timestamp: () => number; + timestamp: () => Moq.Time.Milli | undefined; videoBuffered: () => BufferedRanges; audioBuffered: () => BufferedRanges; }; @@ -76,8 +76,8 @@ export default function WatchUIContextProvider(props: WatchUIContextProviderProp props.hangWatch.audio.muted.update((muted) => !muted); }; - const setJitter = (latency: number) => { - props.hangWatch.jitter.set(latency as Moq.Time.Milli); + const setJitter = (latency: Moq.Time.Milli) => { + props.hangWatch.jitter.set(latency); }; const setActiveRenditionValue = (name: string | undefined) => { @@ -88,7 +88,7 @@ export default function WatchUIContextProvider(props: WatchUIContextProviderProp }; // Use solid helper for the new signals - const timestamp = solid(props.hangWatch.timestamp); + const timestamp = solid(props.hangWatch.video.timestamp); const videoBuffered = solid(props.hangWatch.video.buffered); const audioBuffered = solid(props.hangWatch.audio.buffered); @@ -157,8 +157,8 @@ export default function WatchUIContextProvider(props: WatchUIContextProviderProp }); signals.effect((effect) => { - const buffering = effect.get(watch.buffering); - setBuffering(buffering); + const stalled = effect.get(watch.video.stalled); + setBuffering(stalled); }); signals.effect((effect) => { diff --git a/js/hang-ui/src/watch/hooks/use-watch-ui.ts b/js/hang-ui/src/watch/hooks/use-watch-ui.ts index 9a5254062..b0e3d0d94 100644 --- a/js/hang-ui/src/watch/hooks/use-watch-ui.ts +++ b/js/hang-ui/src/watch/hooks/use-watch-ui.ts @@ -1,7 +1,7 @@ import { useContext } from "solid-js"; -import { WatchUIContext } from "../context"; +import { WatchUIContext, type WatchUIContextValues } from "../context"; -export default function useWatchUIContext() { +export default function useWatchUIContext(): WatchUIContextValues { const context = useContext(WatchUIContext); if (!context) { diff --git a/js/hang-ui/src/watch/styles/index.css b/js/hang-ui/src/watch/styles/index.css index cab710734..5ad901831 100644 --- a/js/hang-ui/src/watch/styles/index.css +++ b/js/hang-ui/src/watch/styles/index.css @@ -3,6 +3,13 @@ @import "../../shared/components/button/button.css"; @import "../../shared/components/stats/styles/index.css"; +/* Color variables for buffer states */ +:root { + --buffer-green: #4ade80; + --buffer-yellow: #facc15; + --buffer-red: #f87171; +} + .watchVideoContainer { display: block; position: relative; @@ -202,3 +209,123 @@ background: #1a1a1a; color: #fff; } + +/* Buffer Control styles */ +.bufferControlContainer { + display: flex; + flex-direction: column; + gap: 8px; + padding: 8px 12px; +} + +.bufferVisualization { + position: relative; + width: 100%; + height: 52px; + background: rgba(255, 255, 255, 0.1); + border-radius: 4px; + cursor: pointer; + user-select: none; + padding-left: 48px; + box-sizing: border-box; +} + +.bufferPlayhead { + position: absolute; + left: 48px; + top: 0; + bottom: 0; + width: 2px; + background: #fff; + z-index: 3; +} + +.bufferTrack { + position: absolute; + left: 48px; + right: 0; + height: 20px; + display: flex; + align-items: center; +} + +.bufferTrack--video { + top: 4px; +} + +.bufferTrack--audio { + top: 28px; +} + +.bufferTrackLabel { + position: absolute; + left: -46px; + width: 40px; + font-size: 10px; + color: rgba(255, 255, 255, 0.6); + text-align: right; + padding-right: 4px; + pointer-events: none; + box-sizing: border-box; +} + +.bufferRange { + position: absolute; + top: 2px; + height: calc(100% - 4px); + border-radius: 2px; + min-width: 2px; + opacity: 0.85; + display: flex; + align-items: center; + justify-content: flex-end; + overflow: hidden; +} + +.bufferOverflowLabel { + font-size: 9px; + color: rgba(0, 0, 0, 0.7); + padding-right: 4px; + font-weight: 500; +} + +.bufferTargetArea { + position: absolute; + top: 0; + bottom: 0; + left: 48px; + right: 0; + pointer-events: none; +} + +.bufferTargetLine { + position: absolute; + top: 0; + bottom: 0; + width: 3px; + background: var(--color-orange-400, #fb923c); + z-index: 2; + cursor: ew-resize; + border-radius: 1px; + pointer-events: auto; +} + +.bufferTargetLabel { + position: absolute; + bottom: 100%; + left: 50%; + transform: translateX(-50%); + font-size: 11px; + font-weight: 500; + color: var(--color-orange-400, #fb923c); + white-space: nowrap; + margin-bottom: 2px; + opacity: 0; + transition: opacity 0.15s ease; + pointer-events: none; +} + +.bufferVisualization:hover .bufferTargetLabel, +.bufferVisualization.dragging .bufferTargetLabel { + opacity: 1; +} diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index 5e1b4f4b0..c3823d9e7 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -1,6 +1,6 @@ import type { Time } from "@moq/lite"; import * as Moq from "@moq/lite"; -import { Effect, Signal } from "@moq/signals"; +import { Effect, type Getter, Signal } from "@moq/signals"; export interface Source { byteLength: number; @@ -11,7 +11,6 @@ export interface Frame { data: Uint8Array; timestamp: Time.Micro; keyframe: boolean; - group: number; } // A Helper class to encode frames into a track. @@ -65,6 +64,13 @@ export interface ConsumerProps { latency?: Signal | Time.Milli; } +export interface BufferedRange { + start: Time.Milli; + end: Time.Milli; +} + +export type BufferedRanges = BufferedRange[]; + interface Group { consumer: Moq.Group; frames: Frame[]; // decode order @@ -80,6 +86,9 @@ export class Consumer { // Wake up the consumer when a new frame is available. #notify?: () => void; + #buffered = new Signal([]); + readonly buffered: Getter = this.#buffered; + #signals = new Effect(); constructor(track: Moq.Track, props?: ConsumerProps) { @@ -143,7 +152,6 @@ export class Consumer { data, timestamp, keyframe, - group: group.consumer.sequence, }; keyframe = false; @@ -154,6 +162,8 @@ export class Consumer { group.latest = timestamp; } + this.#updateBuffered(); + if (group.consumer.sequence === this.#active) { this.#notify?.(); this.#notify = undefined; @@ -219,12 +229,16 @@ export class Consumer { // NOTE: Can't be undefined, because we checked above. this.#active = this.#groups[0]?.consumer.sequence; + this.#updateBuffered(); + // Wake up any consumers waiting for a new frame. this.#notify?.(); this.#notify = undefined; } - async decode(): Promise { + // Returns the next frame in order, along with the group number. + // If frame is undefined, the group is done. + async next(): Promise<{ frame: Frame | undefined; group: number } | undefined> { for (;;) { if ( this.#groups.length > 0 && @@ -232,12 +246,19 @@ export class Consumer { this.#groups[0].consumer.sequence <= this.#active ) { const frame = this.#groups[0].frames.shift(); - if (frame) return frame; + if (frame) { + this.#updateBuffered(); + return { frame, group: this.#groups[0].consumer.sequence }; + } // Check if the group is done and then remove it. if (this.#active > this.#groups[0].consumer.sequence) { - this.#groups.shift(); - continue; + const group = this.#groups.shift(); + if (group) { + this.#updateBuffered(); + // Always true + return { frame: undefined, group: group.consumer.sequence }; + } } } @@ -263,6 +284,30 @@ export class Consumer { return { timestamp: timestamp as Time.Micro, data }; } + #updateBuffered(): void { + // Compute buffered ranges from all groups + // Each contiguous sequence of groups forms a buffered range + const ranges: BufferedRanges = []; + + for (const group of this.#groups) { + const first = group.frames.at(0); + if (!first || !group.latest) continue; + + const start = Moq.Time.Milli.fromMicro(first.timestamp); + const end = Moq.Time.Milli.fromMicro(group.latest); + + // Try to merge with the last range if contiguous + const last = ranges.at(-1); + if (last && last.end >= start) { + last.end = Math.max(last.end, end) as Time.Milli; + } else { + ranges.push({ start, end }); + } + } + + this.#buffered.set(ranges); + } + close(): void { this.#signals.close(); diff --git a/js/hang/src/watch/audio/decoder.ts b/js/hang/src/watch/audio/decoder.ts index 9bfb44977..702132ce7 100644 --- a/js/hang/src/watch/audio/decoder.ts +++ b/js/hang/src/watch/audio/decoder.ts @@ -1,5 +1,5 @@ import type * as Moq from "@moq/lite"; -import type { Time } from "@moq/lite"; +import { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; import * as Container from "../../container"; @@ -7,6 +7,7 @@ import * as Hex from "../../util/hex"; import * as libav from "../../util/libav"; import type { BufferedRanges } from "../backend"; import type * as Render from "./render"; +import type { ToMain } from "./render"; // Unfortunately, we need to use a Vite-exclusive import for now. import RenderWorklet from "./render-worklet.ts?worker&url"; import type { Source } from "./source"; @@ -40,7 +41,18 @@ export class Decoder { #stats = new Signal(undefined); readonly stats: Getter = this.#stats; - // Empty stub for WebCodecs (no traditional buffering) + // Current playback timestamp from worklet + #timestamp = new Signal(undefined); + readonly timestamp: Getter = this.#timestamp; + + // Whether the audio buffer is stalled (waiting to fill) + #stalled = new Signal(true); + readonly stalled: Getter = this.#stalled; + + // Decode buffer: audio sent to worklet but not yet played + #decodeBuffered = new Signal([]); + + // Combined buffered ranges (network jitter + decode buffer) #buffered = new Signal([]); readonly buffered: Getter = this.#buffered; @@ -54,6 +66,7 @@ export class Decoder { this.#signals.effect(this.#runWorklet.bind(this)); this.#signals.effect(this.#runEnabled.bind(this)); + this.#signals.effect(this.#runLatency.bind(this)); this.#signals.effect(this.#runDecoder.bind(this)); } @@ -99,10 +112,20 @@ export class Decoder { type: "init", rate: sampleRate, channels: channelCount, - latency: this.source.sync.latency.peek(), // TODO make it reactive + latency: this.source.sync.latency.peek(), // Updated reactively via #runLatency }; worklet.port.postMessage(init); + // Listen for state updates from worklet + worklet.port.onmessage = (event: MessageEvent) => { + if (event.data.type === "state") { + const timestamp = Time.Milli.fromMicro(event.data.timestamp); + this.#timestamp.set(timestamp); + this.#stalled.set(event.data.stalled); + this.#trimDecodeBuffered(timestamp); + } + }; + effect.set(this.#worklet, worklet); }); } @@ -117,6 +140,19 @@ export class Decoder { // NOTE: You should disconnect/reconnect the worklet to save power when disabled. } + #runLatency(effect: Effect): void { + const worklet = effect.get(this.#worklet); + if (!worklet) return; + + const latency = effect.get(this.source.sync.latency); + + const msg: Render.Latency = { + type: "latency", + latency, + }; + worklet.port.postMessage(msg); + } + #runDecoder(effect: Effect): void { const enabled = effect.get(this.enabled); if (!enabled) return; @@ -151,6 +187,13 @@ export class Decoder { }); effect.cleanup(() => consumer.close()); + // Combine network jitter buffer with decode buffer + effect.effect((inner) => { + const network = inner.get(consumer.buffered); + const decode = inner.get(this.#decodeBuffered); + this.#buffered.set(mergeBufferedRanges(network, decode)); + }); + effect.spawn(async () => { const loaded = await libav.polyfill(); if (!loaded) return; // cancelled @@ -168,8 +211,11 @@ export class Decoder { }); for (;;) { - const frame = await consumer.decode(); - if (!frame) break; + const next = await consumer.next(); + if (!next) break; + + const { frame } = next; + if (!frame) continue; // Skip over group done notifications. this.#stats.update((stats) => ({ bytesReceived: (stats?.bytesReceived ?? 0) + frame.data.byteLength, @@ -192,6 +238,13 @@ export class Decoder { const { timescale } = config.container; const description = config.description ? Hex.toBytes(config.description) : undefined; + // For CMAF, just use decode buffer (no network jitter buffer yet) + // TODO: Add CMAF consumer wrapper for latency control + effect.effect((inner) => { + const decode = inner.get(this.#decodeBuffered); + this.#buffered.set(decode); + }); + effect.spawn(async () => { const loaded = await libav.polyfill(); if (!loaded) return; // cancelled @@ -248,6 +301,7 @@ export class Decoder { #emit(sample: AudioData) { const timestamp = sample.timestamp as Time.Micro; + const timestampMilli = Time.Milli.fromMicro(timestamp); const worklet = this.#worklet.peek(); if (!worklet) { @@ -256,6 +310,14 @@ export class Decoder { return; } + // Calculate end time from sample duration + const durationMicro = ((sample.numberOfFrames / sample.sampleRate) * 1_000_000) as Time.Micro; + const durationMilli = Time.Milli.fromMicro(durationMicro); + const end = (timestampMilli + durationMilli) as Time.Milli; + + // Add to decode buffer + this.#addDecodeBuffered(timestampMilli, end); + const channelData: Float32Array[] = []; for (let channel = 0; channel < sample.numberOfChannels; channel++) { const data = new Float32Array(sample.numberOfFrames); @@ -279,6 +341,36 @@ export class Decoder { sample.close(); } + #addDecodeBuffered(start: Time.Milli, end: Time.Milli): void { + if (start > end) return; + + this.#decodeBuffered.mutate((current) => { + for (const range of current) { + // Extend range if new sample overlaps or is adjacent (1ms tolerance for float precision) + if (start <= range.end + 1 && end >= range.start) { + range.start = Math.min(range.start, start) as Time.Milli; + range.end = Math.max(range.end, end) as Time.Milli; + return; + } + } + + current.push({ start, end }); + current.sort((a, b) => a.start - b.start); + }); + } + + #trimDecodeBuffered(timestamp: Time.Milli): void { + this.#decodeBuffered.mutate((current) => { + while (current.length > 0) { + if (current[0].end >= timestamp) { + current[0].start = Math.max(current[0].start, timestamp) as Time.Milli; + break; + } + current.shift(); + } + }); + } + close() { this.#signals.close(); } @@ -292,3 +384,24 @@ async function supported(config: Catalog.AudioConfig): Promise { }); return res.supported ?? false; } + +// Merge two sets of buffered ranges into one sorted list +function mergeBufferedRanges(a: BufferedRanges, b: BufferedRanges): BufferedRanges { + if (a.length === 0) return b; + if (b.length === 0) return a; + + const result: BufferedRanges = []; + const all = [...a, ...b].sort((x, y) => x.start - y.start); + + for (const range of all) { + const last = result.at(-1); + if (last && last.end >= range.start) { + // Merge overlapping ranges + last.end = Math.max(last.end, range.end) as Time.Milli; + } else { + result.push({ ...range }); + } + } + + return result; +} diff --git a/js/hang/src/watch/audio/mse.ts b/js/hang/src/watch/audio/mse.ts index 81be9b8b1..50ea24e4c 100644 --- a/js/hang/src/watch/audio/mse.ts +++ b/js/hang/src/watch/audio/mse.ts @@ -147,15 +147,25 @@ export class Mse implements Backend { let duration: Moq.Time.Micro | undefined; // Buffer one frame so we can compute accurate duration from the next frame's timestamp - let pending = await consumer.decode(); - if (!pending) return; + let pending: Container.Legacy.Frame; + for (;;) { + const next = await consumer.next(); + if (!next) return; + if (!next.frame) continue; // Skip over group done notifications. + + pending = next.frame; + break; + } for (;;) { - const next = await consumer.decode(); + const next = await consumer.next(); + if (next && !next.frame) continue; // Skip over group done notifications. + + const frame = next?.frame; // Compute duration from next frame's timestamp, or use last known duration if stream ended - if (next) { - duration = (next.timestamp - pending.timestamp) as Moq.Time.Micro; + if (frame) { + duration = (frame.timestamp - pending.timestamp) as Moq.Time.Micro; } // Wrap raw frame in moof+mdat @@ -174,8 +184,8 @@ export class Mse implements Backend { element.currentTime = element.buffered.start(0); } - if (!next) return; - pending = next; + if (!frame) return; + pending = frame; } }); } diff --git a/js/hang/src/watch/audio/render-worklet.ts b/js/hang/src/watch/audio/render-worklet.ts index 6b3684fd3..59b1fb842 100644 --- a/js/hang/src/watch/audio/render-worklet.ts +++ b/js/hang/src/watch/audio/render-worklet.ts @@ -1,9 +1,10 @@ -import type { Message } from "./render"; +import type { Message, State } from "./render"; import { AudioRingBuffer } from "./ring-buffer"; class Render extends AudioWorkletProcessor { #buffer?: AudioRingBuffer; #underflow = 0; + #stateCounter = 0; constructor() { super(); @@ -17,6 +18,9 @@ class Render extends AudioWorkletProcessor { } else if (type === "data") { if (!this.#buffer) throw new Error("buffer not initialized"); this.#buffer.write(event.data.timestamp, event.data.data); + } else if (type === "latency") { + if (!this.#buffer) throw new Error("buffer not initialized"); + this.#buffer.resize(event.data.latency); } else { const exhaustive: never = type; throw new Error(`unknown message type: ${exhaustive}`); @@ -35,6 +39,18 @@ class Render extends AudioWorkletProcessor { this.#underflow = 0; } + // Send state update every ~5 frames (~60/sec) to avoid excessive DOM updates + this.#stateCounter++; + if (this.#buffer && this.#stateCounter >= 5) { + this.#stateCounter = 0; + const state: State = { + type: "state", + timestamp: this.#buffer.timestamp, + stalled: this.#buffer.stalled, + }; + this.port.postMessage(state); + } + return true; } } diff --git a/js/hang/src/watch/audio/render.ts b/js/hang/src/watch/audio/render.ts index e5f9d9060..9b1f838f7 100644 --- a/js/hang/src/watch/audio/render.ts +++ b/js/hang/src/watch/audio/render.ts @@ -1,6 +1,14 @@ import type { Time } from "@moq/lite"; -export type Message = Init | Data; +export type Message = Init | Data | Latency; + +export type ToMain = State; + +export interface State { + type: "state"; + timestamp: Time.Micro; + stalled: boolean; +} export interface Data { type: "data"; @@ -14,3 +22,8 @@ export interface Init { channels: number; latency: Time.Milli; } + +export interface Latency { + type: "latency"; + latency: Time.Milli; +} diff --git a/js/hang/src/watch/audio/ring-buffer.test.ts b/js/hang/src/watch/audio/ring-buffer.test.ts index 6a4e3f5be..8e8537c5c 100644 --- a/js/hang/src/watch/audio/ring-buffer.test.ts +++ b/js/hang/src/watch/audio/ring-buffer.test.ts @@ -83,9 +83,9 @@ describe("writing data", () => { // Should have filled the gap with zeros expect(buffer.length).toBe(30); // 10 + 10 (gap) + 10 - // Exit refill mode by filling buffer + // Exit stalled mode by filling buffer write(buffer, 30 as Time.Milli, 70, { channels: 2, value: 0.0 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read and verify the gap was filled with zeros const output = read(buffer, 30, 2); @@ -111,9 +111,9 @@ describe("writing data", () => { it("should handle late-arriving samples (out-of-order writes)", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); - // Fill buffer to exit refill mode + // Fill buffer to exit stalled mode write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 0.0 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read 50 samples to advance read pointer to 50 read(buffer, 50, 1); @@ -148,9 +148,9 @@ describe("writing data", () => { it("should discard samples that are too old", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - // Exit refill mode by filling buffer + // Exit stalled mode by filling buffer write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read 60 samples, readIndex now at 60 read(buffer, 60, 2); @@ -181,10 +181,10 @@ describe("reading data", () => { it("should read available data", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - // Exit refill mode by filling the buffer + // Exit stalled mode by filling the buffer write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); - // Buffer should now be out of refill mode - expect(buffer.refilling).toBe(false); + // Buffer should now be out of stalled mode + expect(buffer.stalled).toBe(false); // Read some samples to make room (readIndex at 80) read(buffer, 80, 2); @@ -219,9 +219,9 @@ describe("reading data", () => { it("should handle partial reads", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - // Exit refill mode by filling the buffer + // Exit stalled mode by filling the buffer write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read some to make room (readIndex at 80) read(buffer, 80, 2); @@ -251,27 +251,27 @@ describe("reading data", () => { }); }); -describe("refill behavior", () => { - it("should start in refill mode", () => { +describe("stall behavior", () => { + it("should start in stalled mode", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - expect(buffer.refilling).toBe(true); + expect(buffer.stalled).toBe(true); - // Should not output anything in refill mode + // Should not output anything in stalled mode write(buffer, 0 as Time.Milli, 50, { channels: 2, value: 1.0 }); const output = read(buffer, 10, 2); expect(output[0].length).toBe(0); }); - it("should exit refill mode when buffer is full", () => { + it("should exit stalled mode when buffer is full", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); // Fill the buffer completely write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 1.0 }); // Write more data to trigger overflow handling - write(buffer, 10 as Time.Milli, 50, { channels: 2, value: 2.0 }); // This should exit refill mode + write(buffer, 10 as Time.Milli, 50, { channels: 2, value: 2.0 }); // This should exit stalled mode - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Now we should be able to read const output = read(buffer, 10, 2); @@ -285,7 +285,7 @@ describe("ring buffer wrapping", () => { // Fill the buffer write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 1.0 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read 50 samples to make room (readIndex at 50) const output1 = read(buffer, 50, 1); @@ -321,9 +321,9 @@ describe("multi-channel handling", () => { it("should handle stereo data correctly", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - // Exit refill mode by filling buffer + // Exit stalled mode by filling buffer write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.5 }); - expect(buffer.refilling).toBe(false); + expect(buffer.stalled).toBe(false); // Read some to make room read(buffer, 80, 2); @@ -373,9 +373,9 @@ describe("edge cases", () => { it("should handle fractional timestamps", () => { const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); - // Exit refill mode first + // Exit stalled mode first write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); - write(buffer, 10 as Time.Milli, 10, { channels: 2, value: 0.0 }); // This exits refill mode + write(buffer, 10 as Time.Milli, 10, { channels: 2, value: 0.0 }); // This exits stalled mode read(buffer, 110, 2); // Write with fractional timestamp that rounds @@ -386,3 +386,165 @@ describe("edge cases", () => { expect(output[0].length).toBeGreaterThan(0); }); }); + +describe("resize", () => { + it("should resize to a larger buffer", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + expect(buffer.capacity).toBe(100); + + // Write 50 samples + write(buffer, 0 as Time.Milli, 50, { channels: 1, value: 1.0 }); + expect(buffer.length).toBe(50); + + // Resize to larger buffer (200ms = 200 samples) + buffer.resize(200 as Time.Milli); + + expect(buffer.capacity).toBe(200); + expect(buffer.length).toBe(50); // Samples preserved + expect(buffer.stalled).toBe(true); // Should trigger stall + }); + + it("should resize to a smaller buffer and keep the most recent samples", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + expect(buffer.capacity).toBe(100); + + // Write 80 samples: first 40 with value 1.0, next 40 with value 2.0 + write(buffer, 0 as Time.Milli, 40, { channels: 1, value: 1.0 }); + write(buffer, 40 as Time.Milli, 40, { channels: 1, value: 2.0 }); + expect(buffer.length).toBe(80); + + // Resize to smaller buffer (50ms = 50 samples) + // Should keep the most recent 50 samples (samples 30-79) + buffer.resize(50 as Time.Milli); + + expect(buffer.capacity).toBe(50); + expect(buffer.length).toBe(50); // Truncated to new capacity + expect(buffer.stalled).toBe(true); // Should trigger stall + }); + + it("should be a no-op when capacity is unchanged", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + + // Exit stalled mode + write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 1.0 }); + expect(buffer.stalled).toBe(false); + + // Resize to same capacity + buffer.resize(100 as Time.Milli); + + // Should still not be stalled (no-op) + expect(buffer.stalled).toBe(false); + expect(buffer.capacity).toBe(100); + }); + + it("should throw on zero latency", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + expect(() => buffer.resize(0 as Time.Milli)).toThrow(/empty buffer/); + }); + + it("should handle resize with stereo data", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Write stereo data with different values per channel + const data = [new Float32Array(60), new Float32Array(60)]; + for (let i = 0; i < 60; i++) { + data[0][i] = 1.0; // Left channel + data[1][i] = 2.0; // Right channel + } + buffer.write(0 as Time.Micro, data); + expect(buffer.length).toBe(60); + + // Resize to smaller buffer + buffer.resize(50 as Time.Milli); + expect(buffer.capacity).toBe(50); + expect(buffer.length).toBe(50); + expect(buffer.stalled).toBe(true); + }); + + it("should handle resize when buffer is empty", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + expect(buffer.length).toBe(0); + + // Resize empty buffer + buffer.resize(200 as Time.Milli); + + expect(buffer.capacity).toBe(200); + expect(buffer.length).toBe(0); + expect(buffer.stalled).toBe(true); + }); + + it("should handle resize after partial read", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + + // Fill and exit stalled mode + write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 1.0 }); + expect(buffer.stalled).toBe(false); + + // Read 60 samples (readIndex at 60, writeIndex at 100) + read(buffer, 60, 1); + expect(buffer.length).toBe(40); + + // Write more data + write(buffer, 100 as Time.Milli, 30, { channels: 1, value: 2.0 }); + expect(buffer.length).toBe(70); // 130 - 60 + + // Resize to 50 samples - should keep most recent 50 (samples 80-129) + buffer.resize(50 as Time.Milli); + expect(buffer.capacity).toBe(50); + expect(buffer.length).toBe(50); + expect(buffer.stalled).toBe(true); + }); + + it("should exit stall and read new data after resize", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + + // Write some initial data + write(buffer, 0 as Time.Milli, 50, { channels: 1, value: 1.0 }); + + // Resize to smaller buffer + buffer.resize(50 as Time.Milli); + expect(buffer.stalled).toBe(true); + + // Write new data to fill the buffer and exit stall + // The overflow will discard preserved samples and advance readIndex + write(buffer, 50 as Time.Milli, 50, { channels: 1, value: 2.0 }); + expect(buffer.stalled).toBe(false); + + // Read should return the new data (value 2.0) + const output = read(buffer, 50, 1); + expect(output[0].length).toBe(50); + for (let i = 0; i < 50; i++) { + expect(output[0][i]).toBe(2.0); + } + }); + + it("should preserve samples correctly when resizing larger then filling", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 50 as Time.Milli }); + expect(buffer.capacity).toBe(50); + + // Write 30 samples + write(buffer, 0 as Time.Milli, 30, { channels: 1, value: 1.0 }); + expect(buffer.length).toBe(30); + + // Resize to larger buffer (100ms = 100 samples) + buffer.resize(100 as Time.Milli); + expect(buffer.capacity).toBe(100); + expect(buffer.length).toBe(30); // All samples preserved + expect(buffer.stalled).toBe(true); + + // Write 70 more samples to fill the buffer and exit stall + write(buffer, 30 as Time.Milli, 70, { channels: 1, value: 2.0 }); + expect(buffer.stalled).toBe(false); + expect(buffer.length).toBe(100); + + // Read all - should have 30 samples of 1.0 then 70 of 2.0 + const output = read(buffer, 100, 1); + expect(output[0].length).toBe(100); + for (let i = 0; i < 30; i++) { + expect(output[0][i]).toBe(1.0); + } + for (let i = 30; i < 100; i++) { + expect(output[0][i]).toBe(2.0); + } + }); +}); diff --git a/js/hang/src/watch/audio/ring-buffer.ts b/js/hang/src/watch/audio/ring-buffer.ts index 5bb799546..d10a84eb3 100644 --- a/js/hang/src/watch/audio/ring-buffer.ts +++ b/js/hang/src/watch/audio/ring-buffer.ts @@ -7,7 +7,7 @@ export class AudioRingBuffer { readonly rate: number; readonly channels: number; - #refill = true; + #stalled = true; constructor(props: { rate: number; channels: number; latency: Time.Milli }) { if (props.channels <= 0) throw new Error("invalid channels"); @@ -26,8 +26,12 @@ export class AudioRingBuffer { } } - get refilling(): boolean { - return this.#refill; + get stalled(): boolean { + return this.#stalled; + } + + get timestamp(): Time.Micro { + return Time.Micro.fromSecond((this.#readIndex / this.rate) as Time.Second); } get length(): number { @@ -38,6 +42,38 @@ export class AudioRingBuffer { return this.#buffer[0]?.length; } + resize(latency: Time.Milli): void { + const newCapacity = Math.ceil(this.rate * Time.Second.fromMilli(latency)); + if (newCapacity === this.capacity) return; + if (newCapacity === 0) throw new Error("empty buffer"); + + const newBuffer: Float32Array[] = []; + for (let i = 0; i < this.channels; i++) { + newBuffer[i] = new Float32Array(newCapacity); + } + + // Copy existing data, preserving the most recent samples + const samplesToKeep = Math.min(this.length, newCapacity); + if (samplesToKeep > 0) { + // Copy the most recent samples (closest to writeIndex) + const copyStart = this.#writeIndex - samplesToKeep; + for (let channel = 0; channel < this.channels; channel++) { + const src = this.#buffer[channel]; + const dst = newBuffer[channel]; + for (let i = 0; i < samplesToKeep; i++) { + const srcPos = (copyStart + i) % src.length; + const dstPos = i % dst.length; + dst[dstPos] = src[srcPos]; + } + } + } + + // Update state for the new buffer and trigger stall to refill + this.#buffer = newBuffer; + this.#readIndex = this.#writeIndex - samplesToKeep; + this.#stalled = true; + } + write(timestamp: Time.Micro, data: Float32Array[]): void { if (data.length !== this.channels) throw new Error("wrong number of channels"); @@ -62,8 +98,8 @@ export class AudioRingBuffer { // Check if we need to discard old samples to prevent overflow const overflow = end - this.#readIndex - this.#buffer[0].length; if (overflow >= 0) { - // Discard old samples and exit refill mode - this.#refill = false; + // Discard old samples and exit stalled mode + this.#stalled = false; this.#readIndex += overflow; } @@ -105,7 +141,7 @@ export class AudioRingBuffer { read(output: Float32Array[]): number { if (output.length !== this.channels) throw new Error("wrong number of channels"); - if (this.#refill) return 0; + if (this.#stalled) return 0; const samples = Math.min(this.#writeIndex - this.#readIndex, output[0].length); if (samples === 0) return 0; diff --git a/js/hang/src/watch/backend.ts b/js/hang/src/watch/backend.ts index b5fb59cb4..6b2c082f9 100644 --- a/js/hang/src/watch/backend.ts +++ b/js/hang/src/watch/backend.ts @@ -1,5 +1,5 @@ -import type * as Moq from "@moq/lite"; -import { Effect, type Getter, Signal } from "@moq/signals"; +import * as Moq from "@moq/lite"; +import { Effect, Signal } from "@moq/signals"; import * as Audio from "./audio"; import type { Broadcast } from "./broadcast"; import { Muxer } from "./mse"; @@ -8,16 +8,20 @@ import * as Video from "./video"; // Serializable representation of TimeRanges export interface BufferedRange { - start: number; // seconds - end: number; // seconds + start: Moq.Time.Milli; + end: Moq.Time.Milli; } export type BufferedRanges = BufferedRange[]; // Helper to convert DOM TimeRanges export function timeRangesToArray(ranges: TimeRanges): BufferedRanges { const result: BufferedRange[] = []; + for (let i = 0; i < ranges.length; i++) { - result.push({ start: ranges.start(i), end: ranges.end(i) }); + const start = Moq.Time.Milli.fromSecond(ranges.start(i) as Moq.Time.Second); + const end = Moq.Time.Milli.fromSecond(ranges.end(i) as Moq.Time.Second); + + result.push({ start, end }); } return result; } @@ -26,12 +30,6 @@ export interface Backend { // Whether audio/video playback is paused. paused: Signal; - // Whether the video is currently buffering, false when paused. - buffering: Getter; - - // Current playback position in seconds. - timestamp: Getter; - // Video specific signals. video?: Video.Backend; @@ -60,9 +58,15 @@ class VideoBackend implements Video.Backend { // The stats of the video. stats = new Signal(undefined); - // Buffered time ranges (for MSE backend). + // We're currently stalled waiting for the next frame + stalled = new Signal(false); + + // Buffered time ranges buffered = new Signal([]); + // The timestamp of the current frame + timestamp = new Signal(Moq.Time.Milli.zero); + constructor(source: Video.Source) { this.source = source; } @@ -81,7 +85,7 @@ class AudioBackend implements Audio.Backend { // The stats of the audio. stats = new Signal(undefined); - // Buffered time ranges (for MSE backend). + // Buffered time ranges buffered = new Signal([]); constructor(source: Audio.Source) { @@ -107,12 +111,6 @@ export class MultiBackend implements Backend { // Used to sync audio and video playback at a target delay. #sync: Sync; - #buffering = new Signal(false); - readonly buffering: Getter = this.#buffering; - - #timestamp = new Signal(0); - readonly timestamp: Getter = this.#timestamp; - signals = new Effect(); constructor(props?: MultiBackendProps) { @@ -169,17 +167,11 @@ export class MultiBackend implements Backend { // Proxy the read only signals to the backend. effect.proxy(this.video.stats, videoSource.stats); effect.proxy(this.video.buffered, videoSource.buffered); + effect.proxy(this.video.stalled, videoSource.stalled); + effect.proxy(this.video.timestamp, videoSource.timestamp); effect.proxy(this.audio.stats, audioSource.stats); effect.proxy(this.audio.buffered, audioSource.buffered); - - // Derive timestamp from video stats (in lock-step with frame signal) - effect.effect((e) => { - const stats = e.get(videoSource.stats); - if (stats) { - this.#timestamp.set(stats.timestamp / 1_000_000); // microseconds to seconds - } - }); } #runMse(effect: Effect, element: HTMLVideoElement): void { @@ -187,6 +179,7 @@ export class MultiBackend implements Backend { paused: this.paused, element, }); + effect.cleanup(() => mse.close()); const video = new Video.Mse(mse, this.#videoSource); const audio = new Audio.Mse(mse, this.#audioSource, { @@ -203,10 +196,14 @@ export class MultiBackend implements Backend { // Proxy the read only signals to the backend. effect.proxy(this.video.stats, video.stats); effect.proxy(this.video.buffered, video.buffered); + effect.proxy(this.video.stalled, video.stalled); + effect.proxy(this.video.timestamp, video.timestamp); effect.proxy(this.audio.stats, audio.stats); effect.proxy(this.audio.buffered, audio.buffered); + } - effect.proxy(this.#timestamp, mse.timestamp); + close(): void { + this.signals.close(); } } diff --git a/js/hang/src/watch/element.ts b/js/hang/src/watch/element.ts index 50b1fa451..7834b706f 100644 --- a/js/hang/src/watch/element.ts +++ b/js/hang/src/watch/element.ts @@ -1,6 +1,6 @@ import type { Time } from "@moq/lite"; import * as Moq from "@moq/lite"; -import { Effect, type Getter, Signal } from "@moq/signals"; +import { Effect, Signal } from "@moq/signals"; import type * as Audio from "./audio"; import { type Backend, MultiBackend } from "./backend"; import { Broadcast } from "./broadcast"; @@ -58,7 +58,7 @@ export default class HangWatch extends HTMLElement implements Backend { this.#backend = new MultiBackend({ broadcast: this.broadcast, }); - this.signals.cleanup(() => this.#backend.signals.close()); + this.signals.cleanup(() => this.#backend.close()); // Watch to see if the canvas element is added or removed. const setElement = () => { @@ -189,14 +189,6 @@ export default class HangWatch extends HTMLElement implements Backend { get video(): Video.Backend { return this.#backend.video; } - - get buffering(): Getter { - return this.#backend.buffering; - } - - get timestamp(): Getter { - return this.#backend.timestamp; - } } customElements.define("hang-watch", HangWatch); diff --git a/js/hang/src/watch/mse.ts b/js/hang/src/watch/mse.ts index 127ad0f47..fccb634da 100644 --- a/js/hang/src/watch/mse.ts +++ b/js/hang/src/watch/mse.ts @@ -20,12 +20,6 @@ export class Muxer { #mediaSource = new Signal(undefined); readonly mediaSource: Getter = this.#mediaSource; - #buffering = new Signal(false); - readonly buffering: Getter = this.#buffering; - - #timestamp = new Signal(0); - readonly timestamp: Getter = this.#timestamp; - #signals = new Effect(); constructor(sync: Sync, props?: MuxerProps) { @@ -36,9 +30,7 @@ export class Muxer { this.#signals.effect(this.#runMediaSource.bind(this)); this.#signals.effect(this.#runSkip.bind(this)); this.#signals.effect(this.#runTrim.bind(this)); - this.#signals.effect(this.#runBuffering.bind(this)); this.#signals.effect(this.#runPaused.bind(this)); - this.#signals.effect(this.#runTimestamp.bind(this)); } #runMediaSource(effect: Effect): void { @@ -113,20 +105,6 @@ export class Muxer { }, 1000); } - #runBuffering(effect: Effect): void { - const element = effect.get(this.element); - if (!element) return; - - const update = () => { - this.#buffering.set(element.readyState <= HTMLMediaElement.HAVE_CURRENT_DATA); - }; - - // TODO Are these the correct events to use? - effect.event(element, "waiting", update); - effect.event(element, "playing", update); - effect.event(element, "seeking", update); - } - #runPaused(effect: Effect): void { const element = effect.get(this.element); if (!element) return; @@ -137,29 +115,6 @@ export class Muxer { } else if (!paused && element.paused) { element.play().catch((e) => { console.error("[MSE] MediaElement play error:", e); - this.paused.set(true); - }); - } - } - - #runTimestamp(effect: Effect): void { - const element = effect.get(this.element); - if (!element) return; - - // Use requestVideoFrameCallback if available (frame-accurate) - if ("requestVideoFrameCallback" in element) { - const video = element as HTMLVideoElement; - let handle: number; - const onFrame = () => { - this.#timestamp.set(video.currentTime); - handle = video.requestVideoFrameCallback(onFrame); - }; - handle = video.requestVideoFrameCallback(onFrame); - effect.cleanup(() => video.cancelVideoFrameCallback(handle)); - } else { - // Fallback to timeupdate event - effect.event(element, "timeupdate", () => { - this.#timestamp.set(element.currentTime); }); } } diff --git a/js/hang/src/watch/video/backend.ts b/js/hang/src/watch/video/backend.ts index 47bbe0fa4..3f37b5b31 100644 --- a/js/hang/src/watch/video/backend.ts +++ b/js/hang/src/watch/video/backend.ts @@ -1,3 +1,4 @@ +import type * as Moq from "@moq/lite"; import type { Getter } from "@moq/signals"; import type { BufferedRanges } from "../backend"; import type { Source } from "./source"; @@ -10,12 +11,17 @@ export interface Backend { // The stats of the video. stats: Getter; + // Whether the video is currently buffering + stalled: Getter; + // Buffered time ranges (for MSE backend). buffered: Getter; + + // The timestamp of the current frame. + timestamp: Getter; } export interface Stats { frameCount: number; - timestamp: number; bytesReceived: number; } diff --git a/js/hang/src/watch/video/decoder.ts b/js/hang/src/watch/video/decoder.ts index f89482920..fa82b8465 100644 --- a/js/hang/src/watch/video/decoder.ts +++ b/js/hang/src/watch/video/decoder.ts @@ -10,6 +10,7 @@ import type { Source } from "./source"; // The amount of time to wait before considering the video to be buffering. const BUFFERING = 500 as Time.Milli; +const SWITCH = 100 as Time.Milli; export type DecoderProps = { enabled?: boolean | Signal; @@ -25,8 +26,7 @@ export class Decoder implements Backend { source: Source; // The current track running, held so we can cancel it when the new track is ready. - #pending?: Effect; - #active?: Effect; + #active = new Signal(undefined); // Expose the current frame to render as a signal #frame = new Signal(undefined); @@ -40,13 +40,13 @@ export class Decoder implements Backend { #display = new Signal<{ width: number; height: number } | undefined>(undefined); readonly display: Getter<{ width: number; height: number } | undefined> = this.#display; - #buffering = new Signal(false); - readonly buffering: Getter = this.#buffering; + #stalled = new Signal(false); + readonly stalled: Getter = this.#stalled; #stats = new Signal(undefined); readonly stats: Getter = this.#stats; - // Empty stub for WebCodecs (no traditional buffering) + // Combined buffered ranges (network jitter + decode buffer) #buffered = new Signal([]); readonly buffered: Getter = this.#buffered; @@ -59,90 +59,187 @@ export class Decoder implements Backend { this.source.supported.set(supported); // super hacky this.#signals.effect(this.#runPending.bind(this)); + this.#signals.effect(this.#runActive.bind(this)); this.#signals.effect(this.#runDisplay.bind(this)); this.#signals.effect(this.#runBuffering.bind(this)); } #runPending(effect: Effect): void { - const broadcast = effect.get(this.source.broadcast); - const enabled = effect.get(this.enabled); - const track = effect.get(this.source.track); - const config = effect.get(this.source.config); - const active = broadcast ? effect.get(broadcast.active) : undefined; - - if (!active || !config || !track || !enabled) { - // Stop the active track. - this.#active?.close(); - this.#active = undefined; - - this.#frame.update((prev) => { - prev?.close(); - return undefined; + const values = effect.getAll([this.enabled, this.source.broadcast, this.source.track, this.source.config]); + if (!values) return; + const [_, source, track, config] = values; + + const broadcast = effect.get(source.active); + if (!broadcast) return; + + // Start a new pending effect. + let pending: DecoderTrack | undefined = new DecoderTrack({ + source: this.source, + broadcast, + track, + config, + stats: this.#stats, + }); + + effect.cleanup(() => pending?.close()); + + effect.effect((effect) => { + if (!pending) return; + + const active = effect.get(this.#active); + if (active) { + const pendingTimestamp = effect.get(pending.timestamp); + const activeTimestamp = effect.get(active.timestamp); + + // Switch to the new track if it's ready and we've caught up enough. + if (!pendingTimestamp) return; + if (activeTimestamp && activeTimestamp > pendingTimestamp + SWITCH) return; + } + + // Upgrade the pending track to active. + // #runActive will be in charge of it now. + this.#active.set(pending); + pending = undefined; + }); + } + + #runActive(effect: Effect): void { + const active = effect.get(this.#active); + if (!active) return; + + effect.cleanup(() => active.close()); + + effect.proxy(this.#frame, active.frame); + effect.proxy(this.#timestamp, active.timestamp); + effect.proxy(this.#buffered, active.buffered); + } + + #runDisplay(effect: Effect): void { + const catalog = effect.get(this.source.catalog); + if (!catalog) return; + + const display = catalog.display; + if (display) { + effect.set(this.#display, { + width: display.width, + height: display.height, }); + return; + } + + const frame = effect.get(this.frame); + if (!frame) return; + effect.set(this.#display, { + width: frame.displayWidth, + height: frame.displayHeight, + }); + } + + #runBuffering(effect: Effect): void { + const enabled = effect.get(this.enabled); + if (!enabled) return; + + const frame = effect.get(this.frame); + if (!frame) { + this.#stalled.set(true); return; } - // Start a new pending effect. - this.#pending = new Effect(); + this.#stalled.set(false); - // NOTE: If the track catches up in time, it'll remove itself from #pending. - // We use #pending here on purpose so we only close it when it hasn't caught up yet. - effect.cleanup(() => this.#pending?.close()); + effect.timer(() => { + this.#stalled.set(true); + }, BUFFERING); + } + close() { + this.#frame.update((prev) => { + prev?.close(); + return undefined; + }); + + this.#signals.close(); + } +} + +interface DecoderTrackProps { + source: Source; + broadcast: Moq.Broadcast; + track: string; + config: Catalog.VideoConfig; + + stats: Signal; +} + +class DecoderTrack { + source: Source; + broadcast: Moq.Broadcast; + track: string; + config: RequiredDecoderConfig; + stats: Signal; + + timestamp = new Signal(undefined); + frame = new Signal(undefined); + + // Network jitter + decode buffer. + buffered = new Signal([]); + + // Decoded frames waiting to be rendered. + #buffered = new Signal([]); + + signals = new Effect(); + + constructor(props: DecoderTrackProps) { // Remove the codedWidth/Height from the config to avoid a hard reload if nothing else has changed. - const { codedWidth: _, codedHeight: __, ...minConfig } = config; + const { codedWidth: _, codedHeight: __, ...requiredConfig } = props.config; + + this.source = props.source; + this.broadcast = props.broadcast; + this.track = props.track; + this.config = requiredConfig; + this.stats = props.stats; - this.#runTrack(this.#pending, active, track, minConfig); + this.signals.effect(this.#run.bind(this)); } - #runTrack(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { - const sub = broadcast.subscribe(name, Catalog.PRIORITY.video); + #run(effect: Effect): void { + const sub = this.broadcast.subscribe(this.track, Catalog.PRIORITY.video); effect.cleanup(() => sub.close()); - effect.cleanup(() => { - if (this.#active === effect) { - this.#timestamp.set(undefined); - } - }); - const decoder = new VideoDecoder({ output: async (frame: VideoFrame) => { try { const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); - if (timestamp < (this.#timestamp.peek() ?? 0)) { + if (timestamp < (this.timestamp.peek() ?? 0)) { // Late frame, don't render it. return; } - if (this.#frame.peek() === undefined) { + if (this.frame.peek() === undefined) { // Render something while we wait for the sync to catch up. - this.#frame.set(frame.clone()); + this.frame.set(frame.clone()); } const wait = this.source.sync.wait(timestamp).then(() => true); const ok = await Promise.race([wait, effect.cancel]); if (!ok) return; - if (timestamp < (this.#timestamp.peek() ?? 0)) { + if (timestamp < (this.timestamp.peek() ?? 0)) { // Late frame, don't render it. // NOTE: This can happen when the ref is updated, such as on playback start. return; } - this.#timestamp.set(timestamp); + this.timestamp.set(timestamp); + + // Trim the decode buffer as frames are rendered + this.#trimBuffered(timestamp); - this.#frame.update((prev) => { + this.frame.update((prev) => { prev?.close(); return frame.clone(); // avoid closing the frame here }); - - // If the track switch was pending, complete it now. - if (this.#pending === effect) { - this.#active?.close(); - this.#active = effect; - this.#pending = undefined; - } } finally { frame.close(); } @@ -156,65 +253,96 @@ export class Decoder implements Backend { effect.cleanup(() => decoder.close()); // Input processing - depends on container type - if (config.container.kind === "cmaf") { - this.#runCmafTrack(effect, sub, config, decoder); + if (this.config.container.kind === "cmaf") { + this.#runCmaf(effect, sub, decoder); } else { - this.#runLegacyTrack(effect, sub, config, decoder); + this.#runLegacy(effect, sub, decoder); } } - #runLegacyTrack(effect: Effect, sub: Moq.Track, config: RequiredDecoderConfig, decoder: VideoDecoder): void { + #runLegacy(effect: Effect, sub: Moq.Track, decoder: VideoDecoder): void { // Create consumer that reorders groups/frames up to the provided latency. const consumer = new Container.Legacy.Consumer(sub, { latency: this.source.sync.latency, }); effect.cleanup(() => consumer.close()); + // Combine network jitter buffer with decode buffer + effect.effect((inner) => { + const network = inner.get(consumer.buffered); + const decode = inner.get(this.#buffered); + this.buffered.set(mergeBufferedRanges(network, decode)); + }); + decoder.configure({ - ...config, - description: config.description ? Hex.toBytes(config.description) : undefined, - optimizeForLatency: config.optimizeForLatency ?? true, + ...this.config, + description: this.config.description ? Hex.toBytes(this.config.description) : undefined, + optimizeForLatency: this.config.optimizeForLatency ?? true, // @ts-expect-error Only supported by Chrome, so the renderer has to flip manually. flip: false, }); + let previous: { timestamp: Time.Micro; group: number; final: boolean } | undefined; + effect.spawn(async () => { for (;;) { - const next = await Promise.race([consumer.decode(), effect.cancel]); + const next = await Promise.race([consumer.next(), effect.cancel]); if (!next) break; + const { frame, group } = next; + + if (!frame) { + if (previous) { + previous.final = true; + } + // The group is done + continue; + } + // Mark that we received this frame right now. - this.source.sync.received(Time.Milli.fromMicro(next.timestamp as Time.Micro)); + this.source.sync.received(Time.Milli.fromMicro(frame.timestamp as Time.Micro)); const chunk = new EncodedVideoChunk({ - type: next.keyframe ? "key" : "delta", - data: next.data, - timestamp: next.timestamp, + type: frame.keyframe ? "key" : "delta", + data: frame.data, + timestamp: frame.timestamp, }); // Track both frame count and bytes received for stats in the UI - this.#stats.update((current) => ({ + this.stats.update((current) => ({ frameCount: (current?.frameCount ?? 0) + 1, - timestamp: next.timestamp, - bytesReceived: (current?.bytesReceived ?? 0) + next.data.byteLength, + bytesReceived: (current?.bytesReceived ?? 0) + frame.data.byteLength, })); + // Track decode buffer: frames sent to decoder but not yet rendered + if (previous?.group === group || (previous?.final && previous.group + 1 === group)) { + const start = Time.Milli.fromMicro(previous.timestamp); + const end = Time.Milli.fromMicro(frame.timestamp); + this.#addBuffered(start, end); + } + + previous = { + timestamp: frame.timestamp, + group, + final: false, + }; + decoder.decode(chunk); } }); } - #runCmafTrack(effect: Effect, sub: Moq.Track, config: RequiredDecoderConfig, decoder: VideoDecoder): void { - if (config.container.kind !== "cmaf") return; + #runCmaf(effect: Effect, sub: Moq.Track, decoder: VideoDecoder): void { + if (this.config.container.kind !== "cmaf") return; - const { timescale } = config.container; - const description = config.description ? Hex.toBytes(config.description) : undefined; + const { timescale } = this.config.container; + const description = this.config.description ? Hex.toBytes(this.config.description) : undefined; // Configure decoder with description from catalog decoder.configure({ - codec: config.codec, + codec: this.config.codec, description, - optimizeForLatency: config.optimizeForLatency ?? true, + optimizeForLatency: this.config.optimizeForLatency ?? true, // @ts-expect-error Only supported by Chrome, so the renderer has to flip manually. flip: false, }); @@ -245,9 +373,8 @@ export class Decoder implements Backend { this.source.sync.received(Time.Milli.fromMicro(sample.timestamp as Time.Micro)); // Track stats - this.#stats.update((current) => ({ + this.stats.update((current) => ({ frameCount: (current?.frameCount ?? 0) + 1, - timestamp: sample.timestamp, bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength, })); @@ -262,53 +389,67 @@ export class Decoder implements Backend { }); } - #runDisplay(effect: Effect): void { - const catalog = effect.get(this.source.catalog); - if (!catalog) return; - - const display = catalog.display; - if (display) { - effect.set(this.#display, { - width: display.width, - height: display.height, - }); - return; - } - - const frame = effect.get(this.frame); - if (!frame) return; + // Add a range to the decode buffer (decoded, waiting to render) + #addBuffered(start: Time.Milli, end: Time.Milli): void { + if (start > end) return; + + this.#buffered.mutate((current) => { + for (const range of current) { + // Check if there's any overlap, then merge + if (range.start <= end && range.end >= start) { + range.start = Math.min(range.start, start) as Time.Milli; + range.end = Math.max(range.end, end) as Time.Milli; + return; + } + } - effect.set(this.#display, { - width: frame.displayWidth, - height: frame.displayHeight, + current.push({ start, end }); + current.sort((a, b) => a.start - b.start); }); } - #runBuffering(effect: Effect): void { - const enabled = effect.get(this.enabled); - if (!enabled) return; - - const frame = effect.get(this.frame); - if (!frame) { - this.#buffering.set(true); - return; - } - - this.#buffering.set(false); - - effect.timer(() => { - this.#buffering.set(true); - }, BUFFERING); + // Trim the decode buffer up to the rendered timestamp + #trimBuffered(timestamp: Time.Milli): void { + this.#buffered.mutate((current) => { + while (current.length > 0) { + if (current[0].end >= timestamp) { + current[0].start = Math.max(current[0].start, timestamp) as Time.Milli; + break; + } + current.shift(); + } + }); } - close() { - this.#frame.update((prev) => { + close(): void { + this.signals.close(); + + this.frame.update((prev) => { prev?.close(); return undefined; }); + } +} - this.#signals.close(); +// Merge two sets of buffered ranges into one sorted list +function mergeBufferedRanges(a: BufferedRanges, b: BufferedRanges): BufferedRanges { + if (a.length === 0) return b; + if (b.length === 0) return a; + + const result: BufferedRanges = []; + const all = [...a, ...b].sort((x, y) => x.start - y.start); + + for (const range of all) { + const last = result.at(-1); + if (last && last.end >= range.start) { + // Merge overlapping ranges + last.end = Math.max(last.end, range.end) as Time.Milli; + } else { + result.push({ ...range }); + } } + + return result; } async function supported(config: Catalog.VideoConfig): Promise { diff --git a/js/hang/src/watch/video/mse.ts b/js/hang/src/watch/video/mse.ts index ba990cf5f..25a577448 100644 --- a/js/hang/src/watch/video/mse.ts +++ b/js/hang/src/watch/video/mse.ts @@ -1,4 +1,4 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; import * as Container from "../../container"; @@ -22,6 +22,12 @@ export class Mse implements Backend { #buffered = new Signal([]); readonly buffered: Getter = this.#buffered; + #stalled = new Signal(false); + readonly stalled: Getter = this.#stalled; + + #timestamp = new Signal(Moq.Time.Milli.zero); + readonly timestamp: Getter = this.#timestamp; + signals = new Effect(); constructor(muxer: Muxer, source: Source) { @@ -30,6 +36,8 @@ export class Mse implements Backend { this.source.supported.set(supported); // super hacky this.signals.effect(this.#runMedia.bind(this)); + this.signals.effect(this.#runStalled.bind(this)); + this.signals.effect(this.#runTimestamp.bind(this)); } #runMedia(effect: Effect): void { @@ -147,15 +155,24 @@ export class Mse implements Backend { let duration: Moq.Time.Micro | undefined; // Buffer one frame so we can compute accurate duration from the next frame's timestamp - let pending = await consumer.decode(); - if (!pending) return; + let pending: Container.Legacy.Frame; + for (;;) { + const next = await consumer.next(); + if (!next) return; + if (!next.frame) continue; // Skip over group done notifications. + + pending = next.frame; + break; + } for (;;) { - const next = await consumer.decode(); + const next = await consumer.next(); + if (next && !next.frame) continue; // Skip over group done notifications. + const frame = next?.frame; // Compute duration from next frame's timestamp, or use last known duration if stream ended - if (next) { - duration = (next.timestamp - pending.timestamp) as Moq.Time.Micro; + if (frame) { + duration = (frame.timestamp - pending.timestamp) as Moq.Time.Micro; } // Wrap raw frame in moof+mdat @@ -174,12 +191,52 @@ export class Mse implements Backend { element.currentTime = element.buffered.start(0); } - if (!next) return; - pending = next; + if (!frame) return; + pending = frame; } }); } + #runStalled(effect: Effect): void { + const element = effect.get(this.muxer.element); + if (!element) return; + + const update = () => { + this.#stalled.set(element.readyState <= HTMLMediaElement.HAVE_CURRENT_DATA); + }; + + // TODO Are these the correct events to use? + effect.event(element, "waiting", update); + effect.event(element, "playing", update); + effect.event(element, "seeking", update); + } + + #runTimestamp(effect: Effect): void { + const element = effect.get(this.muxer.element); + if (!element) return; + + // Use requestVideoFrameCallback if available (frame-accurate) + if ("requestVideoFrameCallback" in element) { + const video = element as HTMLVideoElement; + + let handle: number; + const onFrame = () => { + const timestamp = Moq.Time.Milli.fromSecond(video.currentTime as Moq.Time.Second); + this.#timestamp.set(timestamp); + handle = video.requestVideoFrameCallback(onFrame); + }; + handle = video.requestVideoFrameCallback(onFrame); + + effect.cleanup(() => video.cancelVideoFrameCallback(handle)); + } else { + // Fallback to timeupdate event + effect.event(element, "timeupdate", () => { + const timestamp = Moq.Time.Milli.fromSecond(element.currentTime as Moq.Time.Second); + this.#timestamp.set(timestamp); + }); + } + } + close(): void { this.source.close(); this.signals.close(); diff --git a/js/signals/src/index.ts b/js/signals/src/index.ts index 4efcad230..167631be4 100644 --- a/js/signals/src/index.ts +++ b/js/signals/src/index.ts @@ -23,6 +23,7 @@ export interface Getter { export interface Setter { set(value: T | ((prev: T) => T)): void; + update(fn: (prev: T) => T): void; } export class Signal implements Getter, Setter { @@ -574,6 +575,6 @@ export class Effect { } proxy(dst: Setter, src: Getter): void { - this.subscribe(src, (value) => dst.set(value)); + this.subscribe(src, (value) => dst.update(() => value)); } } From 9f0332cd4c35dc927e0f9ada54b6daf0c3431737 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 31 Jan 2026 09:22:36 -0300 Subject: [PATCH 2/5] Fix some bugs n stuff. --- js/hang/src/container/cmaf/decode.ts | 20 ++++++++++++++++++++ js/hang/src/container/legacy.ts | 7 ++----- js/hang/src/watch/audio/mse.ts | 8 +++++++- js/hang/src/watch/mse.ts | 25 +++++++++++++++++++++++++ js/hang/src/watch/sync.ts | 16 +++++++++++----- js/hang/src/watch/video/mse.ts | 13 +++++++++++++ 6 files changed, 78 insertions(+), 11 deletions(-) diff --git a/js/hang/src/container/cmaf/decode.ts b/js/hang/src/container/cmaf/decode.ts index a42e19674..edca90ecf 100644 --- a/js/hang/src/container/cmaf/decode.ts +++ b/js/hang/src/container/cmaf/decode.ts @@ -3,6 +3,7 @@ * Used by WebCodecs to extract raw frames from CMAF container. */ +import type { Time } from "@moq/lite"; import { type MediaHeaderBox, type ParsedIsoBox, @@ -196,6 +197,25 @@ function extractDescription(entry: any): Uint8Array | undefined { return undefined; } +/** + * Extract just the base media decode time from a data segment (moof + mdat). + * This is a lighter-weight function when you only need the timestamp. + * + * @param segment - The moof + mdat data + * @param timescale - Time units per second (from init segment) + * @returns The base media decode time in microseconds + */ +export function decodeTimestamp(segment: Uint8Array, timescale: number): Time.Micro { + const boxes = readIsoBoxes(toArrayBuffer(segment), { readers: DATA_READERS }) as ParsedIsoBox[]; + + // Find moof > traf > tfdt for base media decode time + const tfdt = findBox(boxes, isBoxType("tfdt")); + const baseDecodeTime = tfdt?.baseMediaDecodeTime ?? 0; + + // Convert to microseconds + return ((baseDecodeTime * 1_000_000) / timescale) as Time.Micro; +} + /** * Parse a data segment (moof + mdat) to extract raw samples. * diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index c3823d9e7..6111ade9e 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -219,16 +219,13 @@ export class Consumer { if (this.#active !== undefined && first.consumer.sequence <= this.#active) { this.#groups.shift(); - console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); + this.#active = this.#groups[0]?.consumer.sequence; + console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#active}`); first.consumer.close(); first.frames.length = 0; } - // Advance to the next known group. - // NOTE: Can't be undefined, because we checked above. - this.#active = this.#groups[0]?.consumer.sequence; - this.#updateBuffered(); // Wake up any consumers waiting for a new frame. diff --git a/js/hang/src/watch/audio/mse.ts b/js/hang/src/watch/audio/mse.ts index 50ea24e4c..48fee4d29 100644 --- a/js/hang/src/watch/audio/mse.ts +++ b/js/hang/src/watch/audio/mse.ts @@ -1,4 +1,4 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; import * as Container from "../../container"; @@ -103,6 +103,8 @@ export class Mse implements Backend { ): void { if (config.container.kind !== "cmaf") throw new Error("unreachable"); + const timescale = config.container.timescale; + effect.spawn(async () => { // Generate init segment from catalog config (uses track_id from container) const initSegment = Container.Cmaf.createAudioInitSegment(config); @@ -114,6 +116,10 @@ export class Mse implements Backend { const frame = await sub.readFrame(); if (!frame) return; + // Extract the timestamp from the CMAF segment and mark when we received it. + const timestamp = Container.Cmaf.decodeTimestamp(frame, timescale); + this.source.sync.received(Moq.Time.Milli.fromMicro(timestamp)); + await this.#appendBuffer(sourceBuffer, frame); // Seek to the start of the buffer if we're behind it (for startup). diff --git a/js/hang/src/watch/mse.ts b/js/hang/src/watch/mse.ts index fccb634da..aa4952803 100644 --- a/js/hang/src/watch/mse.ts +++ b/js/hang/src/watch/mse.ts @@ -1,3 +1,4 @@ +import { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type { Sync } from "./sync"; @@ -31,6 +32,7 @@ export class Muxer { this.#signals.effect(this.#runSkip.bind(this)); this.#signals.effect(this.#runTrim.bind(this)); this.#signals.effect(this.#runPaused.bind(this)); + this.#signals.effect(this.#runSync.bind(this)); } #runMediaSource(effect: Effect): void { @@ -119,6 +121,29 @@ export class Muxer { } } + // Seek to the target position based on the reference and latency. + #runSync(effect: Effect): void { + const element = effect.get(this.element); + if (!element) return; + + // Don't seek when paused, otherwise we'll keep jerking around. + const paused = effect.get(this.paused); + if (paused) return; + + const reference = effect.get(this.#sync.reference); + if (reference === undefined) return; + + const latency = effect.get(this.#sync.latency); + + // Compute the target currentTime based on reference and latency. + // reference = performance.now() - frameTimestamp (in ms) when we received the earliest frame + // So the target media timestamp (in ms) at time `now` is: now - reference - latency + const target = (performance.now() - reference - latency) as Time.Milli; + + // Seek to the target position. + element.currentTime = Time.Milli.toSecond(target); + } + close(): void { this.#signals.close(); } diff --git a/js/hang/src/watch/sync.ts b/js/hang/src/watch/sync.ts index f62713e40..572ee0ce1 100644 --- a/js/hang/src/watch/sync.ts +++ b/js/hang/src/watch/sync.ts @@ -11,7 +11,8 @@ export class Sync { // The earliest time we've received a frame, relative to its timestamp. // This will keep being updated as we catch up to the live playhead then will be relatively static. // TODO Update this when RTT changes - #reference?: Time.Milli; + #reference = new Signal(undefined); + readonly reference: Signal = this.#reference; // The minimum buffer size, to account for network jitter. jitter: Signal; @@ -61,11 +62,12 @@ export class Sync { // Update the reference if this is the earliest frame we've seen, relative to its timestamp. received(timestamp: Time.Milli): void { const ref = (performance.now() - timestamp) as Time.Milli; + const current = this.#reference.peek(); - if (this.#reference && ref >= this.#reference) { + if (current !== undefined && ref >= current) { return; } - this.#reference = ref; + this.#reference.set(ref); this.#resolve(); this.#update = new Promise((resolve) => { @@ -75,7 +77,8 @@ export class Sync { // Sleep until it's time to render this frame. async wait(timestamp: Time.Milli): Promise { - if (!this.#reference) { + const reference = this.#reference.peek(); + if (reference === undefined) { throw new Error("reference not set; call update() first"); } @@ -85,7 +88,10 @@ export class Sync { const now = performance.now(); const ref = (now - timestamp) as Time.Milli; - const sleep = this.#reference - ref + this.#latency.peek(); + const currentRef = this.#reference.peek(); + if (currentRef === undefined) return; + + const sleep = currentRef - ref + this.#latency.peek(); if (sleep <= 0) return; const wait = new Promise((resolve) => setTimeout(resolve, sleep)).then(() => true); diff --git a/js/hang/src/watch/video/mse.ts b/js/hang/src/watch/video/mse.ts index 25a577448..2a23911a4 100644 --- a/js/hang/src/watch/video/mse.ts +++ b/js/hang/src/watch/video/mse.ts @@ -107,6 +107,8 @@ export class Mse implements Backend { const data = active.subscribe(track, Catalog.PRIORITY.video); effect.cleanup(() => data.close()); + const timescale = config.container.timescale; + effect.spawn(async () => { // Generate init segment from catalog config (uses track_id from container) const initSegment = Container.Cmaf.createVideoInitSegment(config); @@ -118,6 +120,10 @@ export class Mse implements Backend { const frame = await data.readFrame(); if (!frame) return; + // Extract the timestamp from the CMAF segment and mark when we received it. + const timestamp = Container.Cmaf.decodeTimestamp(frame, timescale); + this.source.sync.received(Moq.Time.Milli.fromMicro(timestamp)); + await this.#appendBuffer(sourceBuffer, frame); // Seek to the start of the buffer if we're behind it (for startup). @@ -162,6 +168,10 @@ export class Mse implements Backend { if (!next.frame) continue; // Skip over group done notifications. pending = next.frame; + + // Mark that we received this frame right now. + this.source.sync.received(Moq.Time.Milli.fromMicro(pending.timestamp as Moq.Time.Micro)); + break; } @@ -173,6 +183,9 @@ export class Mse implements Backend { // Compute duration from next frame's timestamp, or use last known duration if stream ended if (frame) { duration = (frame.timestamp - pending.timestamp) as Moq.Time.Micro; + + // Mark that we received this frame right now for latency calculation. + this.source.sync.received(Moq.Time.Milli.fromMicro(frame.timestamp as Moq.Time.Micro)); } // Wrap raw frame in moof+mdat From 2fc6c7e346e5f3ad73b4989f1b1f946f6f1d51cd Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 3 Feb 2026 12:58:37 -0800 Subject: [PATCH 3/5] PR changes --- js/hang-demo/src/index.html | 2 +- .../src/watch/components/BufferControl.tsx | 36 +++++++++++-------- js/hang-ui/src/watch/styles/index.css | 30 ++++++++-------- js/hang/src/watch/audio/mse.ts | 7 ++++ js/hang/src/watch/backend.ts | 1 - js/hang/src/watch/video/mse.ts | 3 ++ 6 files changed, 47 insertions(+), 32 deletions(-) diff --git a/js/hang-demo/src/index.html b/js/hang-demo/src/index.html index 180b6728c..38a76ee09 100644 --- a/js/hang-demo/src/index.html +++ b/js/hang-demo/src/index.html @@ -23,7 +23,7 @@ - `paused`: Video and audio are paused. - `muted`: Audio is muted. - `volume`: The audio volume (0-1). - - `buffer`: The (additional) buffer size in milliseconds. + - `jitter`: The target jitter buffer size in milliseconds. - `reload`: Whether to automatically reconnect when the broadcast goes offline/online. NOTE: Cloudflare doesn't support reload yet. diff --git a/js/hang-ui/src/watch/components/BufferControl.tsx b/js/hang-ui/src/watch/components/BufferControl.tsx index ba8aa3722..273aa941f 100644 --- a/js/hang-ui/src/watch/components/BufferControl.tsx +++ b/js/hang-ui/src/watch/components/BufferControl.tsx @@ -1,6 +1,6 @@ import { Moq } from "@moq/hang"; import type { BufferedRange } from "@moq/hang/watch"; -import { createMemo, createSignal, For, Show } from "solid-js"; +import { createMemo, createSignal, For, onCleanup, Show } from "solid-js"; import useWatchUIContext from "../hooks/use-watch-ui"; const MIN_RANGE = 0 as Moq.Time.Milli; @@ -79,11 +79,17 @@ export default function BufferControl(props: BufferControlProps) { document.removeEventListener("mouseup", onMouseUp); }; + // Cleanup listeners on unmount + onCleanup(() => { + document.removeEventListener("mousemove", onMouseMove); + document.removeEventListener("mouseup", onMouseUp); + }); + return ( -
+
{/* Buffer Visualization - interactive, click/drag to set buffer */}
{/* Playhead (left edge = current time) */} -
+
{/* Video buffer track */} -
- Video +
+ Video {(range, i) => { const info = () => { @@ -109,9 +115,9 @@ export default function BufferControl(props: BufferControlProps) { return ( {(rangeInfo) => ( -
+
- {rangeInfo().overflowSec}s + {rangeInfo().overflowSec}s
)} @@ -122,8 +128,8 @@ export default function BufferControl(props: BufferControlProps) {
{/* Audio buffer track */} -
- Audio +
+ Audio {(range, i) => { const info = () => { @@ -134,9 +140,9 @@ export default function BufferControl(props: BufferControlProps) { return ( {(rangeInfo) => ( -
+
- {rangeInfo().overflowSec}s + {rangeInfo().overflowSec}s
)} @@ -147,9 +153,9 @@ export default function BufferControl(props: BufferControlProps) {
{/* Buffer target line (draggable) - wrapped in track-area container */} -
-
- {`${Math.round(context.jitter())}ms`} +
+
+ {`${Math.round(context.jitter())}ms`}
diff --git a/js/hang-ui/src/watch/styles/index.css b/js/hang-ui/src/watch/styles/index.css index 5ad901831..995c63553 100644 --- a/js/hang-ui/src/watch/styles/index.css +++ b/js/hang-ui/src/watch/styles/index.css @@ -210,15 +210,15 @@ color: #fff; } -/* Buffer Control styles */ -.bufferControlContainer { +/* Buffer Control styles (BEM) */ +.buffer__container { display: flex; flex-direction: column; gap: 8px; padding: 8px 12px; } -.bufferVisualization { +.buffer__visualization { position: relative; width: 100%; height: 52px; @@ -230,7 +230,7 @@ box-sizing: border-box; } -.bufferPlayhead { +.buffer__playhead { position: absolute; left: 48px; top: 0; @@ -240,7 +240,7 @@ z-index: 3; } -.bufferTrack { +.buffer__track { position: absolute; left: 48px; right: 0; @@ -249,15 +249,15 @@ align-items: center; } -.bufferTrack--video { +.buffer__track--video { top: 4px; } -.bufferTrack--audio { +.buffer__track--audio { top: 28px; } -.bufferTrackLabel { +.buffer__track-label { position: absolute; left: -46px; width: 40px; @@ -269,7 +269,7 @@ box-sizing: border-box; } -.bufferRange { +.buffer__range { position: absolute; top: 2px; height: calc(100% - 4px); @@ -282,14 +282,14 @@ overflow: hidden; } -.bufferOverflowLabel { +.buffer__overflow-label { font-size: 9px; color: rgba(0, 0, 0, 0.7); padding-right: 4px; font-weight: 500; } -.bufferTargetArea { +.buffer__target-area { position: absolute; top: 0; bottom: 0; @@ -298,7 +298,7 @@ pointer-events: none; } -.bufferTargetLine { +.buffer__target-line { position: absolute; top: 0; bottom: 0; @@ -310,7 +310,7 @@ pointer-events: auto; } -.bufferTargetLabel { +.buffer__target-label { position: absolute; bottom: 100%; left: 50%; @@ -325,7 +325,7 @@ pointer-events: none; } -.bufferVisualization:hover .bufferTargetLabel, -.bufferVisualization.dragging .bufferTargetLabel { +.buffer__visualization:hover .buffer__target-label, +.buffer__visualization--dragging .buffer__target-label { opacity: 1; } diff --git a/js/hang/src/watch/audio/mse.ts b/js/hang/src/watch/audio/mse.ts index 48fee4d29..d6f3e2094 100644 --- a/js/hang/src/watch/audio/mse.ts +++ b/js/hang/src/watch/audio/mse.ts @@ -160,6 +160,10 @@ export class Mse implements Backend { if (!next.frame) continue; // Skip over group done notifications. pending = next.frame; + + // Mark that we received this frame for latency calculation. + this.source.sync.received(Moq.Time.Milli.fromMicro(pending.timestamp as Moq.Time.Micro)); + break; } @@ -172,6 +176,9 @@ export class Mse implements Backend { // Compute duration from next frame's timestamp, or use last known duration if stream ended if (frame) { duration = (frame.timestamp - pending.timestamp) as Moq.Time.Micro; + + // Mark that we received this frame for latency calculation. + this.source.sync.received(Moq.Time.Milli.fromMicro(frame.timestamp as Moq.Time.Micro)); } // Wrap raw frame in moof+mdat diff --git a/js/hang/src/watch/backend.ts b/js/hang/src/watch/backend.ts index 6b2c082f9..04e54a888 100644 --- a/js/hang/src/watch/backend.ts +++ b/js/hang/src/watch/backend.ts @@ -179,7 +179,6 @@ export class MultiBackend implements Backend { paused: this.paused, element, }); - effect.cleanup(() => mse.close()); const video = new Video.Mse(mse, this.#videoSource); const audio = new Audio.Mse(mse, this.#audioSource, { diff --git a/js/hang/src/watch/video/mse.ts b/js/hang/src/watch/video/mse.ts index 2a23911a4..6b1388b39 100644 --- a/js/hang/src/watch/video/mse.ts +++ b/js/hang/src/watch/video/mse.ts @@ -218,6 +218,9 @@ export class Mse implements Backend { this.#stalled.set(element.readyState <= HTMLMediaElement.HAVE_CURRENT_DATA); }; + // Set initial state + update(); + // TODO Are these the correct events to use? effect.event(element, "waiting", update); effect.event(element, "playing", update); From 419385af353ad20d4ac1369f67595f5710d7dc6d Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 3 Feb 2026 13:23:22 -0800 Subject: [PATCH 4/5] Fix some more incorrect .set usages. --- js/hang/src/watch/audio/decoder.ts | 4 ++-- js/hang/src/watch/video/decoder.ts | 18 +++++++++++++++++- js/signals/src/index.ts | 9 ++++++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/js/hang/src/watch/audio/decoder.ts b/js/hang/src/watch/audio/decoder.ts index 702132ce7..c06aa9f78 100644 --- a/js/hang/src/watch/audio/decoder.ts +++ b/js/hang/src/watch/audio/decoder.ts @@ -191,7 +191,7 @@ export class Decoder { effect.effect((inner) => { const network = inner.get(consumer.buffered); const decode = inner.get(this.#decodeBuffered); - this.#buffered.set(mergeBufferedRanges(network, decode)); + this.#buffered.update(() => mergeBufferedRanges(network, decode)); }); effect.spawn(async () => { @@ -242,7 +242,7 @@ export class Decoder { // TODO: Add CMAF consumer wrapper for latency control effect.effect((inner) => { const decode = inner.get(this.#decodeBuffered); - this.#buffered.set(decode); + this.#buffered.update(() => decode); }); effect.spawn(async () => { diff --git a/js/hang/src/watch/video/decoder.ts b/js/hang/src/watch/video/decoder.ts index fa82b8465..84f26405c 100644 --- a/js/hang/src/watch/video/decoder.ts +++ b/js/hang/src/watch/video/decoder.ts @@ -271,7 +271,7 @@ class DecoderTrack { effect.effect((inner) => { const network = inner.get(consumer.buffered); const decode = inner.get(this.#buffered); - this.buffered.set(mergeBufferedRanges(network, decode)); + this.buffered.update(() => mergeBufferedRanges(network, decode)); }); decoder.configure({ @@ -347,6 +347,12 @@ class DecoderTrack { flip: false, }); + // Use decode buffer directly (no network jitter buffer for CMAF yet) + effect.effect((inner) => { + const decode = inner.get(this.#buffered); + this.buffered.update(() => decode); + }); + effect.spawn(async () => { // Process data segments // TODO: Use a consumer wrapper for CMAF to support latency control @@ -355,6 +361,8 @@ class DecoderTrack { if (!group) break; effect.spawn(async () => { + let previous: Time.Micro | undefined; + try { for (;;) { const segment = await Promise.race([group.readFrame(), effect.cancel]); @@ -378,6 +386,14 @@ class DecoderTrack { bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength, })); + // Track decode buffer + if (previous !== undefined) { + const start = Time.Milli.fromMicro(previous); + const end = Time.Milli.fromMicro(sample.timestamp as Time.Micro); + this.#addBuffered(start, end); + } + previous = sample.timestamp as Time.Micro; + decoder.decode(chunk); } } diff --git a/js/signals/src/index.ts b/js/signals/src/index.ts index 167631be4..cc7bf2a87 100644 --- a/js/signals/src/index.ts +++ b/js/signals/src/index.ts @@ -67,7 +67,14 @@ export class Signal implements Getter, Setter { // Don't even queue a microtask if the value is the EXACT same. // We don't use dequal here because we don't want to run it twice, only when it matters. - if (notify === undefined && old === this.#value) return; + if (notify === undefined && old === this.#value) { + if (DEV && value !== null && (typeof value === "object" || typeof value === "function")) { + console.warn( + "Signal.set() called with the same object reference. Changes won't propagate. Use update() or mutate() instead.", + ); + } + return; + } // If there are no subscribers, don't queue a microtask. if (this.#subscribers.size === 0 && this.#changed.size === 0) return; From 27092dc77ee64a43e61ab286460fcb45d2b4af5f Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 3 Feb 2026 13:41:57 -0800 Subject: [PATCH 5/5] Fix the MSE seeking. --- js/hang/src/watch/mse.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/js/hang/src/watch/mse.ts b/js/hang/src/watch/mse.ts index aa4952803..e28f34f47 100644 --- a/js/hang/src/watch/mse.ts +++ b/js/hang/src/watch/mse.ts @@ -67,7 +67,8 @@ export class Muxer { if (paused) return; // Use the computed latency (catalog jitter + user jitter) - const latency = effect.get(this.#sync.latency); + // Convert to seconds since DOM APIs use seconds + const latency = Time.Milli.toSecond(effect.get(this.#sync.latency)); effect.interval(() => { // Skip over gaps based on the effective latency. @@ -75,12 +76,13 @@ export class Muxer { if (buffered.length === 0) return; const last = buffered.end(buffered.length - 1); - const diff = last - element.currentTime; + const target = last - latency; + const seek = target - element.currentTime; - // Seek to maintain the target latency from the buffer end. - if (diff > latency && diff > 0.1) { - console.warn("skipping ahead", diff, "seconds"); - element.currentTime = last - latency; + // Seek forward if we're too far behind, or backward if we're too far ahead (>100ms) + if (seek > 0.1 || seek < -0.1) { + console.warn("seeking", seek > 0 ? "forward" : "backward", Math.abs(seek).toFixed(3), "seconds"); + element.currentTime = target; } }, 100); }