diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index 3230125e0..1b25b1c4b 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1 +1 @@ -buy_me_a_coffee: https://buymeacoffee.com/kingrt +buy_me_a_coffee: kingrt diff --git a/demo/video-chat/backend-king-php/run-dev.sh b/demo/video-chat/backend-king-php/run-dev.sh index 425bd7ddb..4999fc957 100755 --- a/demo/video-chat/backend-king-php/run-dev.sh +++ b/demo/video-chat/backend-king-php/run-dev.sh @@ -118,9 +118,14 @@ start_backend() { local bind_port="$2" local worker_count worker_count="$(worker_count_for_mode "${mode}")" + local reuseport_value="${KING_HTTP1_ENABLE_REUSEPORT:-}" + if [[ -z "${reuseport_value}" && "${worker_count}" -gt 1 ]]; then + reuseport_value="1" + fi local worker_index=1 while (( worker_index <= worker_count )); do + KING_HTTP1_ENABLE_REUSEPORT="${reuseport_value}" \ VIDEOCHAT_KING_PORT="${bind_port}" \ VIDEOCHAT_KING_SERVER_MODE="${mode}" \ VIDEOCHAT_KING_WORKER_INDEX="${worker_index}" \ @@ -131,6 +136,9 @@ start_backend() { done echo "[video-chat][king-php-backend] started ${worker_count} worker(s) in ${mode} mode on port ${bind_port}" + if [[ "${reuseport_value}" == "1" && "${worker_count}" -gt 1 ]]; then + echo "[video-chat][king-php-backend] enabled SO_REUSEPORT for trusted ${mode} worker group" + fi } normalized_mode_override="$(echo "${SERVER_MODE_OVERRIDE}" | tr '[:upper:]' '[:lower:]' | xargs || true)" diff --git a/demo/video-chat/docker-compose.v1.yml b/demo/video-chat/docker-compose.v1.yml index e6e0b0990..4bac36615 100644 --- a/demo/video-chat/docker-compose.v1.yml +++ b/demo/video-chat/docker-compose.v1.yml @@ -126,7 +126,7 @@ services: VIDEOCHAT_V1_TURN_STATIC_AUTH_SECRET_FILE: "${VIDEOCHAT_V1_TURN_STATIC_AUTH_SECRET_FILE:-}" VIDEOCHAT_V1_TURN_EXTERNAL_IP: "${VIDEOCHAT_V1_TURN_EXTERNAL_IP:-}" VIDEOCHAT_V1_TURN_RELAY_MIN_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}" - VIDEOCHAT_V1_TURN_RELAY_MAX_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}" + VIDEOCHAT_V1_TURN_RELAY_MAX_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}" entrypoint: - sh - -lc @@ -161,7 +161,7 @@ services: ports: - "${VIDEOCHAT_V1_TURN_PORT:-3478}:3478/tcp" - "${VIDEOCHAT_V1_TURN_PORT:-3478}:3478/udp" - - "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}:${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}/udp" + - "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}:${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}/udp" restart: unless-stopped videochat-frontend-v1: @@ -254,6 +254,8 @@ services: VIDEOCHAT_EDGE_TURN_DOMAIN: "${VIDEOCHAT_DEPLOY_TURN_DOMAIN:-turn.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}" VIDEOCHAT_EDGE_CDN_DOMAIN: "${VIDEOCHAT_DEPLOY_CDN_DOMAIN:-cdn.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}" VIDEOCHAT_EDGE_CDN_ALIASES: "${VIDEOCHAT_DEPLOY_CDN_ALIASES:-cnd.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}" + VIDEOCHAT_EDGE_EXTERNAL_DOMAINS: "${VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS:-}" + VIDEOCHAT_EDGE_EXTERNAL_UPSTREAM: "${VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM:-}" VIDEOCHAT_EDGE_CERT_FILE: /run/certs/live/fullchain.pem VIDEOCHAT_EDGE_KEY_FILE: /run/certs/live/privkey.pem VIDEOCHAT_EDGE_API_UPSTREAM: videochat-backend-v1:18080 diff --git a/demo/video-chat/edge/edge.php b/demo/video-chat/edge/edge.php index d15a9458d..277134354 100644 --- a/demo/video-chat/edge/edge.php +++ b/demo/video-chat/edge/edge.php @@ -12,6 +12,15 @@ $turnDomain = strtolower(trim((string) (getenv('VIDEOCHAT_EDGE_TURN_DOMAIN') ?: 'turn.' . $domain))); $cdnDomain = strtolower(trim((string) (getenv('VIDEOCHAT_EDGE_CDN_DOMAIN') ?: 'cdn.' . $domain))); $cdnAliasInput = trim((string) (getenv('VIDEOCHAT_EDGE_CDN_ALIASES') ?: 'cnd.' . $domain)); +$externalDomainInput = trim((string) getenv('VIDEOCHAT_EDGE_EXTERNAL_DOMAINS')); +$externalDomains = []; +foreach (preg_split('/\s*,\s*/', $externalDomainInput) ?: [] as $externalDomain) { + $externalDomain = strtolower(trim((string) $externalDomain)); + if ($externalDomain !== '') { + $externalDomains[] = $externalDomain; + } +} +$externalDomains = array_values(array_unique($externalDomains)); $cdnDomains = [$cdnDomain]; foreach (preg_split('/\s*,\s*/', $cdnAliasInput) ?: [] as $alias) { $alias = strtolower(trim((string) $alias)); @@ -26,6 +35,8 @@ $apiUpstream = getenv('VIDEOCHAT_EDGE_API_UPSTREAM') ?: 'videochat-backend-v1:18080'; $wsUpstream = getenv('VIDEOCHAT_EDGE_WS_UPSTREAM') ?: 'videochat-backend-ws-v1:18080'; $sfuUpstream = getenv('VIDEOCHAT_EDGE_SFU_UPSTREAM') ?: 'videochat-backend-sfu-v1:18080'; +$externalUpstream = trim((string) getenv('VIDEOCHAT_EDGE_EXTERNAL_UPSTREAM')); +$socialPreviewImagePath = getenv('VIDEOCHAT_EDGE_SOCIAL_PREVIEW_IMAGE') ?: '/assets/orgas/kingrt/social/invitation-preview.png'; $maxHeaderBytes = (int) (getenv('VIDEOCHAT_EDGE_MAX_HEADER_BYTES') ?: '65536'); $connectTimeout = (float) (getenv('VIDEOCHAT_EDGE_CONNECT_TIMEOUT_SECONDS') ?: '5'); $httpIdleTimeout = (int) (getenv('VIDEOCHAT_EDGE_HTTP_IDLE_TIMEOUT_SECONDS') ?: '60'); @@ -286,7 +297,61 @@ }; }; -$serveStatic = static function ($client, array $request) use ($staticRoot, $writeResponse, $contentType, $cdnDomains, $assetVersion): void { +$escapeHtml = static function (string $value): string { + return htmlspecialchars($value, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8'); +}; + +$absoluteHttpsUrl = static function (string $host, string $target) use ($domain): string { + $host = trim($host) !== '' ? $host : $domain; + $target = trim($target) !== '' ? $target : '/'; + if ($target[0] !== '/') { + $target = '/' . $target; + } + return 'https://' . $host . $target; +}; + +$injectSocialPreview = static function (string $body, array $request) use ($domain, $cdnDomain, $socialPreviewImagePath, $escapeHtml, $absoluteHttpsUrl): string { + $target = (string) ($request['target'] ?: ($request['path'] ?? '/')); + $path = (string) ($request['path'] ?? '/'); + $host = (string) ($request['host'] ?: $domain); + $assetHost = $cdnDomain !== '' ? $cdnDomain : $host; + $pageUrl = $absoluteHttpsUrl($host, $target); + $imageUrl = $absoluteHttpsUrl($assetHost, $socialPreviewImagePath); + $isInvite = str_starts_with($path, '/join/'); + $title = $isInvite ? "You're invited to a KINGRT video call" : 'KINGRT Video Chat'; + $description = $isInvite + ? 'Join the video call on KINGRT.' + : 'Run your own calls with KINGRT open-source video collaboration.'; + + $tags = [ + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + '', + ]; + $meta = implode("\n ", $tags); + + if (str_contains($body, '')) { + return str_replace('', $meta, $body); + } + + return str_replace('', " {$meta}\n ", $body); +}; + +$serveStatic = static function ($client, array $request) use ($staticRoot, $writeResponse, $contentType, $cdnDomains, $assetVersion, $injectSocialPreview): void { $path = rawurldecode((string) $request['path']); $isCdnAsset = in_array($request['host'], $cdnDomains, true) || str_starts_with($path, '/cdn/'); $corsHeaders = $isCdnAsset @@ -324,6 +389,9 @@ } $body = (string) @file_get_contents($candidate); + if (basename($candidate) === 'index.html') { + $body = $injectSocialPreview($body, $request); + } $headers = [ 'Content-Type' => $contentType($candidate), 'Cache-Control' => basename($candidate) === 'index.html' @@ -395,11 +463,27 @@ $lastUpstreamReadProgress = $lastActivity; $lastClientWriteProgress = $lastActivity; $lastUpstreamWriteProgress = $lastActivity; + $closeWebSocketTunnel = static function () use (&$clientOpen, &$upstreamOpen, &$toClient, &$toUpstream): void { + // WebSocket tunnels cannot stay half-open: otherwise browser requests + // remain pending while the closed upstream socket sits in CLOSE_WAIT. + $clientOpen = false; + $upstreamOpen = false; + $toClient = ''; + $toUpstream = ''; + }; while ($clientOpen || $upstreamOpen || $toUpstream !== '' || $toClient !== '') { if ((microtime(true) - $lastActivity) > $idleTimeout) { break; } + // Upstream may reject a websocket handshake with HTTP bytes, then close. + // Keep the client side alive until that buffered response is flushed. + if ($isWebSocket && !$upstreamOpen && $toClient === '') { + $clientOpen = false; + } + if ($isWebSocket && !$clientOpen && $toUpstream === '') { + $upstreamOpen = false; + } if (!$clientOpen) { $toClient = ''; } @@ -447,6 +531,21 @@ foreach ($read as $stream) { $chunk = @fread($stream, 16384); if ($chunk === false) { + if ($isWebSocket) { + if ($stream === $upstreamStream && $toClient !== '') { + $upstreamOpen = false; + $madeProgress = true; + continue; + } + if ($stream === $client && $toUpstream !== '') { + $clientOpen = false; + $madeProgress = true; + continue; + } + $closeWebSocketTunnel(); + $madeProgress = true; + continue; + } if ($stream === $client) { $clientOpen = false; } else { @@ -456,6 +555,21 @@ } if ($chunk === '') { if (feof($stream)) { + if ($isWebSocket) { + if ($stream === $upstreamStream && $toClient !== '') { + $upstreamOpen = false; + $madeProgress = true; + continue; + } + if ($stream === $client && $toUpstream !== '') { + $clientOpen = false; + $madeProgress = true; + continue; + } + $closeWebSocketTunnel(); + $madeProgress = true; + continue; + } if ($stream === $client) { $clientOpen = false; } else { @@ -510,12 +624,20 @@ if ($stream === $upstreamStream && $toUpstream !== '') { $written = @fwrite($upstreamStream, $toUpstream); if ($written === false) { + if ($isWebSocket) { + $closeWebSocketTunnel(); + continue; + } $upstreamOpen = false; $toUpstream = ''; continue; } if ($written === 0) { if ((microtime(true) - $lastUpstreamWriteProgress) >= $writeStallTimeout) { + if ($isWebSocket) { + $closeWebSocketTunnel(); + continue; + } $upstreamOpen = false; $toUpstream = ''; } else { @@ -531,12 +653,20 @@ if ($stream === $client && $toClient !== '') { $written = @fwrite($client, $toClient); if ($written === false) { + if ($isWebSocket) { + $closeWebSocketTunnel(); + continue; + } $clientOpen = false; $toClient = ''; continue; } if ($written === 0) { if ((microtime(true) - $lastClientWriteProgress) >= $writeStallTimeout) { + if ($isWebSocket) { + $closeWebSocketTunnel(); + continue; + } $clientOpen = false; $toClient = ''; } else { @@ -559,9 +689,12 @@ @fclose($upstreamStream); }; -$route = static function (array $request) use ($domain, $apiDomain, $wsDomain, $sfuDomain, $turnDomain, $cdnDomains, $apiUpstream, $wsUpstream, $sfuUpstream): ?string { +$route = static function (array $request) use ($domain, $apiDomain, $wsDomain, $sfuDomain, $turnDomain, $cdnDomains, $externalDomains, $apiUpstream, $wsUpstream, $sfuUpstream, $externalUpstream): ?string { $host = $request['host']; $path = $request['path']; + if ($externalUpstream !== '' && in_array($host, $externalDomains, true)) { + return $externalUpstream; + } if (in_array($host, $cdnDomains, true)) { return 'static'; } diff --git a/demo/video-chat/frontend-vue/index.html b/demo/video-chat/frontend-vue/index.html index bab8a3839..e26f279fc 100644 --- a/demo/video-chat/frontend-vue/index.html +++ b/demo/video-chat/frontend-vue/index.html @@ -4,7 +4,8 @@ - King Video Chat (Vue) + + KINGRT Video Chat
diff --git a/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png b/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png new file mode 100644 index 000000000..cdfa5c784 Binary files /dev/null and b/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png differ diff --git a/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue b/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue index de63dc73f..309c7f530 100644 --- a/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue +++ b/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue @@ -218,6 +218,7 @@ import { handleAssetVersionSocketPayload, } from '../../../support/assetVersion'; import { attachForegroundReconnectHandlers } from '../../../support/foregroundReconnect'; +import { buildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints'; import { applyCallBackgroundPreset as applyBackgroundPreset, attachCallMediaDeviceWatcher, @@ -272,7 +273,11 @@ const { playSpeakerTestSound, startPreview, stopPreview, -} = createJoinAccessPreviewController({ previewVideoRef, state }); +} = createJoinAccessPreviewController({ + previewVideoRef, + state, + buildOptionalCallAudioCaptureConstraints, +}); function normalizeAccessId(value) { return String(value || '').trim().toLowerCase(); diff --git a/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js b/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js index 8f542921d..af7b759fc 100644 --- a/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js +++ b/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js @@ -1,6 +1,6 @@ import { nextTick } from 'vue'; import { BackgroundFilterController } from '../../realtime/background/controller'; -import { buildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints'; +import { buildOptionalCallAudioCaptureConstraints as defaultBuildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints'; import { callMediaPrefs } from '../../realtime/media/preferences'; function finiteNumber(value, fallback) { @@ -74,7 +74,9 @@ function applyVolumeToStreams(streams) { } } -function buildPreviewConstraints() { +function buildPreviewConstraints( + buildOptionalCallAudioCaptureConstraints = defaultBuildOptionalCallAudioCaptureConstraints, +) { const cameraDeviceId = String(callMediaPrefs.selectedCameraId || '').trim(); const microphoneDeviceId = String(callMediaPrefs.selectedMicrophoneId || '').trim(); return { @@ -94,7 +96,11 @@ function stopStreams(streams) { } } -export function createJoinAccessPreviewController({ previewVideoRef, state }) { +export function createJoinAccessPreviewController({ + previewVideoRef, + state, + buildOptionalCallAudioCaptureConstraints = defaultBuildOptionalCallAudioCaptureConstraints, +}) { const backgroundController = new BackgroundFilterController(); let rawStream = null; let previewStream = null; @@ -133,7 +139,7 @@ export function createJoinAccessPreviewController({ previewVideoRef, state }) { } try { - rawStream = await navigator.mediaDevices.getUserMedia(buildPreviewConstraints()); + rawStream = await navigator.mediaDevices.getUserMedia(buildPreviewConstraints(buildOptionalCallAudioCaptureConstraints)); applyVolumeToStreams([rawStream]); previewStream = rawStream; diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue b/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue index 34931e549..e471b8cfa 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue +++ b/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue @@ -512,17 +512,16 @@ const { getRoomId: () => activeRoomId.value, }); -let clearRemoteVideoStallTimer = () => {}; +let clearRemoteVideoStallTimer; let isNativeWebRtcRuntimePath = () => false; let isWlvcRuntimePath = () => false; -let nativeAudioBridgeFailureMessage = () => defaultNativeAudioBridgeFailureMessage(); -let resetWlvcEncoderAfterDroppedEncodedFrame = () => {}; -let resetBackgroundRuntimeMetrics = () => {}; -let restartSfuAfterVideoStall = () => {}; -let shouldBlockNativeRuntimeSignaling = () => false; -let shouldUseNativeAudioBridge = () => false; -let startRemoteVideoStallTimer = () => {}; -let stopActivityMonitor = () => {}; +let nativeAudioBridgeFailureMessage; +let resetBackgroundRuntimeMetrics; +let restartSfuAfterVideoStall; +let shouldBlockNativeRuntimeSignaling; +let shouldUseNativeAudioBridge; +let startRemoteVideoStallTimer; +let stopActivityMonitor; const routeCallRef = computed(() => String(route.params.callRef || '').trim()); const desiredRoomId = computed(() => normalizeRoomId(routeCallResolve.roomId || routeCallRef.value || 'lobby')); @@ -606,34 +605,30 @@ const showLeftSidebarRestoreButton = computed(() => { return !isCompactViewport.value && isShellLeftSidebarCollapsed.value; }); -let canProtectCurrentNativeTargets = () => false; -let canProtectCurrentSfuTargets = () => false; -let clearMediaSecurityHandshakeWatchdog = () => {}; -let clearMediaSecurityResyncTimer = () => {}; -let clearMediaSecuritySfuPublisherSeen = () => {}; -let clearMediaSecuritySignalCaches = () => {}; -let clearNativeAudioBridgeQuarantine = () => false; -let currentMediaSecurityRuntimePath = () => 'wlvc_sfu'; -let ensureMediaSecuritySession = () => null; -let ensureNativeAudioBridgeSecurityReady = async () => false; -let handleMediaSecuritySignal = async () => {}; -let hintMediaSecuritySync = () => {}; -let mediaSecurityEligibleTargetIds = () => []; -let mediaSecurityTargetIds = () => []; -let nativeAudioBridgeIsQuarantined = () => false; -let nativeAudioSecurityBannerMessage = computed(() => ''); -let noteMediaSecuritySfuPublisherSeen = () => {}; -let recoverMediaSecurityForPublisher = () => {}; -let reportNativeAudioBridgeFailure = () => {}; -let resyncNativeAudioBridgePeerAfterSecurityReady = () => false; -let scheduleMediaSecurityParticipantSync = () => {}; -let sendMediaSecurityHello = async () => false; -let sendMediaSecuritySenderKey = async () => false; -let shouldBypassNativeAudioProtectionForPeer = () => false; -let shouldRecoverMediaSecurityFromFrameError = () => false; -let shouldSendTransportOnlySfuFrame = () => false; -let startMediaSecurityHandshakeWatchdog = () => {}; -let syncMediaSecurityWithParticipants = async () => {}; +let canProtectCurrentSfuTargets; +let clearMediaSecurityHandshakeWatchdog; +let clearMediaSecurityResyncTimer; +let clearMediaSecuritySfuPublisherSeen; +let clearMediaSecuritySignalCaches; +let currentMediaSecurityRuntimePath; +let ensureMediaSecuritySession; +let ensureNativeAudioBridgeSecurityReady; +let handleMediaSecuritySignal; +let hintMediaSecuritySync; +let mediaSecurityTargetIds; +let nativeAudioBridgeIsQuarantined; +let nativeAudioSecurityBannerMessage; +let noteMediaSecuritySfuPublisherSeen; +let recoverMediaSecurityForPublisher; +let reportNativeAudioBridgeFailure; +let resyncNativeAudioBridgePeerAfterSecurityReady; +let scheduleMediaSecurityParticipantSync; +let sendMediaSecurityHello; +let shouldBypassNativeAudioProtectionForPeer; +let shouldRecoverMediaSecurityFromFrameError; +let shouldSendTransportOnlySfuFrame; +let startMediaSecurityHandshakeWatchdog; +let syncMediaSecurityWithParticipants; let appendChatMessage = () => {}; let applyActivitySnapshot = () => {}; let applyCallLayoutPayload = () => {}; @@ -642,53 +637,48 @@ let applyReactionEvent = () => {}; let applyRemoteControlState = () => false; let applyTypingEvent = () => {}; let clearAdmissionGate = () => {}; -let clearErrors = () => {}; -let clearLobbyActionText = () => {}; -let clearLobbyToastTimer = () => {}; -let clearTransientActivityPublishErrorNotice = () => {}; -let closeNativePeerConnection = () => {}; +let clearErrors; +let clearLobbyActionText; +let clearLobbyToastTimer; +let clearTransientActivityPublishErrorNotice; +let closeNativePeerConnection; let hangupCall = () => {}; let hideLobbyJoinToast = () => {}; let markParticipantActivity = () => {}; let normalizeLobbyEntry = (entry) => entry; let nativeAudioSecurityTelemetrySnapshot = () => null; let notifyLobbyJoinRequests = () => {}; -let peerControlSnapshot = () => ({ - handRaised: false, - cameraEnabled: true, - micEnabled: true, - screenEnabled: false, -}); +let peerControlSnapshot; let pruneParticipantActivity = () => {}; -let refreshUsersDirectory = async () => {}; +let refreshUsersDirectory; let refreshUsersDirectoryPresentation = () => {}; let reportNativeAudioSdpRejected = () => {}; let requestRoomSnapshot = () => {}; -let resetPeerControlState = () => {}; +let resetPeerControlState; let scheduleNativePeerAudioTrackDeadline = () => {}; -let sendRoomJoin = () => false; +let sendRoomJoin; let sendNativeOffer = async () => {}; -let setAdmissionGate = () => {}; -let setActiveTab = () => {}; +let setAdmissionGate; +let setActiveTab; let setNativePeerAudioBridgeState = () => {}; let setNotice = () => {}; let shouldSyncNativeLocalTracksBeforeOffer = () => false; -let shouldSuppressCallAckNotice = () => false; +let shouldSuppressCallAckNotice; let syncNativePeerConnectionsWithRoster = () => {}; let syncNativePeerLocalTracks = () => {}; let synchronizeNativePeerMediaElements = () => {}; let ensureNativePeerConnection = () => null; -let shouldSuppressExpectedSignalingError = () => false; -let syncControlStateToPeers = async () => false; -let syncModerationStateToPeers = async () => false; -let tryDirectJoinWithModeratorBypass = () => false; +let shouldSuppressExpectedSignalingError; +let syncControlStateToPeers; +let syncModerationStateToPeers; +let tryDirectJoinWithModeratorBypass; let applyCallOutputPreferences = () => {}; let currentSfuVideoProfile = computed(() => 'quality'); let downgradeSfuVideoQualityAfterEncodePressure = () => false; let initSFU = () => {}; let maybeFallbackToNativeRuntime = async () => false; -let removeSfuRemotePeersForUserId = () => false; -let setMediaRuntimePath = () => false; +let removeSfuRemotePeersForUserId; +let setMediaRuntimePath; let stopSfuTrackAnnounceTimer = () => {}; let switchMediaRuntimePath = async () => false; let teardownRemotePeer = () => {}; @@ -913,19 +903,16 @@ const mediaSecurityRuntimeState = { }; ({ - canProtectCurrentNativeTargets, canProtectCurrentSfuTargets, clearMediaSecurityHandshakeWatchdog, clearMediaSecurityResyncTimer, clearMediaSecuritySfuPublisherSeen, clearMediaSecuritySignalCaches, - clearNativeAudioBridgeQuarantine, currentMediaSecurityRuntimePath, ensureMediaSecuritySession, ensureNativeAudioBridgeSecurityReady, handleMediaSecuritySignal, hintMediaSecuritySync, - mediaSecurityEligibleTargetIds, mediaSecurityTargetIds, nativeAudioBridgeIsQuarantined, nativeAudioSecurityBannerMessage, @@ -935,7 +922,6 @@ const mediaSecurityRuntimeState = { resyncNativeAudioBridgePeerAfterSecurityReady, scheduleMediaSecurityParticipantSync, sendMediaSecurityHello, - sendMediaSecuritySenderKey, shouldBypassNativeAudioProtectionForPeer, shouldRecoverMediaSecurityFromFrameError, shouldSendTransportOnlySfuFrame, @@ -1091,11 +1077,11 @@ const localPublisherPipelineState = { const mediaStack = createCallWorkspaceMediaStack({ callbacks: { applyCallOutputPreferences: (...args) => applyCallOutputPreferences(...args), - canProtectCurrentSfuTargets, + canProtectCurrentSfuTargets: (...args) => canProtectCurrentSfuTargets(...args), captureClientDiagnostic, captureClientDiagnosticError, clearRemoteVideoContainer, - clearTransientActivityPublishErrorNotice, + clearTransientActivityPublishErrorNotice: (...args) => clearTransientActivityPublishErrorNotice(...args), currentSfuVideoProfile: (...args) => ( typeof currentSfuVideoProfile === 'function' ? currentSfuVideoProfile(...args) @@ -1261,7 +1247,6 @@ const mediaStack = createCallWorkspaceMediaStack({ isNativeWebRtcRuntimePath, isWlvcRuntimePath, nativeAudioBridgeFailureMessage, - resetWlvcEncoderAfterDroppedEncodedFrame, shouldBlockNativeRuntimeSignaling, shouldUseNativeAudioBridge, startRemoteVideoStallTimer, @@ -1410,9 +1395,9 @@ const nativeStack = createCallWorkspaceNativeStack({ createNativePeerAudioElement, createNativePeerVideoElement, currentMediaSecurityRuntimePath, - currentNativeAudioBridgeFailureMessage: nativeAudioBridgeFailureMessage, - currentShouldUseNativeAudioBridge: shouldUseNativeAudioBridge, - shouldUseNativeAudioBridge, + currentNativeAudioBridgeFailureMessage: (...args) => nativeAudioBridgeFailureMessage(...args), + currentShouldUseNativeAudioBridge: (...args) => shouldUseNativeAudioBridge(...args), + shouldUseNativeAudioBridge: (...args) => shouldUseNativeAudioBridge(...args), currentUserId: () => currentUserId.value, ensureLocalMediaForPublish: () => publishLocalTracks(), ensureMediaSecuritySession, @@ -1420,8 +1405,8 @@ const nativeStack = createCallWorkspaceNativeStack({ extractDiagnosticMessage, getMediaRuntimePath: () => mediaRuntimePath.value, getPeerByUserId: (userId) => nativePeerConnectionsRef.value.get(userId) || null, - getPeerControlSnapshot: peerControlSnapshot, - isNativeWebRtcRuntimePath, + getPeerControlSnapshot: (...args) => peerControlSnapshot(...args), + isNativeWebRtcRuntimePath: (...args) => isNativeWebRtcRuntimePath(...args), markParticipantActivity, mediaDebugLog, nativeAudioBridgeFailureMessage, @@ -1429,10 +1414,10 @@ const nativeStack = createCallWorkspaceNativeStack({ nativePeerHasLocalLiveAudioSender, renderCallVideoLayout: () => renderCallVideoLayout(), renderNativeRemoteVideos, - reportNativeAudioBridgeFailure, + reportNativeAudioBridgeFailure: (...args) => reportNativeAudioBridgeFailure(...args), reportNativeAudioSdpRejected, reconfigureLocalTracksFromSelectedDevices: (...args) => reconfigureLocalTracksFromSelectedDevices(...args), - resyncNativeAudioBridgePeerAfterSecurityReady, + resyncNativeAudioBridgePeerAfterSecurityReady: (...args) => resyncNativeAudioBridgePeerAfterSecurityReady(...args), sendSocketFrame, sessionToken: () => sessionState.sessionToken, shouldBypassNativeAudioProtectionForPeer, @@ -1565,11 +1550,11 @@ const { applyViewerContext, appendChatMessage: (...args) => appendChatMessage(...args), captureClientDiagnostic, - clearAdmissionGate, - clearErrors, - clearLobbyActionText, - clearTransientActivityPublishErrorNotice, - closeNativePeerConnection, + clearAdmissionGate: (...args) => clearAdmissionGate(...args), + clearErrors: (...args) => clearErrors(...args), + clearLobbyActionText: (...args) => clearLobbyActionText(...args), + clearTransientActivityPublishErrorNotice: (...args) => clearTransientActivityPublishErrorNotice(...args), + closeNativePeerConnection: (...args) => closeNativePeerConnection(...args), closeSocketLocal: (...args) => closeSocket(...args), downgradeSfuVideoQualityAfterEncodePressure: (...args) => downgradeSfuVideoQualityAfterEncodePressure(...args), ensureRoomBuckets, @@ -1578,28 +1563,29 @@ const { handleAssetVersionConnectionFailure, handleAssetVersionSocketClose, handleAssetVersionSocketPayload, - handleMediaSecuritySignal, + handleMediaSecuritySignal: (...args) => handleMediaSecuritySignal(...args), handleNativeSignalingEvent, - hideLobbyJoinToast, + hideLobbyJoinToast: (...args) => hideLobbyJoinToast(...args), mediaDebugLog, normalizeRoomId, redirectInvitedRouteToJoinModal, - refreshUsersDirectory, - refreshUsersDirectoryPresentation, + refreshUsersDirectory: (...args) => refreshUsersDirectory(...args), + refreshUsersDirectoryPresentation: (...args) => refreshUsersDirectoryPresentation(...args), removeParticipantFromSnapshot, - removeSfuRemotePeersForUserId, requestWlvcFullFrameKeyframe: (...args) => requestWlvcFullFrameKeyframe(...args), + removeSfuRemotePeersForUserId: (...args) => removeSfuRemotePeersForUserId(...args), + requestWlvcFullFrameKeyframe: (...args) => requestWlvcFullFrameKeyframe(...args), requestHeaders, requestRoomSnapshot, - resetPeerControlState, + resetPeerControlState: (...args) => resetPeerControlState(...args), scheduleNativeOfferRetryForUserId, sendMediaSecuritySync: (isReconnectOpen) => syncMediaSecurityWithParticipants(isReconnectOpen), - sendRoomJoin, - setAdmissionGate, + sendRoomJoin: (...args) => sendRoomJoin(...args), + setAdmissionGate: (...args) => setAdmissionGate(...args), setBackendWebSocketOrigin, - setNotice, - syncControlStateToPeers, - syncModerationStateToPeers, - tryDirectJoinWithModeratorBypass, + setNotice: (...args) => setNotice(...args), + syncControlStateToPeers: (...args) => syncControlStateToPeers(...args), + syncModerationStateToPeers: (...args) => syncModerationStateToPeers(...args), + tryDirectJoinWithModeratorBypass: (...args) => tryDirectJoinWithModeratorBypass(...args), }, constants: { callStateSignalTypes: CALL_STATE_SIGNAL_TYPES, @@ -1630,9 +1616,9 @@ const { serverRoomId, sessionState, sfuTransportState, - shouldBlockNativeRuntimeSignaling, - shouldSuppressCallAckNotice, - shouldSuppressExpectedSignalingError, + shouldBlockNativeRuntimeSignaling: (...args) => shouldBlockNativeRuntimeSignaling(...args), + shouldSuppressCallAckNotice: (...args) => shouldSuppressCallAckNotice(...args), + shouldSuppressExpectedSignalingError: (...args) => shouldSuppressExpectedSignalingError(...args), showAdmissionGate, socketRef, socketUrlForRoom, diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js index b10f8b388..97970d1ec 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js @@ -123,7 +123,7 @@ async function handleReadback(payload = {}) { let drawImageMs; let readbackMs; let imageData; - let frameSize; + let frameSize = null; try { frameSize = resolveWorkerFrameSize(source, payload); const context = ensureCaptureCanvas(frameSize.frameWidth, frameSize.frameHeight); diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js index 80a311287..d7e4ecf4a 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js @@ -546,7 +546,6 @@ export function createLocalPublisherPipelineHelpers({ mediaDebugLog('[SFU] WLVC encoder unavailable during source aspect sizing'); return; } - let frameImageData = imageData; let tilePatchMetadata = null; let tilePatchTransportMetrics = null; let encoded = null; @@ -580,7 +579,6 @@ export function createLocalPublisherPipelineHelpers({ videoProfile.frameQuality, ); if (patchEncoder) { - frameImageData = selectivePatchPlan.patchImageData; tilePatchMetadata = selectivePatchPlan.tilePatch; tilePatchTransportMetrics = { selection_tile_count: selectivePatchPlan.changedTileCount, @@ -588,7 +586,7 @@ export function createLocalPublisherPipelineHelpers({ selection_tile_ratio: Number(selectivePatchPlan.selectedTileRatio.toFixed(6)), selection_mask_guided: selectivePatchPlan.matteGuided, }; - encoded = patchEncoder.encodeFrame(frameImageData, timestamp); + encoded = patchEncoder.encodeFrame(selectivePatchPlan.patchImageData, timestamp); encodedFrameType = 'keyframe'; } } @@ -618,7 +616,6 @@ export function createLocalPublisherPipelineHelpers({ videoProfile.frameQuality, ); if (patchEncoder) { - frameImageData = backgroundSnapshotPlan.patchImageData; tilePatchMetadata = backgroundSnapshotPlan.tilePatch; tilePatchTransportMetrics = { selection_tile_count: backgroundSnapshotPlan.changedTileCount, @@ -626,7 +623,7 @@ export function createLocalPublisherPipelineHelpers({ selection_tile_ratio: Number(backgroundSnapshotPlan.selectedTileRatio.toFixed(6)), selection_mask_guided: backgroundSnapshotPlan.matteGuided, }; - encoded = patchEncoder.encodeFrame(frameImageData, timestamp); + encoded = patchEncoder.encodeFrame(backgroundSnapshotPlan.patchImageData, timestamp); encodedFrameType = 'keyframe'; } } diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js b/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js index 35cceb15d..42c6cf227 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js @@ -64,9 +64,11 @@ function nativeEncodedFrameAadTrackId(trackKind = 'data') { return 'native_data'; } +const VALID_PROTECTED_CODEC_IDS = new Set(['wlvc_wasm', 'wlvc_ts', 'webcodecs_vp8', 'wlvc_unknown']); + function normalizeProtectedCodecId(value, runtimePath = '') { const normalized = asString(value).toLowerCase(); - if (normalized === 'wlvc_wasm' || normalized === 'wlvc_ts' || normalized === 'webcodecs_vp8' || normalized === 'wlvc_unknown') return normalized; + if (VALID_PROTECTED_CODEC_IDS.has(normalized)) return normalized; if (asString(runtimePath).toLowerCase() === 'webrtc_native') return 'webrtc_native'; return 'wlvc_unknown'; } @@ -385,7 +387,7 @@ export class MediaSecuritySession { try { await this.handleSenderKeySignal(sender, pending); } catch (error) { - if (asString(error?.message || error).trim().toLowerCase() === 'participant_set_mismatch') { + if (this.isParticipantSetMismatchError(error)) { continue; } throw error; @@ -394,6 +396,15 @@ export class MediaSecuritySession { return true; } + isParticipantSetMismatchError(error) { + const code = asString(error?.code).trim().toUpperCase(); + if (code === 'PARTICIPANT_SET_MISMATCH') { + return true; + } + // Backward compatibility for older throw sites that only set message text. + return asString(error?.message || error).trim().toLowerCase() === 'participant_set_mismatch'; + } + async buildSenderKeySignal(targetUserId) { if (!(await this.ensureReady())) return null; const target = normalizeUserId(targetUserId); @@ -418,7 +429,7 @@ export class MediaSecuritySession { asString(peer.participantSetHash) !== participantSetHash || asString(peer.transcriptHash) !== transcriptHash ) { - throw new Error('participant_set_mismatch'); + throw new Error('Participant set mismatch detected (participant_set_mismatch)'); } peer.participantSetHash = participantSetHash; peer.transcriptHash = transcriptHash; @@ -472,7 +483,7 @@ export class MediaSecuritySession { if (payload.contract_name !== SESSION_CONTRACT_NAME || payload.contract_version !== CONTRACT_VERSION) return false; const payloadKexSuite = normalizeKexSuite(payload.kex_suite); if (payloadKexSuite === '' || payloadKexSuite !== keyContext.kexSuite || payload.media_suite !== MEDIA_SUITE) { - throw new Error('downgrade_attempt'); + throw new Error('KEX/media suite downgrade attempt detected'); } const participantSetHash = await this.participantHashForPeer(sender); const transcriptHash = await this.transcriptHashForPeer({ @@ -498,7 +509,8 @@ export class MediaSecuritySession { const epoch = Number(payload.epoch || 0); const senderKeyId = asString(payload.sender_key_id); - if (epoch < 1 || senderKeyId === '') throw new Error('wrong_key_id'); + if (epoch < 1) throw new Error('invalid_epoch'); + if (senderKeyId === '') throw new Error('missing_sender_key_id'); const subtle = subtleCrypto(); const nonce = base64UrlToBytes(payload.nonce); @@ -803,10 +815,11 @@ export class MediaSecuritySession { if (!sender || typeof sender.createEncodedStreams !== 'function') return false; if (this.nativeSenders.has(sender)) return true; const streams = sender.createEncodedStreams(); - if (!streams?.readable || !streams?.writable || typeof TransformStream !== 'function') return false; + const NativeTransformStream = globalThis.TransformStream; + if (!streams?.readable || !streams?.writable || typeof NativeTransformStream !== 'function') return false; this.nativeSenders.add(sender); streams.readable - .pipeThrough(new TransformStream({ + .pipeThrough(new NativeTransformStream({ transform: async (encodedFrame, controller) => { try { encodedFrame.data = await this.protectNativeEncodedFrame(encodedFrame, { trackKind, trackId }); @@ -828,10 +841,11 @@ export class MediaSecuritySession { if (!receiver || typeof receiver.createEncodedStreams !== 'function') return false; if (this.nativeReceivers.has(receiver)) return true; const streams = receiver.createEncodedStreams(); - if (!streams?.readable || !streams?.writable || typeof TransformStream !== 'function') return false; + const NativeTransformStream = globalThis.TransformStream; + if (!streams?.readable || !streams?.writable || typeof NativeTransformStream !== 'function') return false; this.nativeReceivers.add(receiver); streams.readable - .pipeThrough(new TransformStream({ + .pipeThrough(new NativeTransformStream({ transform: async (encodedFrame, controller) => { try { encodedFrame.data = await this.decryptNativeEncodedFrame(encodedFrame, senderUserId, { trackId }); @@ -854,7 +868,7 @@ export class MediaSecuritySession { && typeof RTCRtpSender.prototype?.createEncodedStreams === 'function' && typeof RTCRtpReceiver !== 'undefined' && typeof RTCRtpReceiver.prototype?.createEncodedStreams === 'function' - && typeof TransformStream === 'function'; + && typeof globalThis.TransformStream === 'function'; } } diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js index eb0ee498f..33907bc28 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js @@ -35,9 +35,9 @@ export const MEDIA_SECURITY_HANDSHAKE_TIMEOUT_MS = MEDIA_SECURITY_HANDSHAKE_RETR export const MEDIA_SECURITY_HANDSHAKE_WATCHDOG_INTERVAL_MS = 1000; export const MEDIA_SECURITY_SFU_TARGET_SETTLE_MS = 50; export const SFU_AUTO_QUALITY_DOWNGRADE_COOLDOWN_MS = 2500; -export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 1000; -export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 2; -export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 2; +export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 750; +export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 1; +export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 1; export const SFU_AUTO_QUALITY_RECOVERY_STABLE_WINDOW_MS = 15_000; export const SFU_AUTO_QUALITY_RECOVERY_MIN_INTERVAL_MS = 30_000; export const SFU_AUTO_QUALITY_RECOVERY_MAX_READBACK_BUDGET_RATIO = 0.6; diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js index 272768238..e91461a3e 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js @@ -3,6 +3,8 @@ import { shouldRequestSfuFullKeyframeForReason, } from '../../sfu/recoveryReasons'; +const WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000; + export function createCallWorkspaceSocketHelpers({ callbacks, constants, @@ -528,6 +530,7 @@ export function createCallWorkspaceSocketHelpers({ const leaveRoom = options?.leaveRoom === true; clearReconnectTimer(); clearPingTimer(); + state.connectInFlight = false; refs.hasRealtimeRoomSync.value = false; hideLobbyJoinToast(); const socket = refs.socketRef.value; @@ -624,9 +627,17 @@ export function createCallWorkspaceSocketHelpers({ } async function connectSocket() { + if (state.connectInFlight && !state.manualSocketClose) return; const generation = ++state.connectGeneration; + state.connectInFlight = true; + const finishConnectInFlight = () => { + if (generation === state.connectGeneration) { + state.connectInFlight = false; + } + }; const token = String(refs.sessionState.sessionToken || '').trim(); if (token === '') { + finishConnectInFlight(); refs.connectionReason.value = 'missing_session'; refs.connectionState.value = 'expired'; return; @@ -657,6 +668,7 @@ export function createCallWorkspaceSocketHelpers({ const sessionProbe = await probeWorkspaceSession(); if (generation !== state.connectGeneration || state.manualSocketClose) { + finishConnectInFlight(); return; } if (!sessionProbe.ok) { @@ -667,6 +679,7 @@ export function createCallWorkspaceSocketHelpers({ } else { refs.connectionState.value = sessionProbe.state; setNotice(sessionProbe.message, 'error'); + finishConnectInFlight(); return; } } @@ -676,6 +689,7 @@ export function createCallWorkspaceSocketHelpers({ refs.connectionState.value = 'blocked'; refs.connectionReason.value = 'secure_transport_required'; setNotice('Secure WebSocket transport is required. Configure HTTPS/WSS backend origins.', 'error'); + finishConnectInFlight(); return; } @@ -684,6 +698,7 @@ export function createCallWorkspaceSocketHelpers({ if (originIndex >= orderedSocketOrigins.length) { refs.connectionState.value = 'retrying'; refs.connectionReason.value = 'socket_unreachable'; + finishConnectInFlight(); scheduleReconnect(); return; } @@ -701,25 +716,41 @@ export function createCallWorkspaceSocketHelpers({ } catch { // ignore } + finishConnectInFlight(); return; } refs.socketRef.value = socket; let opened = false; let failedOver = false; + let failoverAfterClose = false; + let negotiationTimer = null; + const clearNegotiationTimer = () => { + if (negotiationTimer === null) return; + clearTimeout(negotiationTimer); + negotiationTimer = null; + }; + const connectNextOrigin = () => { + connectWithOriginAt(originIndex + 1); + }; - const failOverToNextOrigin = () => { + const failOverToNextOrigin = (closeReason = 'failover') => { if (failedOver) return; failedOver = true; + clearNegotiationTimer(); if (refs.socketRef.value === socket) { refs.socketRef.value = null; } try { - socket.close(1000, 'failover'); + socket.close(1000, closeReason); } catch { // ignore } - connectWithOriginAt(originIndex + 1); + if (socket.readyState === WebSocket.CONNECTING || socket.readyState === WebSocket.CLOSING) { + failoverAfterClose = true; + return; + } + connectNextOrigin(); }; const failOverAfterAssetVersionProbe = () => { @@ -728,19 +759,26 @@ export function createCallWorkspaceSocketHelpers({ : false; if (assetVersionProbe && typeof assetVersionProbe.then === 'function') { assetVersionProbe.then((handled) => { - if (handled) return; + if (handled) { + finishConnectInFlight(); + return; + } failOverToNextOrigin(); }).catch(() => { failOverToNextOrigin(); }); return; } - if (assetVersionProbe) return; + if (assetVersionProbe) { + finishConnectInFlight(); + return; + } failOverToNextOrigin(); }; socket.addEventListener('open', () => { if (generation !== state.connectGeneration || state.manualSocketClose) { + clearNegotiationTimer(); try { socket.close(1000, 'stale_connect'); } catch { @@ -750,6 +788,8 @@ export function createCallWorkspaceSocketHelpers({ } opened = true; + clearNegotiationTimer(); + finishConnectInFlight(); const isReconnectOpen = refs.reconnectAttempt.value > 0; refs.reconnectAttempt.value = 0; refs.connectionState.value = 'online'; @@ -784,7 +824,8 @@ export function createCallWorkspaceSocketHelpers({ socket.addEventListener('error', () => { if (generation !== state.connectGeneration || state.manualSocketClose) return; if (!opened) { - failOverAfterAssetVersionProbe(); + // The browser will emit close after a failed handshake. Origin failover + // is intentionally deferred until then to keep connection attempts single-flight. return; } refs.connectionState.value = 'retrying'; @@ -794,6 +835,7 @@ export function createCallWorkspaceSocketHelpers({ socket.addEventListener('close', (event) => { if (generation !== state.connectGeneration) return; + clearNegotiationTimer(); clearPingTimer(); refs.clearMediaSecurityHandshakeWatchdog(); if (refs.socketRef.value === socket) { @@ -802,9 +844,15 @@ export function createCallWorkspaceSocketHelpers({ refs.hasRealtimeRoomSync.value = false; if (state.manualSocketClose) { + finishConnectInFlight(); return; } if (handleAssetVersionSocketClose(event)) { + finishConnectInFlight(); + return; + } + if (failoverAfterClose) { + connectNextOrigin(); return; } @@ -813,12 +861,14 @@ export function createCallWorkspaceSocketHelpers({ refs.connectionState.value = 'expired'; refs.connectionReason.value = closeReason; state.manualSocketClose = true; + finishConnectInFlight(); return; } if (closeReason === 'auth_backend_error' || (event?.code === 1008 && closeReason !== '')) { refs.connectionState.value = 'blocked'; refs.connectionReason.value = closeReason || 'blocked'; state.manualSocketClose = true; + finishConnectInFlight(); return; } if (!opened) { @@ -830,6 +880,25 @@ export function createCallWorkspaceSocketHelpers({ refs.connectionReason.value = closeReason || 'socket_closed'; scheduleReconnect(); }); + + negotiationTimer = setTimeout(() => { + if (generation !== state.connectGeneration || state.manualSocketClose) return; + if (opened || failedOver) return; + refs.connectionState.value = 'retrying'; + refs.connectionReason.value = 'socket_negotiation_timeout'; + captureClientDiagnostic({ + category: 'realtime', + level: 'warning', + eventType: 'websocket_negotiation_timeout', + code: 'websocket_negotiation_timeout', + message: 'Realtime websocket negotiation timed out before the browser opened the socket.', + payload: { + origin: socketOrigin, + negotiation_timeout_ms: WEBSOCKET_NEGOTIATION_TIMEOUT_MS, + }, + }); + failOverToNextOrigin('negotiation_timeout'); + }, WEBSOCKET_NEGOTIATION_TIMEOUT_MS); }; connectWithOriginAt(0); diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js index 4141e68d2..4dea1acaf 100644 --- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js +++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js @@ -32,9 +32,9 @@ export const SFU_WLVC_FRAME_HEIGHT = 720; export const SFU_WLVC_FRAME_QUALITY = 43; export const SFU_WLVC_KEYFRAME_INTERVAL = 8; export const SFU_WLVC_ENCODE_INTERVAL_MS = 92; -export const SFU_WLVC_SEND_BUFFER_HIGH_WATER_BYTES = 4 * 1024 * 1024; -export const SFU_WLVC_SEND_BUFFER_LOW_WATER_BYTES = 1024 * 1024; -export const SFU_WLVC_SEND_BUFFER_CRITICAL_BYTES = 10 * 1024 * 1024; +export const SFU_WLVC_SEND_BUFFER_HIGH_WATER_BYTES = 2 * 1024 * 1024; +export const SFU_WLVC_SEND_BUFFER_LOW_WATER_BYTES = 512 * 1024; +export const SFU_WLVC_SEND_BUFFER_CRITICAL_BYTES = 6 * 1024 * 1024; export const SFU_WLVC_BACKPRESSURE_MIN_PAUSE_MS = 350; export const SFU_WLVC_BACKPRESSURE_MAX_PAUSE_MS = 2500; export const SFU_WLVC_BACKPRESSURE_HARD_RESET_AFTER_MS = 30_000; @@ -43,12 +43,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({ rescue: Object.freeze({ maxEncodedBytesPerFrame: 2048 * 1024, maxKeyframeBytesPerFrame: 2560 * 1024, - maxWireBytesPerSecond: 3200 * 1024, + maxWireBytesPerSecond: 2200 * 1024, maxEncodeMs: 78, maxDrawImageMs: 24, maxReadbackMs: 40, maxQueueAgeMs: 260, - maxBufferedBytes: 3 * 1024 * 1024, + maxBufferedBytes: 2 * 1024 * 1024, payloadSoftLimitRatio: 0.94, minKeyframeRetryMs: 1400, expectedRecovery: 'hold_rescue_until_socket_low_water', @@ -56,12 +56,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({ realtime: Object.freeze({ maxEncodedBytesPerFrame: 3072 * 1024, maxKeyframeBytesPerFrame: 3840 * 1024, - maxWireBytesPerSecond: 4600 * 1024, + maxWireBytesPerSecond: 3000 * 1024, maxEncodeMs: 92, maxDrawImageMs: 28, maxReadbackMs: 40, maxQueueAgeMs: 300, - maxBufferedBytes: 4 * 1024 * 1024, + maxBufferedBytes: 2560 * 1024, payloadSoftLimitRatio: 0.93, minKeyframeRetryMs: 1100, expectedRecovery: 'downshift_to_rescue_before_critical_buffer', @@ -69,12 +69,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({ balanced: Object.freeze({ maxEncodedBytesPerFrame: 4608 * 1024, maxKeyframeBytesPerFrame: 5632 * 1024, - maxWireBytesPerSecond: 6500 * 1024, + maxWireBytesPerSecond: 4200 * 1024, maxEncodeMs: 120, maxDrawImageMs: 36, maxReadbackMs: 52, maxQueueAgeMs: 360, - maxBufferedBytes: 6500 * 1024, + maxBufferedBytes: 3 * 1024 * 1024, payloadSoftLimitRatio: 0.91, minKeyframeRetryMs: 900, expectedRecovery: 'downshift_to_realtime_before_critical_buffer', @@ -82,12 +82,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({ quality: Object.freeze({ maxEncodedBytesPerFrame: 5632 * 1024, maxKeyframeBytesPerFrame: 6656 * 1024, - maxWireBytesPerSecond: 8400 * 1024, + maxWireBytesPerSecond: 5200 * 1024, maxEncodeMs: 150, maxDrawImageMs: 42, maxReadbackMs: 64, maxQueueAgeMs: 420, - maxBufferedBytes: 8 * 1024 * 1024, + maxBufferedBytes: 4 * 1024 * 1024, payloadSoftLimitRatio: 0.9, minKeyframeRetryMs: 800, expectedRecovery: 'downshift_to_balanced_before_critical_buffer', diff --git a/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts b/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts index a716f00d8..c1fb3f2f3 100644 --- a/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts +++ b/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts @@ -1,4 +1,6 @@ const SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES = 192 * 1024 +const SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO = 0.25 +const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES = 512 * 1024 export const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS = 160 export const SFU_FRAME_WIRE_BUDGET_WINDOW_MS = 1000 @@ -30,9 +32,10 @@ function normalizedBudgetNumber(value: unknown): number { export function resolveSfuSendDrainTargetBytes(metrics: Record): number { const bufferedBudgetBytes = normalizedBudgetNumber(metrics.budget_max_buffered_bytes) if (bufferedBudgetBytes <= 0) return SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES + const profileDrainTargetBytes = Math.floor(bufferedBudgetBytes * SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO) return Math.max( SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES, - Math.floor(bufferedBudgetBytes * 0.5), + Math.min(SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES, profileDrainTargetBytes), ) } diff --git a/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts b/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts index 3f8deb603..ca49e2089 100644 --- a/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts +++ b/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts @@ -86,6 +86,7 @@ const SFU_FRAME_TRANSPORT_SAMPLE_COOLDOWN_MS = 2000 const SFU_PUBLISHER_FRAME_STALL_CHECK_INTERVAL_MS = 1000 const SFU_PUBLISHER_FRAME_STALL_RESUBSCRIBE_AFTER_MS = 6000 const SFU_PUBLISHER_FRAME_STALL_RECOVERY_COOLDOWN_MS = 5000 +const SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000 interface SendBufferDrainResult { ok: boolean @@ -120,6 +121,7 @@ export class SFUClient { private lastFrameTransportSample: SfuFrameTransportSample | null = null private publisherFrameHealthById = new Map() private publisherFrameStallTimer: ReturnType | null = null + private connectAttemptInFlight = false private mediaTransport: SfuWebSocketFallbackMediaTransport constructor(cb: SFUClientCallbacks) { @@ -170,6 +172,7 @@ export class SFUClient { ): void { if (generation !== this.connectGeneration) return if (index >= candidates.length) { + this.connectAttemptInFlight = false reportClientDiagnostic({ category: 'media', level: 'error', @@ -198,38 +201,73 @@ export class SFUClient { this.ws = ws let opened = false let failedOver = false + let failoverAfterClose = false + let negotiationTimer: ReturnType | null = null + + const clearNegotiationTimer = () => { + if (negotiationTimer === null) return + clearTimeout(negotiationTimer) + negotiationTimer = null + } + + const connectNextCandidate = () => { + this.connectWithCandidates(candidates, index + 1, query, roomId, generation) + } const failToNextCandidate = () => { if (generation !== this.connectGeneration) return if (opened) return if (failedOver) return failedOver = true + clearNegotiationTimer() if (this.ws === ws) { this.ws = null } - this.connectWithCandidates(candidates, index + 1, query, roomId, generation) + connectNextCandidate() + } + + const failToNextCandidateAfterSocketClose = (closeReason = 'failover') => { + if (generation !== this.connectGeneration) return + if (opened) return + if (failedOver) return + failedOver = true + clearNegotiationTimer() + if (this.ws === ws) { + this.ws = null + } + try { + ws.close(1000, closeReason) + } catch {} + if (ws.readyState === WebSocket.CONNECTING || ws.readyState === WebSocket.CLOSING) { + failoverAfterClose = true + return + } + connectNextCandidate() } const failToNextCandidateAfterAssetVersionProbe = (): void => { - const assetVersionProbe = handleAssetVersionConnectionFailure() - if (assetVersionProbe && typeof assetVersionProbe.then === 'function') { - assetVersionProbe.then((handled) => { - if (handled) return + Promise.resolve(handleAssetVersionConnectionFailure()) + .then((handled) => { + if (handled) { + this.connectAttemptInFlight = false + return + } failToNextCandidate() - }).catch(() => { + }) + .catch(() => { failToNextCandidate() }) - return - } - failToNextCandidate() } ws.onopen = () => { if (generation !== this.connectGeneration) { + clearNegotiationTimer() try { ws.close() } catch {} return } opened = true + clearNegotiationTimer() + this.connectAttemptInFlight = false this.disconnectNotified = false setBackendSfuOrigin(candidates[index] || '') this.send({ type: 'sfu/join', room_id: roomId, role: 'publisher' }) @@ -259,7 +297,15 @@ export class SFUClient { ws.onclose = (event) => { if (generation !== this.connectGeneration) return - if (handleAssetVersionSocketClose(event)) return + clearNegotiationTimer() + if (handleAssetVersionSocketClose(event)) { + this.connectAttemptInFlight = false + return + } + if (failoverAfterClose) { + connectNextCandidate() + return + } if (!opened) { failToNextCandidateAfterAssetVersionProbe() return @@ -288,18 +334,40 @@ export class SFUClient { ws.onerror = () => { if (generation !== this.connectGeneration) return if (!opened) { - failToNextCandidateAfterAssetVersionProbe() + // Browsers follow pre-open errors with close; wait for that terminal + // event before trying the next origin so CONNECTING sockets do not pile up. return } try { ws.close() } catch {} } + + negotiationTimer = setTimeout(() => { + if (generation !== this.connectGeneration) return + if (opened || failedOver) return + reportClientDiagnostic({ + category: 'media', + level: 'warning', + eventType: 'sfu_socket_negotiation_timeout', + code: 'sfu_socket_negotiation_timeout', + message: 'SFU websocket negotiation timed out before the browser opened the socket.', + roomId, + payload: { + room_id: roomId, + candidate_origin: String(candidates[index] || ''), + negotiation_timeout_ms: SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS, + }, + }) + failToNextCandidateAfterSocketClose('negotiation_timeout') + }, SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS) } connect(session: { userId: string; token: string; name: string }, roomId: string, callId = ''): void { + if (this.connectAttemptInFlight) return this.connectGeneration += 1 this.outboundMediaGeneration += 1 + this.connectAttemptInFlight = true this.disconnectNotified = false this.inboundFrameAssembler.clear() this.outboundFrameSequenceByTrack.clear() @@ -311,7 +379,7 @@ export class SFUClient { const generation = this.connectGeneration if (this.ws) { - this.retireSocket(this.ws) + this.retireSocket(this.ws, true) this.ws = null } @@ -419,6 +487,7 @@ export class SFUClient { leave(): void { this.connectGeneration += 1 this.outboundMediaGeneration += 1 + this.connectAttemptInFlight = false this.disconnectNotified = false this.inboundFrameAssembler.clear() this.outboundFrameSequenceByTrack.clear() diff --git a/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp b/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp index a69e55819..9810b0be6 100644 --- a/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp +++ b/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp @@ -2,9 +2,48 @@ #include #include #include +#include namespace wlvc { +namespace { + +size_t plane_count(int width, int height) { + return static_cast(width) * static_cast(height); +} + +size_t rgba_offset(size_t pixel_index) { + return pixel_index * static_cast(4); +} + +size_t rgba_offset(int row, int col, int width) { + return rgba_offset(static_cast(row) * static_cast(width) + + static_cast(col)); +} + +bool fits_int(size_t value) { + return value <= static_cast(std::numeric_limits::max()); +} + +int count_to_int(size_t value) { + return fits_int(value) ? static_cast(value) : -1; +} + +size_t int16_plane_bytes(size_t value_count) { + return value_count * sizeof(int16_t); +} + +size_t float_plane_bytes(size_t value_count) { + return value_count * sizeof(float); +} + +size_t rle_max_bytes_for_count(size_t value_count) { + return static_cast(RLE_HEADER_BYTES) + + value_count * static_cast(RLE_PAIR_BYTES); +} + +} // namespace + // --------------------------------------------------------------------------- // Encoder // --------------------------------------------------------------------------- @@ -13,21 +52,23 @@ Encoder::Encoder(const EncoderConfig& cfg) : cfg_(cfg) { const int w = cfg_.width, h = cfg_.height; const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w; const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h; - - Y_.resize(w * h); - U_.resize(uvW * uvH); - V_.resize(uvW * uvH); - Ydelta_.resize(w * h); - Udelta_.resize(uvW * uvH); - Vdelta_.resize(uvW * uvH); - prevY_.resize(w * h); - prevU_.resize(uvW * uvH); - prevV_.resize(uvW * uvH); - Yq_.resize(w * h); - Uq_.resize(uvW * uvH); - Vq_.resize(uvW * uvH); + const size_t y_count = plane_count(w, h); + const size_t uv_count = plane_count(uvW, uvH); + + Y_.resize(y_count); + U_.resize(uv_count); + V_.resize(uv_count); + Ydelta_.resize(y_count); + Udelta_.resize(uv_count); + Vdelta_.resize(uv_count); + prevY_.resize(y_count); + prevU_.resize(uv_count); + prevV_.resize(uv_count); + Yq_.resize(y_count); + Uq_.resize(uv_count); + Vq_.resize(uv_count); tmp_.resize(std::max(w, h)); - rle_buf_.resize(rle_max_bytes(std::max(w * h, uvW * uvH))); + rle_buf_.resize(rle_max_bytes_for_count(std::max(y_count, uv_count))); } void Encoder::rgba_to_yuv(const uint8_t* rgba) { @@ -38,24 +79,29 @@ void Encoder::rgba_to_yuv(const uint8_t* rgba) { if (cfg_.color_space == kYUV) { for (int row = 0; row < h; ++row) { for (int col = 0; col < w; ++col) { - const int i = (row * w + col) * 4; + const size_t i = rgba_offset(row, col, w); const float r = rgba[i], g = rgba[i + 1], b = rgba[i + 2]; - Y_[row * w + col] = 0.299f * r + 0.587f * g + 0.114f * b - 128.0f; + Y_[static_cast(row) * static_cast(w) + static_cast(col)] = + 0.299f * r + 0.587f * g + 0.114f * b - 128.0f; } } for (int row = 0; row < uvH; ++row) { for (int col = 0; col < uvW; ++col) { - const int i = (row * 2 * w + col * 2) * 4; + const size_t i = rgba_offset(static_cast(row) * static_cast(2) * static_cast(w) + + static_cast(col) * static_cast(2)); const float r = rgba[i], g = rgba[i + 1], b = rgba[i + 2]; - U_[row * uvW + col] = -0.147f * r - 0.289f * g + 0.436f * b; - V_[row * uvW + col] = 0.615f * r - 0.515f * g - 0.100f * b; + const size_t uv_idx = static_cast(row) * static_cast(uvW) + + static_cast(col); + U_[uv_idx] = -0.147f * r - 0.289f * g + 0.436f * b; + V_[uv_idx] = 0.615f * r - 0.515f * g - 0.100f * b; } } } else { for (int row = 0; row < h; ++row) { for (int col = 0; col < w; ++col) { - const int i = (row * w + col) * 4; - const int idx = row * w + col; + const size_t i = rgba_offset(row, col, w); + const size_t idx = static_cast(row) * static_cast(w) + + static_cast(col); Y_[idx] = rgba[i]; U_[idx] = rgba[i + 1]; V_[idx] = rgba[i + 2]; @@ -66,9 +112,17 @@ void Encoder::rgba_to_yuv(const uint8_t* rgba) { int Encoder::encode(const uint8_t* rgba, double timestamp_us, uint8_t* out_buf, int out_capacity) { + if (out_capacity < 0) return -1; + const int w = cfg_.width, h = cfg_.height; const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w; const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h; + const size_t y_count = plane_count(w, h); + const size_t uv_count = plane_count(uvW, uvH); + const int y_count_i = count_to_int(y_count); + const int uv_count_i = count_to_int(uv_count); + if (y_count_i < 0 || uv_count_i < 0) return -1; + const bool is_key = (frame_count_ % cfg_.key_frame_interval) == 0; ++frame_count_; @@ -81,9 +135,9 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us, float* v_enc = V_.data(); const bool use_temporal_residual = cfg_.motion_estimation && !is_key && frame_count_ > 1; if (use_temporal_residual) { - for (int i = 0; i < w * h; ++i) + for (size_t i = 0; i < y_count; ++i) Ydelta_[i] = Y_[i] - prevY_[i]; - for (int i = 0; i < uvW * uvH; ++i) { + for (size_t i = 0; i < uv_count; ++i) { Udelta_[i] = U_[i] - prevU_[i]; Vdelta_[i] = V_[i] - prevV_[i]; } @@ -91,9 +145,9 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us, u_enc = Udelta_.data(); v_enc = Vdelta_.data(); } - std::memcpy(prevY_.data(), Y_.data(), w * h * sizeof(float)); - std::memcpy(prevU_.data(), U_.data(), uvW * uvH * sizeof(float)); - std::memcpy(prevV_.data(), V_.data(), uvW * uvH * sizeof(float)); + std::memcpy(prevY_.data(), Y_.data(), float_plane_bytes(y_count)); + std::memcpy(prevU_.data(), U_.data(), float_plane_bytes(uv_count)); + std::memcpy(prevV_.data(), V_.data(), float_plane_bytes(uv_count)); // Forward DWT dwt_forward(y_enc, w, h, cfg_.levels, tmp_.data()); @@ -109,19 +163,39 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us, size_t y_rle_sz = 0; size_t u_rle_sz = 0; size_t v_rle_sz = 0; + size_t y_rle_off = 0; + size_t u_rle_off = 0; + size_t v_rle_off = 0; if (cfg_.entropy_coding == kNone) { - y_rle_sz = static_cast(w * h * sizeof(int16_t)); - u_rle_sz = static_cast(uvW * uvH * sizeof(int16_t)); - v_rle_sz = static_cast(uvW * uvH * sizeof(int16_t)); + y_rle_sz = int16_plane_bytes(y_count); + u_rle_sz = int16_plane_bytes(uv_count); + v_rle_sz = int16_plane_bytes(uv_count); } else { - y_rle_sz = rle_encode(Yq_.data(), w * h, rle_buf_.data()); - u_rle_sz = rle_encode(Uq_.data(), uvW * uvH, rle_buf_.data()); - v_rle_sz = rle_encode(Vq_.data(), uvW * uvH, rle_buf_.data()); + const size_t y_cap = int16_plane_bytes(y_count_i); + const size_t u_cap = int16_plane_bytes(uv_count_i); + const size_t v_cap = int16_plane_bytes(uv_count_i); + if (y_cap > std::numeric_limits::max() - u_cap) return -1; + const size_t yu_cap = y_cap + u_cap; + if (yu_cap > std::numeric_limits::max() - v_cap) return -1; + const size_t total_rle_cap = yu_cap + v_cap; + rle_buf_.resize(total_rle_cap); + + y_rle_off = 0; + u_rle_off = y_cap; + v_rle_off = y_cap + u_cap; + + y_rle_sz = rle_encode(Yq_.data(), y_count_i, rle_buf_.data() + y_rle_off); + u_rle_sz = rle_encode(Uq_.data(), uv_count_i, rle_buf_.data() + u_rle_off); + v_rle_sz = rle_encode(Vq_.data(), uv_count_i, rle_buf_.data() + v_rle_off); } // Pack header + payload - const int total = kHeaderBytes + static_cast(y_rle_sz + u_rle_sz + v_rle_sz); - if (total > out_capacity) return -1; + const size_t total = static_cast(kHeaderBytes) + y_rle_sz + u_rle_sz + v_rle_sz; + if (y_rle_sz > static_cast(std::numeric_limits::max()) + || u_rle_sz > static_cast(std::numeric_limits::max()) + || v_rle_sz > static_cast(std::numeric_limits::max())) return -1; + if (total > static_cast(std::numeric_limits::max()) + || total > static_cast(out_capacity)) return -1; uint8_t* p = out_buf; auto w32 = [&](uint32_t v) { @@ -167,11 +241,11 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us, std::memcpy(p, Vq_.data(), v_rle_sz); p += v_rle_sz; } else { - const size_t y_bytes = rle_encode(Yq_.data(), w * h, p); + const size_t y_bytes = rle_encode(Yq_.data(), y_count_i, p); p += y_bytes; - const size_t u_bytes = rle_encode(Uq_.data(), uvW * uvH, p); + const size_t u_bytes = rle_encode(Uq_.data(), uv_count_i, p); p += u_bytes; - const size_t v_bytes = rle_encode(Vq_.data(), uvW * uvH, p); + const size_t v_bytes = rle_encode(Vq_.data(), uv_count_i, p); p += v_bytes; } @@ -182,12 +256,21 @@ int Encoder::max_encoded_bytes() const { const int w = cfg_.width, h = cfg_.height; const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w; const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h; + const size_t y_count = plane_count(w, h); + const size_t uv_count = plane_count(uvW, uvH); + size_t total = 0; if (cfg_.entropy_coding == kNone) { - return kHeaderBytes + (w * h + uvW * uvH + uvW * uvH) * static_cast(sizeof(int16_t)); + total = static_cast(kHeaderBytes) + + int16_plane_bytes(y_count) + + int16_plane_bytes(uv_count) + + int16_plane_bytes(uv_count); + } else { + total = static_cast(kHeaderBytes) + + rle_max_bytes_for_count(y_count) + + rle_max_bytes_for_count(uv_count) + + rle_max_bytes_for_count(uv_count); } - return kHeaderBytes - + static_cast(rle_max_bytes(w * h)) - + static_cast(rle_max_bytes(uvW * uvH)) * 2; + return count_to_int(total); } void Encoder::reset() { @@ -205,16 +288,18 @@ Decoder::Decoder(const DecoderConfig& cfg) : cfg_(cfg) { const int w = cfg_.width, h = cfg_.height; const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w; const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h; - - Y_.resize(w * h); - U_.resize(uvW * uvH); - V_.resize(uvW * uvH); - prevY_.resize(w * h); - prevU_.resize(uvW * uvH); - prevV_.resize(uvW * uvH); - Yq_.resize(w * h); - Uq_.resize(uvW * uvH); - Vq_.resize(uvW * uvH); + const size_t y_count = plane_count(w, h); + const size_t uv_count = plane_count(uvW, uvH); + + Y_.resize(y_count); + U_.resize(uv_count); + V_.resize(uv_count); + prevY_.resize(y_count); + prevU_.resize(uv_count); + prevV_.resize(uv_count); + Yq_.resize(y_count); + Uq_.resize(uv_count); + Vq_.resize(uv_count); tmp_.resize(std::max(w, h)); } @@ -261,16 +346,26 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) { r8(); // blur radius } - if (kHeaderBytes + y_bytes + u_bytes + v_bytes > static_cast(enc_size)) + const size_t payload_end = static_cast(kHeaderBytes) + + static_cast(y_bytes) + + static_cast(u_bytes) + + static_cast(v_bytes); + if (payload_end > static_cast(enc_size)) return -1; + const size_t y_count = plane_count(w, h); + const size_t uv_count = plane_count(uvW, uvH); + const int y_count_i = count_to_int(y_count); + const int uv_count_i = count_to_int(uv_count); + if (y_count_i < 0 || uv_count_i < 0) return -1; + int yn = 0; int un = 0; int vn = 0; if (entropyMode == kNone) { - const size_t y_expected = static_cast(w * h * sizeof(int16_t)); - const size_t u_expected = static_cast(uvW * uvH * sizeof(int16_t)); - const size_t v_expected = static_cast(uvW * uvH * sizeof(int16_t)); + const size_t y_expected = int16_plane_bytes(y_count); + const size_t u_expected = int16_plane_bytes(uv_count); + const size_t v_expected = int16_plane_bytes(uv_count); if (y_bytes != y_expected || u_bytes != u_expected || v_bytes != v_expected) return -1; std::memcpy(Yq_.data(), p, y_bytes); p += y_bytes; @@ -278,19 +373,19 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) { p += u_bytes; std::memcpy(Vq_.data(), p, v_bytes); p += v_bytes; - yn = w * h; - un = uvW * uvH; - vn = uvW * uvH; + yn = y_count_i; + un = uv_count_i; + vn = uv_count_i; } else { - yn = rle_decode(p, y_bytes, Yq_.data(), w * h); + yn = rle_decode(p, y_bytes, Yq_.data(), y_count_i); p += y_bytes; - un = rle_decode(p, u_bytes, Uq_.data(), uvW * uvH); + un = rle_decode(p, u_bytes, Uq_.data(), uv_count_i); p += u_bytes; - vn = rle_decode(p, v_bytes, Vq_.data(), uvW * uvH); + vn = rle_decode(p, v_bytes, Vq_.data(), uv_count_i); p += v_bytes; } - if (yn != w * h || un != uvW * uvH || vn != uvW * uvH) return -1; + if (yn != y_count_i || un != uv_count_i || vn != uv_count_i) return -1; // Dequantise QuantConfig qcfg = { quality, levels }; @@ -305,22 +400,22 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) { // Temporal residual reconstruction if (!is_key) { - for (int i = 0; i < w * h; ++i) + for (size_t i = 0; i < y_count; ++i) Y_[i] += prevY_[i]; } if (!is_key && (flags & kFrameFlagChromaTemporalResidual)) { - for (int i = 0; i < uvW * uvH; ++i) { + for (size_t i = 0; i < uv_count; ++i) { U_[i] += prevU_[i]; V_[i] += prevV_[i]; } } - std::memcpy(prevY_.data(), Y_.data(), w * h * sizeof(float)); - std::memcpy(prevU_.data(), U_.data(), uvW * uvH * sizeof(float)); - std::memcpy(prevV_.data(), V_.data(), uvW * uvH * sizeof(float)); + std::memcpy(prevY_.data(), Y_.data(), float_plane_bytes(y_count)); + std::memcpy(prevU_.data(), U_.data(), float_plane_bytes(uv_count)); + std::memcpy(prevV_.data(), V_.data(), float_plane_bytes(uv_count)); if (colorSpace == kRGB) { - for (int i = 0; i < w * h; ++i) { - const int pi = i * 4; + for (size_t i = 0; i < y_count; ++i) { + const size_t pi = rgba_offset(i); rgba_out[pi] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, Y_[i]))); rgba_out[pi + 1] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, U_[i]))); rgba_out[pi + 2] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, V_[i]))); @@ -337,8 +432,10 @@ void Decoder::yuv_to_rgba(uint8_t* rgba, int w, int h, int uvW, int uvH) { for (int row = 0; row < h; ++row) { for (int col = 0; col < w; ++col) { - const int yi = row * w + col; - const int uvi = (row >> 1) * uvW + (col >> 1); + const size_t yi = static_cast(row) * static_cast(w) + + static_cast(col); + const size_t uvi = static_cast(row >> 1) * static_cast(uvW) + + static_cast(col >> 1); const float y = Y[yi] + 128.0f; const float u = U[uvi]; @@ -348,7 +445,7 @@ void Decoder::yuv_to_rgba(uint8_t* rgba, int w, int h, const float g = y - 0.39465f * u - 0.58060f * v; const float b = y + 2.03211f * u; - const int pi = yi * 4; + const size_t pi = rgba_offset(yi); rgba[pi] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, r))); rgba[pi + 1] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, g))); rgba[pi + 2] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, b))); diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs index bc9d16954..1029b7336 100644 --- a/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs +++ b/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs @@ -29,6 +29,8 @@ try { const publisherBackpressureController = read('src/domain/realtime/workspace/callWorkspace/publisherBackpressureController.js'); requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES = 192 * 1024', 'client has an explicit low-water send resume target'); + requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO = 0.25', 'client drains to a low fraction of the active profile budget'); + requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES = 512 * 1024', 'client caps send-buffer drain target below one megabyte'); requireContains(outboundFrameBudget, 'export const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS = 160', 'client does not hide media behind a 500ms browser drain wait'); requireContains(outboundFrameBudget, 'export function resolveSfuSendDrainTargetBytes(metrics', 'client derives drain target from the active profile budget'); requireContains(sfuClient, 'this.waitForSendBufferDrain(drainTargetBufferedBytes, SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS)', 'client waits only to the budgeted low-water target'); diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs index 082c9f0ab..8b65bbb0d 100644 --- a/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs +++ b/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs @@ -106,9 +106,9 @@ async function main() { requireContains(runtimeConfig, 'SFU_WLVC_MAX_KEYFRAME_FRAME_BYTES', 'motion payload keyframe cap'); requireContains(runtimeConfig, 'SFU_WLVC_MOTION_DELTA_CADENCE_WINDOW_MS', 'motion payload cadence throttle window'); requireContains(runtimeConfig, 'SFU_WLVC_MOTION_DELTA_PROFILE_DOWNSHIFT_THRESHOLD', 'motion payload repeated-pressure downshift threshold'); - requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 1000;', 'one second send-pressure downgrade window'); - requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 2;', 'two skipped frames trigger quality downgrade'); - requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 2;', 'two send failures trigger quality downgrade'); + requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 750;', 'sub-second send-pressure downgrade window'); + requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 1;', 'first skipped frame triggers quality downgrade'); + requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 1;', 'first send failure triggers quality downgrade'); requireContains(publisherPipeline, 'handleWlvcFramePayloadPressure(encodedPayloadBytes', 'publisher drops oversized frames before send'); requireContains(publisherPipeline, 'encodedPayloadBytes > maxEncodedPayloadBytes', 'publisher compares encoded WLVC payload with cap'); requireContains(sfuPublisherControl, 'sfu_high_motion_payload_pressure', 'publisher controller high-motion pressure reason'); diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs index 8393e961b..780b58496 100644 --- a/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs +++ b/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs @@ -49,8 +49,8 @@ try { requireContains(pressureGate, "request:client-diagnostics", 'online gate records backend client diagnostics POST bodies'); requireContains(pressureGate, '/api\\/user\\/client-diagnostics', 'online gate watches backend diagnostics submissions'); requireContains(pressureGate, 'remote video frozen', 'online gate blocks remote freeze diagnostics'); - requireContains(pressureGate, 'CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024', 'online gate has a critical bufferedAmount ceiling'); - requireContains(pressureGate, 'MAX_ACCEPTED_BUFFERED_BYTES = 8 * 1024 * 1024', 'online gate enforces the quality profile buffer budget'); + requireContains(pressureGate, 'CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024', 'online gate has a critical bufferedAmount ceiling'); + requireContains(pressureGate, 'MAX_ACCEPTED_BUFFERED_BYTES = 4 * 1024 * 1024', 'online gate enforces the quality profile buffer budget'); requireContains(pressureGate, 'socketFailureCount', 'online gate fails on SFU socket close/error during media flow'); requireContains(pressureGate, 'luma > 8', 'online gate fails black remote video'); requireContains(harness, 'highMotionVideo = false', 'media shim keeps high-motion opt-in'); diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs index 4c1aea7da..a540f94e3 100644 --- a/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs +++ b/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs @@ -29,6 +29,7 @@ const framePayload = read('../../src/lib/sfu/framePayload.ts'); const outboundFrameQueue = read('../../src/lib/sfu/outboundFrameQueue.ts'); const inboundFrameAssembler = read('../../src/lib/sfu/inboundFrameAssembler.ts'); const backendOrigin = read('../../src/support/backendOrigin.js'); +const socketLifecycle = read('../../src/domain/realtime/workspace/callWorkspace/socketLifecycle.js'); const stackEnv = read('../../../.env'); const deployScript = read('../../../scripts/deploy.sh'); @@ -39,6 +40,18 @@ try { requireContains(sfuClient, 'const candidates = resolveBackendSfuOriginCandidates()', 'SFU origin candidates'); requireContains(sfuClient, "setBackendSfuOrigin(candidates[index] || '')", 'SFU working origin persistence'); requireContains(sfuClient, 'this.connectWithCandidates(candidates, index + 1, query, roomId, generation)', 'SFU failover to next origin'); + requireContains(sfuClient, 'private connectAttemptInFlight = false', 'SFU client tracks one pending websocket handshake'); + requireContains(sfuClient, 'if (this.connectAttemptInFlight) return', 'SFU client refuses duplicate pending websocket connect attempts'); + requireContains(sfuClient, 'this.connectAttemptInFlight = false\n this.disconnectNotified = false', 'SFU client clears pending handshake only after socket open'); + requireContains(sfuClient, 'Browsers follow pre-open errors with close; wait for that terminal', 'SFU client waits for close before origin failover'); + requireContains(sfuClient, 'const SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000', 'SFU client bounds pending websocket negotiation to 5 minutes'); + requireContains(sfuClient, "failToNextCandidateAfterSocketClose('negotiation_timeout')", 'SFU client closes timed-out websocket negotiation before failover'); + requireContains(socketLifecycle, 'if (state.connectInFlight && !state.manualSocketClose) return;', 'workspace websocket refuses duplicate pending connect attempts'); + requireContains(socketLifecycle, 'state.connectInFlight = true;', 'workspace websocket opens a single-flight gate during handshake'); + requireContains(socketLifecycle, 'The browser will emit close after a failed handshake', 'workspace websocket waits for close before origin failover'); + requireContains(socketLifecycle, 'const WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000;', 'workspace websocket bounds pending negotiation to 5 minutes'); + requireContains(socketLifecycle, "refs.connectionReason.value = 'socket_negotiation_timeout';", 'workspace websocket reports timed-out negotiation'); + requireContains(socketLifecycle, "failOverToNextOrigin('negotiation_timeout')", 'workspace websocket closes timed-out negotiation before failover'); requireContains(sfuClient, 'room: roomId,', 'legacy room query compatibility'); requireContains(sfuClient, 'room_id: roomId,', 'snake_case room query binding'); diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs index 2812f34f4..a3e20939d 100644 --- a/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs +++ b/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs @@ -31,8 +31,8 @@ try { ); requireContains(probe, 'CONTINUATION_THRESHOLD_BYTES = 65_535', 'probe measures around websocket continuation threshold'); requireContains(probe, 'QUALITY_MAX_PAYLOAD_BYTES = 5632 * 1024', 'probe exercises full quality profile payload budget'); - requireContains(probe, 'QUALITY_MAX_BUFFERED_BYTES = 8 * 1024 * 1024', 'probe enforces quality bufferedAmount budget'); - requireContains(probe, 'CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024', 'probe fails before critical backpressure'); + requireContains(probe, 'QUALITY_MAX_BUFFERED_BYTES = 4 * 1024 * 1024', 'probe enforces quality bufferedAmount budget'); + requireContains(probe, 'CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024', 'probe fails before critical backpressure'); requireContains(probe, 'connectSfuSocket', 'probe opens real production SFU sockets'); requireContains(probe, "role: 'publisher'", 'probe opens publisher path'); requireContains(probe, "role: 'subscriber'", 'probe opens subscriber path'); diff --git a/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs b/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs index 5ab120dc0..5189629a4 100644 --- a/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs +++ b/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs @@ -17,8 +17,8 @@ const MIN_REMOTE_HEIGHT = 480; const PRESSURE_DURATION_MS = Math.max(15_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_DURATION_MS || '45000', 10)); const SAMPLE_INTERVAL_MS = Math.max(1_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_SAMPLE_INTERVAL_MS || '2500', 10)); const SLOW_SUBSCRIBER_DURATION_MS = Math.max(5_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_SLOW_MS || '12000', 10)); -const CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024; -const MAX_ACCEPTED_BUFFERED_BYTES = 8 * 1024 * 1024; +const CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024; +const MAX_ACCEPTED_BUFFERED_BYTES = 4 * 1024 * 1024; const MAX_NO_TRANSITION_BUFFERED_BYTES = 1024 * 1024; const MAX_FINAL_BUFFERED_BYTES = 512 * 1024; const MAX_TRANSIENT_BLACK_SAMPLES = 1; diff --git a/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs b/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs index a52f13f74..7f4a9f27c 100644 --- a/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs +++ b/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs @@ -9,8 +9,8 @@ const LOCAL_ENV_FILE = path.join(VIDEOCHAT_DIR, '.env.local'); const CONTINUATION_THRESHOLD_BYTES = 65_535; const QUALITY_MAX_PAYLOAD_BYTES = 5632 * 1024; -const QUALITY_MAX_BUFFERED_BYTES = 8 * 1024 * 1024; -const CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024; +const QUALITY_MAX_BUFFERED_BYTES = 4 * 1024 * 1024; +const CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024; const DRAIN_LOW_WATER_BYTES = 64 * 1024; const DEFAULT_TIMEOUT_MS = Math.max(5_000, Number.parseInt(process.env.VIDEOCHAT_PRODUCTION_PROXY_BUDGET_TIMEOUT_MS || '20000', 10)); const FRAME_SIZES = [ @@ -217,7 +217,7 @@ function encodeBinaryEnvelope({ publisherId, publisherUserId, trackId, frameSequ outgoing_video_quality_profile: 'quality', budget_max_encoded_bytes_per_frame: QUALITY_MAX_PAYLOAD_BYTES, budget_max_keyframe_bytes_per_frame: 6656 * 1024, - budget_max_wire_bytes_per_second: 8400 * 1024, + budget_max_wire_bytes_per_second: 5200 * 1024, budget_max_buffered_bytes: QUALITY_MAX_BUFFERED_BYTES, binary_continuation_threshold_bytes: CONTINUATION_THRESHOLD_BYTES, layout_mode: 'full_frame', diff --git a/demo/video-chat/scripts/check-edge-deployment-decision.sh b/demo/video-chat/scripts/check-edge-deployment-decision.sh index 91ccd099c..025581761 100755 --- a/demo/video-chat/scripts/check-edge-deployment-decision.sh +++ b/demo/video-chat/scripts/check-edge-deployment-decision.sh @@ -86,6 +86,8 @@ require_text "${EDGE_DIR}/edge.php" 'VIDEOCHAT_EDGE_READ_STALL_TIMEOUT_SECONDS' require_text "${EDGE_DIR}/edge.php" '$written === 0' require_text "${EDGE_DIR}/edge.php" "\$chunk === ''" require_text "${EDGE_DIR}/edge.php" 'websocket idle timeout decide' +require_text "${EDGE_DIR}/edge.php" 'WebSocket tunnels cannot stay half-open' +require_text "${EDGE_DIR}/edge.php" '$closeWebSocketTunnel()' require_text "${EDGE_DIR}/edge.php" '$needsBackoff' require_text "${EDGE_DIR}/edge.php" 'Connection: close' require_text "${ROOT_DIR}/frontend-vue/src/lib/wasm/wasm-codec.ts" "WASM_MIME_CACHE_BUSTER" diff --git a/demo/video-chat/scripts/deploy-smoke.sh b/demo/video-chat/scripts/deploy-smoke.sh index eb412ee21..4bf93e6b5 100755 --- a/demo/video-chat/scripts/deploy-smoke.sh +++ b/demo/video-chat/scripts/deploy-smoke.sh @@ -57,7 +57,7 @@ assert_public_health_payload() { } $keys = array_keys($payload); sort($keys); - if ($keys !== ["service", "status", "time"]) { + if ($keys !== ["asset_version", "service", "status", "time"]) { fwrite(STDERR, "unexpected public health keys: " . implode(",", $keys) . "\n"); exit(1); } @@ -65,6 +65,10 @@ assert_public_health_payload() { fwrite(STDERR, "health status is not ok\n"); exit(1); } + if (!is_string($payload["asset_version"] ?? null) || $payload["asset_version"] === "") { + fwrite(STDERR, "health asset_version is missing\n"); + exit(1); + } ' } @@ -105,7 +109,7 @@ websocket_upgrade_smoke() { if [[ "${code}" == "000" ]]; then local header_code - header_code="$(awk '/^HTTP\\// {code=$2} END {print code}' "${headers}" | tr -d '\r')" + header_code="$(awk '/^HTTP\// {code=$2} END {print code}' "${headers}" | tr -d '\r')" [[ -n "${header_code}" ]] && code="${header_code}" fi diff --git a/demo/video-chat/scripts/deploy.sh b/demo/video-chat/scripts/deploy.sh index 5cfaed7c8..21ecceb80 100755 --- a/demo/video-chat/scripts/deploy.sh +++ b/demo/video-chat/scripts/deploy.sh @@ -56,6 +56,11 @@ Optional environment: VIDEOCHAT_DEPLOY_SFU_DOMAIN SFU websocket host, default: sfu.. VIDEOCHAT_DEPLOY_TURN_DOMAIN TURN host, default: turn.. VIDEOCHAT_DEPLOY_CDN_DOMAIN Static/CDN asset host, default: cdn.. + VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS + Optional comma-separated hostnames to route to an + external HTTP upstream through the King edge. + VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM + Upstream host:port for VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS. VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS Comma-separated frontend dev-server hosts, default: deploy domain plus api/ws/sfu/turn/cdn hosts. @@ -177,6 +182,8 @@ refresh_deploy_config() { DEPLOY_SFU_DOMAIN="${VIDEOCHAT_DEPLOY_SFU_DOMAIN:-}" DEPLOY_TURN_DOMAIN="${VIDEOCHAT_DEPLOY_TURN_DOMAIN:-}" DEPLOY_CDN_DOMAIN="${VIDEOCHAT_DEPLOY_CDN_DOMAIN:-}" + DEPLOY_EXTERNAL_DOMAINS="${VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS:-}" + DEPLOY_EXTERNAL_UPSTREAM="${VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM:-}" if [[ -n "${DEPLOY_DOMAIN}" ]]; then DEPLOY_API_DOMAIN="${DEPLOY_API_DOMAIN:-api.${DEPLOY_DOMAIN}}" @@ -190,12 +197,17 @@ refresh_deploy_config() { if [[ -z "${DEPLOY_VUE_ALLOWED_HOSTS}" && -n "${DEPLOY_DOMAIN}" ]]; then DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_DOMAIN},${DEPLOY_API_DOMAIN},${DEPLOY_WS_DOMAIN},${DEPLOY_SFU_DOMAIN},${DEPLOY_TURN_DOMAIN},${DEPLOY_CDN_DOMAIN}" fi + if [[ -n "${DEPLOY_EXTERNAL_DOMAINS}" ]]; then + DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_VUE_ALLOWED_HOSTS:+${DEPLOY_VUE_ALLOWED_HOSTS},}${DEPLOY_EXTERNAL_DOMAINS}" + fi export VIDEOCHAT_DEPLOY_API_DOMAIN="${DEPLOY_API_DOMAIN}" export VIDEOCHAT_DEPLOY_WS_DOMAIN="${DEPLOY_WS_DOMAIN}" export VIDEOCHAT_DEPLOY_SFU_DOMAIN="${DEPLOY_SFU_DOMAIN}" export VIDEOCHAT_DEPLOY_TURN_DOMAIN="${DEPLOY_TURN_DOMAIN}" export VIDEOCHAT_DEPLOY_CDN_DOMAIN="${DEPLOY_CDN_DOMAIN}" + export VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS="${DEPLOY_EXTERNAL_DOMAINS}" + export VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM="${DEPLOY_EXTERNAL_UPSTREAM}" export VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_VUE_ALLOWED_HOSTS}" if [[ -n "${DEPLOY_REFRESH_KNOWN_HOSTS}" ]]; then export VIDEOCHAT_DEPLOY_REFRESH_KNOWN_HOSTS="${DEPLOY_REFRESH_KNOWN_HOSTS}" @@ -223,8 +235,11 @@ deploy_refresh_known_hosts_enabled() { deploy_dns_targets() { local target seen="" local legacy_cdn_domain="" + local external_domains=() [[ -n "${DEPLOY_DOMAIN}" ]] && legacy_cdn_domain="cnd.${DEPLOY_DOMAIN}" - for target in "${DEPLOY_DOMAIN}" "${DEPLOY_API_DOMAIN}" "${DEPLOY_WS_DOMAIN}" "${DEPLOY_SFU_DOMAIN}" "${DEPLOY_TURN_DOMAIN}" "${DEPLOY_CDN_DOMAIN}" "${legacy_cdn_domain}"; do + IFS=',' read -r -a external_domains <<< "${DEPLOY_EXTERNAL_DOMAINS}" + for target in "${DEPLOY_DOMAIN}" "${DEPLOY_API_DOMAIN}" "${DEPLOY_WS_DOMAIN}" "${DEPLOY_SFU_DOMAIN}" "${DEPLOY_TURN_DOMAIN}" "${DEPLOY_CDN_DOMAIN}" "${legacy_cdn_domain}" "${external_domains[@]}"; do + target="${target//[[:space:]]/}" [[ -n "${target}" ]] || continue case " ${seen} " in *" ${target} "*) continue ;; @@ -304,6 +319,8 @@ persist_current_deploy_config() { local_env_upsert VIDEOCHAT_DEPLOY_SFU_DOMAIN "${DEPLOY_SFU_DOMAIN}" local_env_upsert VIDEOCHAT_DEPLOY_TURN_DOMAIN "${DEPLOY_TURN_DOMAIN}" local_env_upsert VIDEOCHAT_DEPLOY_CDN_DOMAIN "${DEPLOY_CDN_DOMAIN}" + local_env_upsert VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS "${DEPLOY_EXTERNAL_DOMAINS}" + local_env_upsert VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM "${DEPLOY_EXTERNAL_UPSTREAM}" local_env_upsert VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS "${DEPLOY_VUE_ALLOWED_HOSTS}" [[ -n "${VIDEOCHAT_DEPLOY_SSH_KEY:-}" ]] && local_env_upsert VIDEOCHAT_DEPLOY_SSH_KEY "${VIDEOCHAT_DEPLOY_SSH_KEY}" [[ -n "${VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE:-}" ]] && local_env_upsert VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE "${VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE}" @@ -450,7 +467,7 @@ if command -v ufw >/dev/null 2>&1 && \${SUDO}ufw status | grep -q 'Status: activ \${SUDO}ufw allow 443/tcp >/dev/null || true \${SUDO}ufw allow 3478/tcp >/dev/null || true \${SUDO}ufw allow 3478/udp >/dev/null || true - \${SUDO}ufw allow 49160:49200/udp >/dev/null || true + \${SUDO}ufw allow 49160:49660/udp >/dev/null || true fi if command -v firewall-cmd >/dev/null 2>&1 && \${SUDO}firewall-cmd --state >/dev/null 2>&1; then @@ -458,7 +475,7 @@ if command -v firewall-cmd >/dev/null 2>&1 && \${SUDO}firewall-cmd --state >/dev \${SUDO}firewall-cmd --permanent --add-service=https >/dev/null || true \${SUDO}firewall-cmd --permanent --add-port=3478/tcp >/dev/null || true \${SUDO}firewall-cmd --permanent --add-port=3478/udp >/dev/null || true - \${SUDO}firewall-cmd --permanent --add-port=49160-49200/udp >/dev/null || true + \${SUDO}firewall-cmd --permanent --add-port=49160-49660/udp >/dev/null || true \${SUDO}firewall-cmd --reload >/dev/null || true fi @@ -496,7 +513,7 @@ sync_checkout() { } certbot_standalone() { - local deploy_path_q domain_q email_q api_domain_q ws_domain_q sfu_domain_q turn_domain_q cdn_domain_q legacy_cdn_domain_q + local deploy_path_q domain_q email_q api_domain_q ws_domain_q sfu_domain_q turn_domain_q cdn_domain_q legacy_cdn_domain_q external_domains_q deploy_path_q="$(shell_quote "${DEPLOY_PATH}")" domain_q="$(shell_quote "${DEPLOY_DOMAIN}")" email_q="$(shell_quote "${DEPLOY_EMAIL}")" @@ -506,6 +523,7 @@ certbot_standalone() { turn_domain_q="$(shell_quote "${DEPLOY_TURN_DOMAIN}")" cdn_domain_q="$(shell_quote "${DEPLOY_CDN_DOMAIN}")" legacy_cdn_domain_q="$(shell_quote "cnd.${DEPLOY_DOMAIN}")" + external_domains_q="$(shell_quote "${DEPLOY_EXTERNAL_DOMAINS}")" log "Obtaining/renewing Let's Encrypt cert for ${DEPLOY_DOMAIN}" remote_bash <ai_next) { int reuse_addr = 1; +#ifdef SO_REUSEPORT + int reuse_port = 1; +#endif listener_fd = socket(cursor->ai_family, cursor->ai_socktype, cursor->ai_protocol); if (listener_fd < 0) { @@ -256,6 +276,18 @@ static zend_result king_server_http1_open_listener_socket( sizeof(reuse_addr) ); +#ifdef SO_REUSEPORT + if (king_server_http1_reuseport_enabled()) { + (void) setsockopt( + listener_fd, + SOL_SOCKET, + SO_REUSEPORT, + &reuse_port, + sizeof(reuse_port) + ); + } +#endif + if (bind(listener_fd, cursor->ai_addr, cursor->ai_addrlen) == 0) { if (listen(listener_fd, SOMAXCONN) == 0) { *listener_fd_out = listener_fd; diff --git a/extension/tests/740-http1-listener-exclusive-bind-contract.phpt b/extension/tests/740-http1-listener-exclusive-bind-contract.phpt index c198593f4..2b1bcf656 100644 --- a/extension/tests/740-http1-listener-exclusive-bind-contract.phpt +++ b/extension/tests/740-http1-listener-exclusive-bind-contract.phpt @@ -11,12 +11,46 @@ if (!is_readable('/proc/net/tcp')) { return; } -$probe = trim((string) shell_exec( - 'command -v python3 >/dev/null 2>&1' - . ' && python3 -c ' . escapeshellarg("import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)") - . ' >/dev/null 2>&1 && printf yes' -)); -if ($probe !== 'yes') { +$probe = false; +$checkPython = proc_open( + ['command', '-v', 'python3'], + [ + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], + $pythonPipes +); +if (is_resource($checkPython)) { + stream_get_contents($pythonPipes[1]); + stream_get_contents($pythonPipes[2]); + foreach ($pythonPipes as $pipe) { + fclose($pipe); + } + $pythonExit = proc_close($checkPython); + + if ($pythonExit === 0) { + $checkReusePort = proc_open( + ['python3', '-c', "import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)"], + [ + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], + $reusePortPipes + ); + if (is_resource($checkReusePort)) { + stream_get_contents($reusePortPipes[1]); + stream_get_contents($reusePortPipes[2]); + foreach ($reusePortPipes as $pipe) { + fclose($pipe); + } + $reusePortExit = proc_close($checkReusePort); + if ($reusePortExit === 0) { + $probe = true; + } + } + } +} +if (!$probe) { echo "skip python3 with socket.SO_REUSEPORT is required"; } ?> @@ -75,7 +109,7 @@ finally: sock.close() PY; - $command = 'python3 -c ' . escapeshellarg($script) . ' ' . (int) $port; + $command = ['python3', '-c', $script, (string) $port]; $process = proc_open($command, [ 1 => ['pipe', 'w'], 2 => ['pipe', 'w'], @@ -91,8 +125,10 @@ PY; } $exitCode = proc_close($process); - if ($exitCode !== 0 && trim($stdout) === '') { - throw new RuntimeException('duplicate bind probe failed: ' . trim($stderr)); + if ($exitCode !== 0) { + throw new RuntimeException( + 'duplicate bind probe failed (exit=' . $exitCode . '): stderr=' . trim($stderr) . '; stdout=' . trim($stdout) + ); } return trim($stdout); @@ -124,7 +160,7 @@ var_dump($capture['listen_result']); var_dump($capture['listen_error']); ?> --EXPECTF-- -string(%d) "BIND_FAIL errno=%d" +string(18) "BIND_FAIL errno=98" int(426) bool(true) string(0) "" diff --git a/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt b/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt new file mode 100644 index 000000000..2d48c1be7 --- /dev/null +++ b/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt @@ -0,0 +1,132 @@ +--TEST-- +King HTTP/1 one-shot listener enables SO_REUSEPORT only for explicit trusted worker opt-in +--SKIPIF-- +/dev/null 2>&1' + . ' && python3 -c ' . escapeshellarg("import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)") + . ' >/dev/null 2>&1 && printf yes' +)); +if ($probe !== 'yes') { + echo "skip python3 with socket.SO_REUSEPORT is required"; +} +?> +--FILE-- + ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], $pipes); + if (!is_resource($process)) { + throw new RuntimeException('failed to launch duplicate bind probe'); + } + + $stdout = stream_get_contents($pipes[1]); + $stderr = stream_get_contents($pipes[2]); + foreach ($pipes as $pipe) { + fclose($pipe); + } + $exitCode = proc_close($process); + + if ($exitCode !== 0 && trim($stdout) === '') { + throw new RuntimeException('duplicate bind probe failed: ' . trim($stderr)); + } + + return trim($stdout); +} + +$server = king_server_websocket_wire_start_server('plain', 1, null, [ + 'KING_HTTP1_ENABLE_REUSEPORT' => '1', +]); +$capture = []; +$response = ''; + +try { + king_http1_wait_for_reuseport_listen_port($server['port']); + $bindAttempt = king_http1_reuseport_bind_attempt($server['port']); + + $response = king_server_http1_wire_request_retry( + $server['port'], + "GET /reuseport-opt-in HTTP/1.1\r\n" + . "Host: 127.0.0.1\r\n" + . "Connection: close\r\n\r\n" + ); +} finally { + $capture = king_server_websocket_wire_stop_server($server); +} + +$parsed = king_server_http1_wire_parse_response($response); + +var_dump($bindAttempt); +var_dump($parsed['status']); +var_dump($capture['listen_result']); +var_dump($capture['listen_error']); +?> +--EXPECT-- +string(7) "BIND_OK" +int(426) +bool(true) +string(0) "" diff --git a/extension/tests/server_websocket_wire_helper.inc b/extension/tests/server_websocket_wire_helper.inc index eedce182b..efcc7a119 100644 --- a/extension/tests/server_websocket_wire_helper.inc +++ b/extension/tests/server_websocket_wire_helper.inc @@ -2,76 +2,94 @@ /* Test helper for starting the on-wire HTTP/1/WebSocket one-shot listener harness. */ -function king_server_websocket_wire_start_server(string $mode, int $iterations = 1, ?int $fixedPort = null): array +function king_server_websocket_wire_start_server(string $mode, int $iterations = 1, ?int $fixedPort = null, array $env = []): array { - if ($fixedPort !== null) { - $port = $fixedPort; - } else { - $probe = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); - if ($probe === false) { - throw new RuntimeException("failed to reserve on-wire HTTP/1 test port: $errstr"); + $maxAttempts = $fixedPort !== null ? 1 : 10; + + for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) { + if ($fixedPort !== null) { + $port = $fixedPort; + } else { + $probe = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); + if ($probe === false) { + throw new RuntimeException("failed to reserve on-wire HTTP/1 test port: $errstr"); + } + + $serverName = stream_socket_get_name($probe, false); + fclose($probe); + [, $port] = explode(':', $serverName, 2); } - $serverName = stream_socket_get_name($probe, false); - fclose($probe); - [, $port] = explode(':', $serverName, 2); - } + $capture = tempnam(sys_get_temp_dir(), 'king-server-websocket-wire-'); + $extensionPath = dirname(__DIR__) . '/modules/king.so'; + $commandParts = [ + escapeshellarg(PHP_BINARY), + '-n', + '-d', + escapeshellarg('extension=' . $extensionPath), + '-d', + escapeshellarg('king.security_allow_config_override=1'), + ]; - $capture = tempnam(sys_get_temp_dir(), 'king-server-websocket-wire-'); - $extensionPath = dirname(__DIR__) . '/modules/king.so'; - $commandParts = [ - escapeshellarg(PHP_BINARY), - '-n', - '-d', - escapeshellarg('extension=' . $extensionPath), - '-d', - escapeshellarg('king.security_allow_config_override=1'), - ]; + if ($mode === 'cors-allowlist') { + $commandParts[] = '-d'; + $commandParts[] = escapeshellarg( + 'king.security_cors_allowed_origins=https://app.king.test, https://admin.king.test' + ); + } - if ($mode === 'cors-allowlist') { - $commandParts[] = '-d'; - $commandParts[] = escapeshellarg( - 'king.security_cors_allowed_origins=https://app.king.test, https://admin.king.test' - ); - } + $commandParts = array_merge($commandParts, [ + escapeshellarg(__DIR__ . '/server_websocket_wire_server.inc'), + escapeshellarg($capture), + (string) (int) $port, + escapeshellarg($mode), + (string) max(1, $iterations), + ]); - $commandParts = array_merge($commandParts, [ - escapeshellarg(__DIR__ . '/server_websocket_wire_server.inc'), - escapeshellarg($capture), - (string) (int) $port, - escapeshellarg($mode), - (string) max(1, $iterations), - ]); + $command = implode(' ', $commandParts); - $command = implode(' ', $commandParts); + $processEnv = null; + if ($env !== []) { + $baseEnv = getenv(); + $processEnv = is_array($baseEnv) ? array_merge($baseEnv, $env) : $env; + } - $process = proc_open($command, [ - 1 => ['pipe', 'w'], - 2 => ['pipe', 'w'], - ], $pipes); + $process = proc_open($command, [ + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], $pipes, null, $processEnv); - if (!is_resource($process)) { - @unlink($capture); - throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server'); - } + if (!is_resource($process)) { + @unlink($capture); + if ($fixedPort !== null || $attempt === $maxAttempts) { + throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server'); + } + continue; + } + + $ready = fgets($pipes[1]); + if ($ready === "READY\n") { + return [ + 'process' => $process, + 'pipes' => $pipes, + 'capture' => $capture, + 'port' => (int) $port, + ]; + } - $ready = fgets($pipes[1]); - if ($ready !== "READY\n") { $stderr = stream_get_contents($pipes[2]); foreach ($pipes as $pipe) { fclose($pipe); } proc_close($process); @unlink($capture); - throw new RuntimeException('on-wire HTTP/1 websocket server failed: ' . trim($stderr)); + + if ($fixedPort !== null || $attempt === $maxAttempts) { + throw new RuntimeException('on-wire HTTP/1 websocket server failed: ' . trim($stderr)); + } } - return [ - 'process' => $process, - 'pipes' => $pipes, - 'capture' => $capture, - 'port' => (int) $port, - ]; + throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server'); } function king_server_websocket_wire_stop_server(array $server): array @@ -86,9 +104,15 @@ function king_server_websocket_wire_stop_server(array $server): array $capture = []; if (is_file($server['capture'])) { - $capture = json_decode((string) file_get_contents($server['capture']), true); - if (!is_array($capture)) { + $captureRaw = (string) file_get_contents($server['capture']); + $decodedCapture = json_decode($captureRaw, true); + if (is_array($decodedCapture)) { + $capture = $decodedCapture; + } else { $capture = []; + if (json_last_error() !== JSON_ERROR_NONE) { + $capture['__json_error'] = json_last_error_msg(); + } } @unlink($server['capture']); } @@ -134,9 +158,10 @@ function king_server_http1_wire_abort_request(int $port, string $request, int $s { $deadline = microtime(true) + 3.0; $socket = false; + $message = 'connection timed out'; if (!function_exists('socket_create')) { - throw new RuntimeException('socket_create() is required for HTTP/1 abort requests'); + throw new RuntimeException('The sockets extension is required for HTTP/1 abort requests. Please enable it in php.ini.'); } do { @@ -239,7 +264,7 @@ function king_server_websocket_wire_raw_client_connect(int $port, string $path) $message = 'connection failed'; if (!function_exists('socket_create')) { - throw new RuntimeException('socket_create() is required for raw websocket client fixtures'); + throw new RuntimeException('The sockets extension is required for raw websocket client fixtures. Please enable it in php.ini.'); } do { @@ -315,7 +340,9 @@ function king_server_websocket_wire_raw_client_send_text($socket, string $payloa } elseif ($payloadLength <= 65535) { $frame .= chr(0x80 | 126) . pack('n', $payloadLength); } else { - $frame .= chr(0x80 | 127) . pack('N2', 0, $payloadLength); + $high = intdiv($payloadLength, 4294967296); + $low = $payloadLength % 4294967296; + $frame .= chr(0x80 | 127) . pack('N2', $high, $low); } $maskedPayload = '';