diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
index 3230125e0..1b25b1c4b 100644
--- a/.github/FUNDING.yml
+++ b/.github/FUNDING.yml
@@ -1 +1 @@
-buy_me_a_coffee: https://buymeacoffee.com/kingrt
+buy_me_a_coffee: kingrt
diff --git a/demo/video-chat/backend-king-php/run-dev.sh b/demo/video-chat/backend-king-php/run-dev.sh
index 425bd7ddb..4999fc957 100755
--- a/demo/video-chat/backend-king-php/run-dev.sh
+++ b/demo/video-chat/backend-king-php/run-dev.sh
@@ -118,9 +118,14 @@ start_backend() {
local bind_port="$2"
local worker_count
worker_count="$(worker_count_for_mode "${mode}")"
+ local reuseport_value="${KING_HTTP1_ENABLE_REUSEPORT:-}"
+ if [[ -z "${reuseport_value}" && "${worker_count}" -gt 1 ]]; then
+ reuseport_value="1"
+ fi
local worker_index=1
while (( worker_index <= worker_count )); do
+ KING_HTTP1_ENABLE_REUSEPORT="${reuseport_value}" \
VIDEOCHAT_KING_PORT="${bind_port}" \
VIDEOCHAT_KING_SERVER_MODE="${mode}" \
VIDEOCHAT_KING_WORKER_INDEX="${worker_index}" \
@@ -131,6 +136,9 @@ start_backend() {
done
echo "[video-chat][king-php-backend] started ${worker_count} worker(s) in ${mode} mode on port ${bind_port}"
+ if [[ "${reuseport_value}" == "1" && "${worker_count}" -gt 1 ]]; then
+ echo "[video-chat][king-php-backend] enabled SO_REUSEPORT for trusted ${mode} worker group"
+ fi
}
normalized_mode_override="$(echo "${SERVER_MODE_OVERRIDE}" | tr '[:upper:]' '[:lower:]' | xargs || true)"
diff --git a/demo/video-chat/docker-compose.v1.yml b/demo/video-chat/docker-compose.v1.yml
index e6e0b0990..4bac36615 100644
--- a/demo/video-chat/docker-compose.v1.yml
+++ b/demo/video-chat/docker-compose.v1.yml
@@ -126,7 +126,7 @@ services:
VIDEOCHAT_V1_TURN_STATIC_AUTH_SECRET_FILE: "${VIDEOCHAT_V1_TURN_STATIC_AUTH_SECRET_FILE:-}"
VIDEOCHAT_V1_TURN_EXTERNAL_IP: "${VIDEOCHAT_V1_TURN_EXTERNAL_IP:-}"
VIDEOCHAT_V1_TURN_RELAY_MIN_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}"
- VIDEOCHAT_V1_TURN_RELAY_MAX_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}"
+ VIDEOCHAT_V1_TURN_RELAY_MAX_PORT: "${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}"
entrypoint:
- sh
- -lc
@@ -161,7 +161,7 @@ services:
ports:
- "${VIDEOCHAT_V1_TURN_PORT:-3478}:3478/tcp"
- "${VIDEOCHAT_V1_TURN_PORT:-3478}:3478/udp"
- - "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}:${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49200}/udp"
+ - "${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}:${VIDEOCHAT_V1_TURN_RELAY_MIN_PORT:-49160}-${VIDEOCHAT_V1_TURN_RELAY_MAX_PORT:-49660}/udp"
restart: unless-stopped
videochat-frontend-v1:
@@ -254,6 +254,8 @@ services:
VIDEOCHAT_EDGE_TURN_DOMAIN: "${VIDEOCHAT_DEPLOY_TURN_DOMAIN:-turn.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}"
VIDEOCHAT_EDGE_CDN_DOMAIN: "${VIDEOCHAT_DEPLOY_CDN_DOMAIN:-cdn.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}"
VIDEOCHAT_EDGE_CDN_ALIASES: "${VIDEOCHAT_DEPLOY_CDN_ALIASES:-cnd.${VIDEOCHAT_V1_PUBLIC_HOST:-127.0.0.1}}"
+ VIDEOCHAT_EDGE_EXTERNAL_DOMAINS: "${VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS:-}"
+ VIDEOCHAT_EDGE_EXTERNAL_UPSTREAM: "${VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM:-}"
VIDEOCHAT_EDGE_CERT_FILE: /run/certs/live/fullchain.pem
VIDEOCHAT_EDGE_KEY_FILE: /run/certs/live/privkey.pem
VIDEOCHAT_EDGE_API_UPSTREAM: videochat-backend-v1:18080
diff --git a/demo/video-chat/edge/edge.php b/demo/video-chat/edge/edge.php
index d15a9458d..277134354 100644
--- a/demo/video-chat/edge/edge.php
+++ b/demo/video-chat/edge/edge.php
@@ -12,6 +12,15 @@
$turnDomain = strtolower(trim((string) (getenv('VIDEOCHAT_EDGE_TURN_DOMAIN') ?: 'turn.' . $domain)));
$cdnDomain = strtolower(trim((string) (getenv('VIDEOCHAT_EDGE_CDN_DOMAIN') ?: 'cdn.' . $domain)));
$cdnAliasInput = trim((string) (getenv('VIDEOCHAT_EDGE_CDN_ALIASES') ?: 'cnd.' . $domain));
+$externalDomainInput = trim((string) getenv('VIDEOCHAT_EDGE_EXTERNAL_DOMAINS'));
+$externalDomains = [];
+foreach (preg_split('/\s*,\s*/', $externalDomainInput) ?: [] as $externalDomain) {
+ $externalDomain = strtolower(trim((string) $externalDomain));
+ if ($externalDomain !== '') {
+ $externalDomains[] = $externalDomain;
+ }
+}
+$externalDomains = array_values(array_unique($externalDomains));
$cdnDomains = [$cdnDomain];
foreach (preg_split('/\s*,\s*/', $cdnAliasInput) ?: [] as $alias) {
$alias = strtolower(trim((string) $alias));
@@ -26,6 +35,8 @@
$apiUpstream = getenv('VIDEOCHAT_EDGE_API_UPSTREAM') ?: 'videochat-backend-v1:18080';
$wsUpstream = getenv('VIDEOCHAT_EDGE_WS_UPSTREAM') ?: 'videochat-backend-ws-v1:18080';
$sfuUpstream = getenv('VIDEOCHAT_EDGE_SFU_UPSTREAM') ?: 'videochat-backend-sfu-v1:18080';
+$externalUpstream = trim((string) getenv('VIDEOCHAT_EDGE_EXTERNAL_UPSTREAM'));
+$socialPreviewImagePath = getenv('VIDEOCHAT_EDGE_SOCIAL_PREVIEW_IMAGE') ?: '/assets/orgas/kingrt/social/invitation-preview.png';
$maxHeaderBytes = (int) (getenv('VIDEOCHAT_EDGE_MAX_HEADER_BYTES') ?: '65536');
$connectTimeout = (float) (getenv('VIDEOCHAT_EDGE_CONNECT_TIMEOUT_SECONDS') ?: '5');
$httpIdleTimeout = (int) (getenv('VIDEOCHAT_EDGE_HTTP_IDLE_TIMEOUT_SECONDS') ?: '60');
@@ -286,7 +297,61 @@
};
};
-$serveStatic = static function ($client, array $request) use ($staticRoot, $writeResponse, $contentType, $cdnDomains, $assetVersion): void {
+$escapeHtml = static function (string $value): string {
+ return htmlspecialchars($value, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');
+};
+
+$absoluteHttpsUrl = static function (string $host, string $target) use ($domain): string {
+ $host = trim($host) !== '' ? $host : $domain;
+ $target = trim($target) !== '' ? $target : '/';
+ if ($target[0] !== '/') {
+ $target = '/' . $target;
+ }
+ return 'https://' . $host . $target;
+};
+
+$injectSocialPreview = static function (string $body, array $request) use ($domain, $cdnDomain, $socialPreviewImagePath, $escapeHtml, $absoluteHttpsUrl): string {
+ $target = (string) ($request['target'] ?: ($request['path'] ?? '/'));
+ $path = (string) ($request['path'] ?? '/');
+ $host = (string) ($request['host'] ?: $domain);
+ $assetHost = $cdnDomain !== '' ? $cdnDomain : $host;
+ $pageUrl = $absoluteHttpsUrl($host, $target);
+ $imageUrl = $absoluteHttpsUrl($assetHost, $socialPreviewImagePath);
+ $isInvite = str_starts_with($path, '/join/');
+ $title = $isInvite ? "You're invited to a KINGRT video call" : 'KINGRT Video Chat';
+ $description = $isInvite
+ ? 'Join the video call on KINGRT.'
+ : 'Run your own calls with KINGRT open-source video collaboration.';
+
+ $tags = [
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ '',
+ ];
+ $meta = implode("\n ", $tags);
+
+ if (str_contains($body, '')) {
+ return str_replace('', $meta, $body);
+ }
+
+ return str_replace('', " {$meta}\n ", $body);
+};
+
+$serveStatic = static function ($client, array $request) use ($staticRoot, $writeResponse, $contentType, $cdnDomains, $assetVersion, $injectSocialPreview): void {
$path = rawurldecode((string) $request['path']);
$isCdnAsset = in_array($request['host'], $cdnDomains, true) || str_starts_with($path, '/cdn/');
$corsHeaders = $isCdnAsset
@@ -324,6 +389,9 @@
}
$body = (string) @file_get_contents($candidate);
+ if (basename($candidate) === 'index.html') {
+ $body = $injectSocialPreview($body, $request);
+ }
$headers = [
'Content-Type' => $contentType($candidate),
'Cache-Control' => basename($candidate) === 'index.html'
@@ -395,11 +463,27 @@
$lastUpstreamReadProgress = $lastActivity;
$lastClientWriteProgress = $lastActivity;
$lastUpstreamWriteProgress = $lastActivity;
+ $closeWebSocketTunnel = static function () use (&$clientOpen, &$upstreamOpen, &$toClient, &$toUpstream): void {
+ // WebSocket tunnels cannot stay half-open: otherwise browser requests
+ // remain pending while the closed upstream socket sits in CLOSE_WAIT.
+ $clientOpen = false;
+ $upstreamOpen = false;
+ $toClient = '';
+ $toUpstream = '';
+ };
while ($clientOpen || $upstreamOpen || $toUpstream !== '' || $toClient !== '') {
if ((microtime(true) - $lastActivity) > $idleTimeout) {
break;
}
+ // Upstream may reject a websocket handshake with HTTP bytes, then close.
+ // Keep the client side alive until that buffered response is flushed.
+ if ($isWebSocket && !$upstreamOpen && $toClient === '') {
+ $clientOpen = false;
+ }
+ if ($isWebSocket && !$clientOpen && $toUpstream === '') {
+ $upstreamOpen = false;
+ }
if (!$clientOpen) {
$toClient = '';
}
@@ -447,6 +531,21 @@
foreach ($read as $stream) {
$chunk = @fread($stream, 16384);
if ($chunk === false) {
+ if ($isWebSocket) {
+ if ($stream === $upstreamStream && $toClient !== '') {
+ $upstreamOpen = false;
+ $madeProgress = true;
+ continue;
+ }
+ if ($stream === $client && $toUpstream !== '') {
+ $clientOpen = false;
+ $madeProgress = true;
+ continue;
+ }
+ $closeWebSocketTunnel();
+ $madeProgress = true;
+ continue;
+ }
if ($stream === $client) {
$clientOpen = false;
} else {
@@ -456,6 +555,21 @@
}
if ($chunk === '') {
if (feof($stream)) {
+ if ($isWebSocket) {
+ if ($stream === $upstreamStream && $toClient !== '') {
+ $upstreamOpen = false;
+ $madeProgress = true;
+ continue;
+ }
+ if ($stream === $client && $toUpstream !== '') {
+ $clientOpen = false;
+ $madeProgress = true;
+ continue;
+ }
+ $closeWebSocketTunnel();
+ $madeProgress = true;
+ continue;
+ }
if ($stream === $client) {
$clientOpen = false;
} else {
@@ -510,12 +624,20 @@
if ($stream === $upstreamStream && $toUpstream !== '') {
$written = @fwrite($upstreamStream, $toUpstream);
if ($written === false) {
+ if ($isWebSocket) {
+ $closeWebSocketTunnel();
+ continue;
+ }
$upstreamOpen = false;
$toUpstream = '';
continue;
}
if ($written === 0) {
if ((microtime(true) - $lastUpstreamWriteProgress) >= $writeStallTimeout) {
+ if ($isWebSocket) {
+ $closeWebSocketTunnel();
+ continue;
+ }
$upstreamOpen = false;
$toUpstream = '';
} else {
@@ -531,12 +653,20 @@
if ($stream === $client && $toClient !== '') {
$written = @fwrite($client, $toClient);
if ($written === false) {
+ if ($isWebSocket) {
+ $closeWebSocketTunnel();
+ continue;
+ }
$clientOpen = false;
$toClient = '';
continue;
}
if ($written === 0) {
if ((microtime(true) - $lastClientWriteProgress) >= $writeStallTimeout) {
+ if ($isWebSocket) {
+ $closeWebSocketTunnel();
+ continue;
+ }
$clientOpen = false;
$toClient = '';
} else {
@@ -559,9 +689,12 @@
@fclose($upstreamStream);
};
-$route = static function (array $request) use ($domain, $apiDomain, $wsDomain, $sfuDomain, $turnDomain, $cdnDomains, $apiUpstream, $wsUpstream, $sfuUpstream): ?string {
+$route = static function (array $request) use ($domain, $apiDomain, $wsDomain, $sfuDomain, $turnDomain, $cdnDomains, $externalDomains, $apiUpstream, $wsUpstream, $sfuUpstream, $externalUpstream): ?string {
$host = $request['host'];
$path = $request['path'];
+ if ($externalUpstream !== '' && in_array($host, $externalDomains, true)) {
+ return $externalUpstream;
+ }
if (in_array($host, $cdnDomains, true)) {
return 'static';
}
diff --git a/demo/video-chat/frontend-vue/index.html b/demo/video-chat/frontend-vue/index.html
index bab8a3839..e26f279fc 100644
--- a/demo/video-chat/frontend-vue/index.html
+++ b/demo/video-chat/frontend-vue/index.html
@@ -4,7 +4,8 @@
-
King Video Chat (Vue)
+
+ KINGRT Video Chat
diff --git a/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png b/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png
new file mode 100644
index 000000000..cdfa5c784
Binary files /dev/null and b/demo/video-chat/frontend-vue/public/assets/orgas/kingrt/social/invitation-preview.png differ
diff --git a/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue b/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue
index de63dc73f..309c7f530 100644
--- a/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue
+++ b/demo/video-chat/frontend-vue/src/domain/calls/access/JoinView.vue
@@ -218,6 +218,7 @@ import {
handleAssetVersionSocketPayload,
} from '../../../support/assetVersion';
import { attachForegroundReconnectHandlers } from '../../../support/foregroundReconnect';
+import { buildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints';
import {
applyCallBackgroundPreset as applyBackgroundPreset,
attachCallMediaDeviceWatcher,
@@ -272,7 +273,11 @@ const {
playSpeakerTestSound,
startPreview,
stopPreview,
-} = createJoinAccessPreviewController({ previewVideoRef, state });
+} = createJoinAccessPreviewController({
+ previewVideoRef,
+ state,
+ buildOptionalCallAudioCaptureConstraints,
+});
function normalizeAccessId(value) {
return String(value || '').trim().toLowerCase();
diff --git a/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js b/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js
index 8f542921d..af7b759fc 100644
--- a/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js
+++ b/demo/video-chat/frontend-vue/src/domain/calls/access/joinPreview.js
@@ -1,6 +1,6 @@
import { nextTick } from 'vue';
import { BackgroundFilterController } from '../../realtime/background/controller';
-import { buildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints';
+import { buildOptionalCallAudioCaptureConstraints as defaultBuildOptionalCallAudioCaptureConstraints } from '../../realtime/media/audioCaptureConstraints';
import { callMediaPrefs } from '../../realtime/media/preferences';
function finiteNumber(value, fallback) {
@@ -74,7 +74,9 @@ function applyVolumeToStreams(streams) {
}
}
-function buildPreviewConstraints() {
+function buildPreviewConstraints(
+ buildOptionalCallAudioCaptureConstraints = defaultBuildOptionalCallAudioCaptureConstraints,
+) {
const cameraDeviceId = String(callMediaPrefs.selectedCameraId || '').trim();
const microphoneDeviceId = String(callMediaPrefs.selectedMicrophoneId || '').trim();
return {
@@ -94,7 +96,11 @@ function stopStreams(streams) {
}
}
-export function createJoinAccessPreviewController({ previewVideoRef, state }) {
+export function createJoinAccessPreviewController({
+ previewVideoRef,
+ state,
+ buildOptionalCallAudioCaptureConstraints = defaultBuildOptionalCallAudioCaptureConstraints,
+}) {
const backgroundController = new BackgroundFilterController();
let rawStream = null;
let previewStream = null;
@@ -133,7 +139,7 @@ export function createJoinAccessPreviewController({ previewVideoRef, state }) {
}
try {
- rawStream = await navigator.mediaDevices.getUserMedia(buildPreviewConstraints());
+ rawStream = await navigator.mediaDevices.getUserMedia(buildPreviewConstraints(buildOptionalCallAudioCaptureConstraints));
applyVolumeToStreams([rawStream]);
previewStream = rawStream;
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue b/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue
index 34931e549..e471b8cfa 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/CallWorkspaceView.vue
@@ -512,17 +512,16 @@ const {
getRoomId: () => activeRoomId.value,
});
-let clearRemoteVideoStallTimer = () => {};
+let clearRemoteVideoStallTimer;
let isNativeWebRtcRuntimePath = () => false;
let isWlvcRuntimePath = () => false;
-let nativeAudioBridgeFailureMessage = () => defaultNativeAudioBridgeFailureMessage();
-let resetWlvcEncoderAfterDroppedEncodedFrame = () => {};
-let resetBackgroundRuntimeMetrics = () => {};
-let restartSfuAfterVideoStall = () => {};
-let shouldBlockNativeRuntimeSignaling = () => false;
-let shouldUseNativeAudioBridge = () => false;
-let startRemoteVideoStallTimer = () => {};
-let stopActivityMonitor = () => {};
+let nativeAudioBridgeFailureMessage;
+let resetBackgroundRuntimeMetrics;
+let restartSfuAfterVideoStall;
+let shouldBlockNativeRuntimeSignaling;
+let shouldUseNativeAudioBridge;
+let startRemoteVideoStallTimer;
+let stopActivityMonitor;
const routeCallRef = computed(() => String(route.params.callRef || '').trim());
const desiredRoomId = computed(() => normalizeRoomId(routeCallResolve.roomId || routeCallRef.value || 'lobby'));
@@ -606,34 +605,30 @@ const showLeftSidebarRestoreButton = computed(() => {
return !isCompactViewport.value && isShellLeftSidebarCollapsed.value;
});
-let canProtectCurrentNativeTargets = () => false;
-let canProtectCurrentSfuTargets = () => false;
-let clearMediaSecurityHandshakeWatchdog = () => {};
-let clearMediaSecurityResyncTimer = () => {};
-let clearMediaSecuritySfuPublisherSeen = () => {};
-let clearMediaSecuritySignalCaches = () => {};
-let clearNativeAudioBridgeQuarantine = () => false;
-let currentMediaSecurityRuntimePath = () => 'wlvc_sfu';
-let ensureMediaSecuritySession = () => null;
-let ensureNativeAudioBridgeSecurityReady = async () => false;
-let handleMediaSecuritySignal = async () => {};
-let hintMediaSecuritySync = () => {};
-let mediaSecurityEligibleTargetIds = () => [];
-let mediaSecurityTargetIds = () => [];
-let nativeAudioBridgeIsQuarantined = () => false;
-let nativeAudioSecurityBannerMessage = computed(() => '');
-let noteMediaSecuritySfuPublisherSeen = () => {};
-let recoverMediaSecurityForPublisher = () => {};
-let reportNativeAudioBridgeFailure = () => {};
-let resyncNativeAudioBridgePeerAfterSecurityReady = () => false;
-let scheduleMediaSecurityParticipantSync = () => {};
-let sendMediaSecurityHello = async () => false;
-let sendMediaSecuritySenderKey = async () => false;
-let shouldBypassNativeAudioProtectionForPeer = () => false;
-let shouldRecoverMediaSecurityFromFrameError = () => false;
-let shouldSendTransportOnlySfuFrame = () => false;
-let startMediaSecurityHandshakeWatchdog = () => {};
-let syncMediaSecurityWithParticipants = async () => {};
+let canProtectCurrentSfuTargets;
+let clearMediaSecurityHandshakeWatchdog;
+let clearMediaSecurityResyncTimer;
+let clearMediaSecuritySfuPublisherSeen;
+let clearMediaSecuritySignalCaches;
+let currentMediaSecurityRuntimePath;
+let ensureMediaSecuritySession;
+let ensureNativeAudioBridgeSecurityReady;
+let handleMediaSecuritySignal;
+let hintMediaSecuritySync;
+let mediaSecurityTargetIds;
+let nativeAudioBridgeIsQuarantined;
+let nativeAudioSecurityBannerMessage;
+let noteMediaSecuritySfuPublisherSeen;
+let recoverMediaSecurityForPublisher;
+let reportNativeAudioBridgeFailure;
+let resyncNativeAudioBridgePeerAfterSecurityReady;
+let scheduleMediaSecurityParticipantSync;
+let sendMediaSecurityHello;
+let shouldBypassNativeAudioProtectionForPeer;
+let shouldRecoverMediaSecurityFromFrameError;
+let shouldSendTransportOnlySfuFrame;
+let startMediaSecurityHandshakeWatchdog;
+let syncMediaSecurityWithParticipants;
let appendChatMessage = () => {};
let applyActivitySnapshot = () => {};
let applyCallLayoutPayload = () => {};
@@ -642,53 +637,48 @@ let applyReactionEvent = () => {};
let applyRemoteControlState = () => false;
let applyTypingEvent = () => {};
let clearAdmissionGate = () => {};
-let clearErrors = () => {};
-let clearLobbyActionText = () => {};
-let clearLobbyToastTimer = () => {};
-let clearTransientActivityPublishErrorNotice = () => {};
-let closeNativePeerConnection = () => {};
+let clearErrors;
+let clearLobbyActionText;
+let clearLobbyToastTimer;
+let clearTransientActivityPublishErrorNotice;
+let closeNativePeerConnection;
let hangupCall = () => {};
let hideLobbyJoinToast = () => {};
let markParticipantActivity = () => {};
let normalizeLobbyEntry = (entry) => entry;
let nativeAudioSecurityTelemetrySnapshot = () => null;
let notifyLobbyJoinRequests = () => {};
-let peerControlSnapshot = () => ({
- handRaised: false,
- cameraEnabled: true,
- micEnabled: true,
- screenEnabled: false,
-});
+let peerControlSnapshot;
let pruneParticipantActivity = () => {};
-let refreshUsersDirectory = async () => {};
+let refreshUsersDirectory;
let refreshUsersDirectoryPresentation = () => {};
let reportNativeAudioSdpRejected = () => {};
let requestRoomSnapshot = () => {};
-let resetPeerControlState = () => {};
+let resetPeerControlState;
let scheduleNativePeerAudioTrackDeadline = () => {};
-let sendRoomJoin = () => false;
+let sendRoomJoin;
let sendNativeOffer = async () => {};
-let setAdmissionGate = () => {};
-let setActiveTab = () => {};
+let setAdmissionGate;
+let setActiveTab;
let setNativePeerAudioBridgeState = () => {};
let setNotice = () => {};
let shouldSyncNativeLocalTracksBeforeOffer = () => false;
-let shouldSuppressCallAckNotice = () => false;
+let shouldSuppressCallAckNotice;
let syncNativePeerConnectionsWithRoster = () => {};
let syncNativePeerLocalTracks = () => {};
let synchronizeNativePeerMediaElements = () => {};
let ensureNativePeerConnection = () => null;
-let shouldSuppressExpectedSignalingError = () => false;
-let syncControlStateToPeers = async () => false;
-let syncModerationStateToPeers = async () => false;
-let tryDirectJoinWithModeratorBypass = () => false;
+let shouldSuppressExpectedSignalingError;
+let syncControlStateToPeers;
+let syncModerationStateToPeers;
+let tryDirectJoinWithModeratorBypass;
let applyCallOutputPreferences = () => {};
let currentSfuVideoProfile = computed(() => 'quality');
let downgradeSfuVideoQualityAfterEncodePressure = () => false;
let initSFU = () => {};
let maybeFallbackToNativeRuntime = async () => false;
-let removeSfuRemotePeersForUserId = () => false;
-let setMediaRuntimePath = () => false;
+let removeSfuRemotePeersForUserId;
+let setMediaRuntimePath;
let stopSfuTrackAnnounceTimer = () => {};
let switchMediaRuntimePath = async () => false;
let teardownRemotePeer = () => {};
@@ -913,19 +903,16 @@ const mediaSecurityRuntimeState = {
};
({
- canProtectCurrentNativeTargets,
canProtectCurrentSfuTargets,
clearMediaSecurityHandshakeWatchdog,
clearMediaSecurityResyncTimer,
clearMediaSecuritySfuPublisherSeen,
clearMediaSecuritySignalCaches,
- clearNativeAudioBridgeQuarantine,
currentMediaSecurityRuntimePath,
ensureMediaSecuritySession,
ensureNativeAudioBridgeSecurityReady,
handleMediaSecuritySignal,
hintMediaSecuritySync,
- mediaSecurityEligibleTargetIds,
mediaSecurityTargetIds,
nativeAudioBridgeIsQuarantined,
nativeAudioSecurityBannerMessage,
@@ -935,7 +922,6 @@ const mediaSecurityRuntimeState = {
resyncNativeAudioBridgePeerAfterSecurityReady,
scheduleMediaSecurityParticipantSync,
sendMediaSecurityHello,
- sendMediaSecuritySenderKey,
shouldBypassNativeAudioProtectionForPeer,
shouldRecoverMediaSecurityFromFrameError,
shouldSendTransportOnlySfuFrame,
@@ -1091,11 +1077,11 @@ const localPublisherPipelineState = {
const mediaStack = createCallWorkspaceMediaStack({
callbacks: {
applyCallOutputPreferences: (...args) => applyCallOutputPreferences(...args),
- canProtectCurrentSfuTargets,
+ canProtectCurrentSfuTargets: (...args) => canProtectCurrentSfuTargets(...args),
captureClientDiagnostic,
captureClientDiagnosticError,
clearRemoteVideoContainer,
- clearTransientActivityPublishErrorNotice,
+ clearTransientActivityPublishErrorNotice: (...args) => clearTransientActivityPublishErrorNotice(...args),
currentSfuVideoProfile: (...args) => (
typeof currentSfuVideoProfile === 'function'
? currentSfuVideoProfile(...args)
@@ -1261,7 +1247,6 @@ const mediaStack = createCallWorkspaceMediaStack({
isNativeWebRtcRuntimePath,
isWlvcRuntimePath,
nativeAudioBridgeFailureMessage,
- resetWlvcEncoderAfterDroppedEncodedFrame,
shouldBlockNativeRuntimeSignaling,
shouldUseNativeAudioBridge,
startRemoteVideoStallTimer,
@@ -1410,9 +1395,9 @@ const nativeStack = createCallWorkspaceNativeStack({
createNativePeerAudioElement,
createNativePeerVideoElement,
currentMediaSecurityRuntimePath,
- currentNativeAudioBridgeFailureMessage: nativeAudioBridgeFailureMessage,
- currentShouldUseNativeAudioBridge: shouldUseNativeAudioBridge,
- shouldUseNativeAudioBridge,
+ currentNativeAudioBridgeFailureMessage: (...args) => nativeAudioBridgeFailureMessage(...args),
+ currentShouldUseNativeAudioBridge: (...args) => shouldUseNativeAudioBridge(...args),
+ shouldUseNativeAudioBridge: (...args) => shouldUseNativeAudioBridge(...args),
currentUserId: () => currentUserId.value,
ensureLocalMediaForPublish: () => publishLocalTracks(),
ensureMediaSecuritySession,
@@ -1420,8 +1405,8 @@ const nativeStack = createCallWorkspaceNativeStack({
extractDiagnosticMessage,
getMediaRuntimePath: () => mediaRuntimePath.value,
getPeerByUserId: (userId) => nativePeerConnectionsRef.value.get(userId) || null,
- getPeerControlSnapshot: peerControlSnapshot,
- isNativeWebRtcRuntimePath,
+ getPeerControlSnapshot: (...args) => peerControlSnapshot(...args),
+ isNativeWebRtcRuntimePath: (...args) => isNativeWebRtcRuntimePath(...args),
markParticipantActivity,
mediaDebugLog,
nativeAudioBridgeFailureMessage,
@@ -1429,10 +1414,10 @@ const nativeStack = createCallWorkspaceNativeStack({
nativePeerHasLocalLiveAudioSender,
renderCallVideoLayout: () => renderCallVideoLayout(),
renderNativeRemoteVideos,
- reportNativeAudioBridgeFailure,
+ reportNativeAudioBridgeFailure: (...args) => reportNativeAudioBridgeFailure(...args),
reportNativeAudioSdpRejected,
reconfigureLocalTracksFromSelectedDevices: (...args) => reconfigureLocalTracksFromSelectedDevices(...args),
- resyncNativeAudioBridgePeerAfterSecurityReady,
+ resyncNativeAudioBridgePeerAfterSecurityReady: (...args) => resyncNativeAudioBridgePeerAfterSecurityReady(...args),
sendSocketFrame,
sessionToken: () => sessionState.sessionToken,
shouldBypassNativeAudioProtectionForPeer,
@@ -1565,11 +1550,11 @@ const {
applyViewerContext,
appendChatMessage: (...args) => appendChatMessage(...args),
captureClientDiagnostic,
- clearAdmissionGate,
- clearErrors,
- clearLobbyActionText,
- clearTransientActivityPublishErrorNotice,
- closeNativePeerConnection,
+ clearAdmissionGate: (...args) => clearAdmissionGate(...args),
+ clearErrors: (...args) => clearErrors(...args),
+ clearLobbyActionText: (...args) => clearLobbyActionText(...args),
+ clearTransientActivityPublishErrorNotice: (...args) => clearTransientActivityPublishErrorNotice(...args),
+ closeNativePeerConnection: (...args) => closeNativePeerConnection(...args),
closeSocketLocal: (...args) => closeSocket(...args),
downgradeSfuVideoQualityAfterEncodePressure: (...args) => downgradeSfuVideoQualityAfterEncodePressure(...args),
ensureRoomBuckets,
@@ -1578,28 +1563,29 @@ const {
handleAssetVersionConnectionFailure,
handleAssetVersionSocketClose,
handleAssetVersionSocketPayload,
- handleMediaSecuritySignal,
+ handleMediaSecuritySignal: (...args) => handleMediaSecuritySignal(...args),
handleNativeSignalingEvent,
- hideLobbyJoinToast,
+ hideLobbyJoinToast: (...args) => hideLobbyJoinToast(...args),
mediaDebugLog,
normalizeRoomId,
redirectInvitedRouteToJoinModal,
- refreshUsersDirectory,
- refreshUsersDirectoryPresentation,
+ refreshUsersDirectory: (...args) => refreshUsersDirectory(...args),
+ refreshUsersDirectoryPresentation: (...args) => refreshUsersDirectoryPresentation(...args),
removeParticipantFromSnapshot,
- removeSfuRemotePeersForUserId, requestWlvcFullFrameKeyframe: (...args) => requestWlvcFullFrameKeyframe(...args),
+ removeSfuRemotePeersForUserId: (...args) => removeSfuRemotePeersForUserId(...args),
+ requestWlvcFullFrameKeyframe: (...args) => requestWlvcFullFrameKeyframe(...args),
requestHeaders,
requestRoomSnapshot,
- resetPeerControlState,
+ resetPeerControlState: (...args) => resetPeerControlState(...args),
scheduleNativeOfferRetryForUserId,
sendMediaSecuritySync: (isReconnectOpen) => syncMediaSecurityWithParticipants(isReconnectOpen),
- sendRoomJoin,
- setAdmissionGate,
+ sendRoomJoin: (...args) => sendRoomJoin(...args),
+ setAdmissionGate: (...args) => setAdmissionGate(...args),
setBackendWebSocketOrigin,
- setNotice,
- syncControlStateToPeers,
- syncModerationStateToPeers,
- tryDirectJoinWithModeratorBypass,
+ setNotice: (...args) => setNotice(...args),
+ syncControlStateToPeers: (...args) => syncControlStateToPeers(...args),
+ syncModerationStateToPeers: (...args) => syncModerationStateToPeers(...args),
+ tryDirectJoinWithModeratorBypass: (...args) => tryDirectJoinWithModeratorBypass(...args),
},
constants: {
callStateSignalTypes: CALL_STATE_SIGNAL_TYPES,
@@ -1630,9 +1616,9 @@ const {
serverRoomId,
sessionState,
sfuTransportState,
- shouldBlockNativeRuntimeSignaling,
- shouldSuppressCallAckNotice,
- shouldSuppressExpectedSignalingError,
+ shouldBlockNativeRuntimeSignaling: (...args) => shouldBlockNativeRuntimeSignaling(...args),
+ shouldSuppressCallAckNotice: (...args) => shouldSuppressCallAckNotice(...args),
+ shouldSuppressExpectedSignalingError: (...args) => shouldSuppressExpectedSignalingError(...args),
showAdmissionGate,
socketRef,
socketUrlForRoom,
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js
index b10f8b388..97970d1ec 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherCaptureWorker.js
@@ -123,7 +123,7 @@ async function handleReadback(payload = {}) {
let drawImageMs;
let readbackMs;
let imageData;
- let frameSize;
+ let frameSize = null;
try {
frameSize = resolveWorkerFrameSize(source, payload);
const context = ensureCaptureCanvas(frameSize.frameWidth, frameSize.frameHeight);
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js
index 80a311287..d7e4ecf4a 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/local/publisherPipeline.js
@@ -546,7 +546,6 @@ export function createLocalPublisherPipelineHelpers({
mediaDebugLog('[SFU] WLVC encoder unavailable during source aspect sizing');
return;
}
- let frameImageData = imageData;
let tilePatchMetadata = null;
let tilePatchTransportMetrics = null;
let encoded = null;
@@ -580,7 +579,6 @@ export function createLocalPublisherPipelineHelpers({
videoProfile.frameQuality,
);
if (patchEncoder) {
- frameImageData = selectivePatchPlan.patchImageData;
tilePatchMetadata = selectivePatchPlan.tilePatch;
tilePatchTransportMetrics = {
selection_tile_count: selectivePatchPlan.changedTileCount,
@@ -588,7 +586,7 @@ export function createLocalPublisherPipelineHelpers({
selection_tile_ratio: Number(selectivePatchPlan.selectedTileRatio.toFixed(6)),
selection_mask_guided: selectivePatchPlan.matteGuided,
};
- encoded = patchEncoder.encodeFrame(frameImageData, timestamp);
+ encoded = patchEncoder.encodeFrame(selectivePatchPlan.patchImageData, timestamp);
encodedFrameType = 'keyframe';
}
}
@@ -618,7 +616,6 @@ export function createLocalPublisherPipelineHelpers({
videoProfile.frameQuality,
);
if (patchEncoder) {
- frameImageData = backgroundSnapshotPlan.patchImageData;
tilePatchMetadata = backgroundSnapshotPlan.tilePatch;
tilePatchTransportMetrics = {
selection_tile_count: backgroundSnapshotPlan.changedTileCount,
@@ -626,7 +623,7 @@ export function createLocalPublisherPipelineHelpers({
selection_tile_ratio: Number(backgroundSnapshotPlan.selectedTileRatio.toFixed(6)),
selection_mask_guided: backgroundSnapshotPlan.matteGuided,
};
- encoded = patchEncoder.encodeFrame(frameImageData, timestamp);
+ encoded = patchEncoder.encodeFrame(backgroundSnapshotPlan.patchImageData, timestamp);
encodedFrameType = 'keyframe';
}
}
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js b/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js
index 35cceb15d..42c6cf227 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/media/security.js
@@ -64,9 +64,11 @@ function nativeEncodedFrameAadTrackId(trackKind = 'data') {
return 'native_data';
}
+const VALID_PROTECTED_CODEC_IDS = new Set(['wlvc_wasm', 'wlvc_ts', 'webcodecs_vp8', 'wlvc_unknown']);
+
function normalizeProtectedCodecId(value, runtimePath = '') {
const normalized = asString(value).toLowerCase();
- if (normalized === 'wlvc_wasm' || normalized === 'wlvc_ts' || normalized === 'webcodecs_vp8' || normalized === 'wlvc_unknown') return normalized;
+ if (VALID_PROTECTED_CODEC_IDS.has(normalized)) return normalized;
if (asString(runtimePath).toLowerCase() === 'webrtc_native') return 'webrtc_native';
return 'wlvc_unknown';
}
@@ -385,7 +387,7 @@ export class MediaSecuritySession {
try {
await this.handleSenderKeySignal(sender, pending);
} catch (error) {
- if (asString(error?.message || error).trim().toLowerCase() === 'participant_set_mismatch') {
+ if (this.isParticipantSetMismatchError(error)) {
continue;
}
throw error;
@@ -394,6 +396,15 @@ export class MediaSecuritySession {
return true;
}
+ isParticipantSetMismatchError(error) {
+ const code = asString(error?.code).trim().toUpperCase();
+ if (code === 'PARTICIPANT_SET_MISMATCH') {
+ return true;
+ }
+ // Backward compatibility for older throw sites that only set message text.
+ return asString(error?.message || error).trim().toLowerCase() === 'participant_set_mismatch';
+ }
+
async buildSenderKeySignal(targetUserId) {
if (!(await this.ensureReady())) return null;
const target = normalizeUserId(targetUserId);
@@ -418,7 +429,7 @@ export class MediaSecuritySession {
asString(peer.participantSetHash) !== participantSetHash
|| asString(peer.transcriptHash) !== transcriptHash
) {
- throw new Error('participant_set_mismatch');
+ throw new Error('Participant set mismatch detected (participant_set_mismatch)');
}
peer.participantSetHash = participantSetHash;
peer.transcriptHash = transcriptHash;
@@ -472,7 +483,7 @@ export class MediaSecuritySession {
if (payload.contract_name !== SESSION_CONTRACT_NAME || payload.contract_version !== CONTRACT_VERSION) return false;
const payloadKexSuite = normalizeKexSuite(payload.kex_suite);
if (payloadKexSuite === '' || payloadKexSuite !== keyContext.kexSuite || payload.media_suite !== MEDIA_SUITE) {
- throw new Error('downgrade_attempt');
+ throw new Error('KEX/media suite downgrade attempt detected');
}
const participantSetHash = await this.participantHashForPeer(sender);
const transcriptHash = await this.transcriptHashForPeer({
@@ -498,7 +509,8 @@ export class MediaSecuritySession {
const epoch = Number(payload.epoch || 0);
const senderKeyId = asString(payload.sender_key_id);
- if (epoch < 1 || senderKeyId === '') throw new Error('wrong_key_id');
+ if (epoch < 1) throw new Error('invalid_epoch');
+ if (senderKeyId === '') throw new Error('missing_sender_key_id');
const subtle = subtleCrypto();
const nonce = base64UrlToBytes(payload.nonce);
@@ -803,10 +815,11 @@ export class MediaSecuritySession {
if (!sender || typeof sender.createEncodedStreams !== 'function') return false;
if (this.nativeSenders.has(sender)) return true;
const streams = sender.createEncodedStreams();
- if (!streams?.readable || !streams?.writable || typeof TransformStream !== 'function') return false;
+ const NativeTransformStream = globalThis.TransformStream;
+ if (!streams?.readable || !streams?.writable || typeof NativeTransformStream !== 'function') return false;
this.nativeSenders.add(sender);
streams.readable
- .pipeThrough(new TransformStream({
+ .pipeThrough(new NativeTransformStream({
transform: async (encodedFrame, controller) => {
try {
encodedFrame.data = await this.protectNativeEncodedFrame(encodedFrame, { trackKind, trackId });
@@ -828,10 +841,11 @@ export class MediaSecuritySession {
if (!receiver || typeof receiver.createEncodedStreams !== 'function') return false;
if (this.nativeReceivers.has(receiver)) return true;
const streams = receiver.createEncodedStreams();
- if (!streams?.readable || !streams?.writable || typeof TransformStream !== 'function') return false;
+ const NativeTransformStream = globalThis.TransformStream;
+ if (!streams?.readable || !streams?.writable || typeof NativeTransformStream !== 'function') return false;
this.nativeReceivers.add(receiver);
streams.readable
- .pipeThrough(new TransformStream({
+ .pipeThrough(new NativeTransformStream({
transform: async (encodedFrame, controller) => {
try {
encodedFrame.data = await this.decryptNativeEncodedFrame(encodedFrame, senderUserId, { trackId });
@@ -854,7 +868,7 @@ export class MediaSecuritySession {
&& typeof RTCRtpSender.prototype?.createEncodedStreams === 'function'
&& typeof RTCRtpReceiver !== 'undefined'
&& typeof RTCRtpReceiver.prototype?.createEncodedStreams === 'function'
- && typeof TransformStream === 'function';
+ && typeof globalThis.TransformStream === 'function';
}
}
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js
index eb0ee498f..33907bc28 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/runtimeConfig.js
@@ -35,9 +35,9 @@ export const MEDIA_SECURITY_HANDSHAKE_TIMEOUT_MS = MEDIA_SECURITY_HANDSHAKE_RETR
export const MEDIA_SECURITY_HANDSHAKE_WATCHDOG_INTERVAL_MS = 1000;
export const MEDIA_SECURITY_SFU_TARGET_SETTLE_MS = 50;
export const SFU_AUTO_QUALITY_DOWNGRADE_COOLDOWN_MS = 2500;
-export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 1000;
-export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 2;
-export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 2;
+export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 750;
+export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 1;
+export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 1;
export const SFU_AUTO_QUALITY_RECOVERY_STABLE_WINDOW_MS = 15_000;
export const SFU_AUTO_QUALITY_RECOVERY_MIN_INTERVAL_MS = 30_000;
export const SFU_AUTO_QUALITY_RECOVERY_MAX_READBACK_BUDGET_RATIO = 0.6;
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js
index 272768238..e91461a3e 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/callWorkspace/socketLifecycle.js
@@ -3,6 +3,8 @@ import {
shouldRequestSfuFullKeyframeForReason,
} from '../../sfu/recoveryReasons';
+const WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000;
+
export function createCallWorkspaceSocketHelpers({
callbacks,
constants,
@@ -528,6 +530,7 @@ export function createCallWorkspaceSocketHelpers({
const leaveRoom = options?.leaveRoom === true;
clearReconnectTimer();
clearPingTimer();
+ state.connectInFlight = false;
refs.hasRealtimeRoomSync.value = false;
hideLobbyJoinToast();
const socket = refs.socketRef.value;
@@ -624,9 +627,17 @@ export function createCallWorkspaceSocketHelpers({
}
async function connectSocket() {
+ if (state.connectInFlight && !state.manualSocketClose) return;
const generation = ++state.connectGeneration;
+ state.connectInFlight = true;
+ const finishConnectInFlight = () => {
+ if (generation === state.connectGeneration) {
+ state.connectInFlight = false;
+ }
+ };
const token = String(refs.sessionState.sessionToken || '').trim();
if (token === '') {
+ finishConnectInFlight();
refs.connectionReason.value = 'missing_session';
refs.connectionState.value = 'expired';
return;
@@ -657,6 +668,7 @@ export function createCallWorkspaceSocketHelpers({
const sessionProbe = await probeWorkspaceSession();
if (generation !== state.connectGeneration || state.manualSocketClose) {
+ finishConnectInFlight();
return;
}
if (!sessionProbe.ok) {
@@ -667,6 +679,7 @@ export function createCallWorkspaceSocketHelpers({
} else {
refs.connectionState.value = sessionProbe.state;
setNotice(sessionProbe.message, 'error');
+ finishConnectInFlight();
return;
}
}
@@ -676,6 +689,7 @@ export function createCallWorkspaceSocketHelpers({
refs.connectionState.value = 'blocked';
refs.connectionReason.value = 'secure_transport_required';
setNotice('Secure WebSocket transport is required. Configure HTTPS/WSS backend origins.', 'error');
+ finishConnectInFlight();
return;
}
@@ -684,6 +698,7 @@ export function createCallWorkspaceSocketHelpers({
if (originIndex >= orderedSocketOrigins.length) {
refs.connectionState.value = 'retrying';
refs.connectionReason.value = 'socket_unreachable';
+ finishConnectInFlight();
scheduleReconnect();
return;
}
@@ -701,25 +716,41 @@ export function createCallWorkspaceSocketHelpers({
} catch {
// ignore
}
+ finishConnectInFlight();
return;
}
refs.socketRef.value = socket;
let opened = false;
let failedOver = false;
+ let failoverAfterClose = false;
+ let negotiationTimer = null;
+ const clearNegotiationTimer = () => {
+ if (negotiationTimer === null) return;
+ clearTimeout(negotiationTimer);
+ negotiationTimer = null;
+ };
+ const connectNextOrigin = () => {
+ connectWithOriginAt(originIndex + 1);
+ };
- const failOverToNextOrigin = () => {
+ const failOverToNextOrigin = (closeReason = 'failover') => {
if (failedOver) return;
failedOver = true;
+ clearNegotiationTimer();
if (refs.socketRef.value === socket) {
refs.socketRef.value = null;
}
try {
- socket.close(1000, 'failover');
+ socket.close(1000, closeReason);
} catch {
// ignore
}
- connectWithOriginAt(originIndex + 1);
+ if (socket.readyState === WebSocket.CONNECTING || socket.readyState === WebSocket.CLOSING) {
+ failoverAfterClose = true;
+ return;
+ }
+ connectNextOrigin();
};
const failOverAfterAssetVersionProbe = () => {
@@ -728,19 +759,26 @@ export function createCallWorkspaceSocketHelpers({
: false;
if (assetVersionProbe && typeof assetVersionProbe.then === 'function') {
assetVersionProbe.then((handled) => {
- if (handled) return;
+ if (handled) {
+ finishConnectInFlight();
+ return;
+ }
failOverToNextOrigin();
}).catch(() => {
failOverToNextOrigin();
});
return;
}
- if (assetVersionProbe) return;
+ if (assetVersionProbe) {
+ finishConnectInFlight();
+ return;
+ }
failOverToNextOrigin();
};
socket.addEventListener('open', () => {
if (generation !== state.connectGeneration || state.manualSocketClose) {
+ clearNegotiationTimer();
try {
socket.close(1000, 'stale_connect');
} catch {
@@ -750,6 +788,8 @@ export function createCallWorkspaceSocketHelpers({
}
opened = true;
+ clearNegotiationTimer();
+ finishConnectInFlight();
const isReconnectOpen = refs.reconnectAttempt.value > 0;
refs.reconnectAttempt.value = 0;
refs.connectionState.value = 'online';
@@ -784,7 +824,8 @@ export function createCallWorkspaceSocketHelpers({
socket.addEventListener('error', () => {
if (generation !== state.connectGeneration || state.manualSocketClose) return;
if (!opened) {
- failOverAfterAssetVersionProbe();
+ // The browser will emit close after a failed handshake. Origin failover
+ // is intentionally deferred until then to keep connection attempts single-flight.
return;
}
refs.connectionState.value = 'retrying';
@@ -794,6 +835,7 @@ export function createCallWorkspaceSocketHelpers({
socket.addEventListener('close', (event) => {
if (generation !== state.connectGeneration) return;
+ clearNegotiationTimer();
clearPingTimer();
refs.clearMediaSecurityHandshakeWatchdog();
if (refs.socketRef.value === socket) {
@@ -802,9 +844,15 @@ export function createCallWorkspaceSocketHelpers({
refs.hasRealtimeRoomSync.value = false;
if (state.manualSocketClose) {
+ finishConnectInFlight();
return;
}
if (handleAssetVersionSocketClose(event)) {
+ finishConnectInFlight();
+ return;
+ }
+ if (failoverAfterClose) {
+ connectNextOrigin();
return;
}
@@ -813,12 +861,14 @@ export function createCallWorkspaceSocketHelpers({
refs.connectionState.value = 'expired';
refs.connectionReason.value = closeReason;
state.manualSocketClose = true;
+ finishConnectInFlight();
return;
}
if (closeReason === 'auth_backend_error' || (event?.code === 1008 && closeReason !== '')) {
refs.connectionState.value = 'blocked';
refs.connectionReason.value = closeReason || 'blocked';
state.manualSocketClose = true;
+ finishConnectInFlight();
return;
}
if (!opened) {
@@ -830,6 +880,25 @@ export function createCallWorkspaceSocketHelpers({
refs.connectionReason.value = closeReason || 'socket_closed';
scheduleReconnect();
});
+
+ negotiationTimer = setTimeout(() => {
+ if (generation !== state.connectGeneration || state.manualSocketClose) return;
+ if (opened || failedOver) return;
+ refs.connectionState.value = 'retrying';
+ refs.connectionReason.value = 'socket_negotiation_timeout';
+ captureClientDiagnostic({
+ category: 'realtime',
+ level: 'warning',
+ eventType: 'websocket_negotiation_timeout',
+ code: 'websocket_negotiation_timeout',
+ message: 'Realtime websocket negotiation timed out before the browser opened the socket.',
+ payload: {
+ origin: socketOrigin,
+ negotiation_timeout_ms: WEBSOCKET_NEGOTIATION_TIMEOUT_MS,
+ },
+ });
+ failOverToNextOrigin('negotiation_timeout');
+ }, WEBSOCKET_NEGOTIATION_TIMEOUT_MS);
};
connectWithOriginAt(0);
diff --git a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js
index 4141e68d2..4dea1acaf 100644
--- a/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js
+++ b/demo/video-chat/frontend-vue/src/domain/realtime/workspace/config.js
@@ -32,9 +32,9 @@ export const SFU_WLVC_FRAME_HEIGHT = 720;
export const SFU_WLVC_FRAME_QUALITY = 43;
export const SFU_WLVC_KEYFRAME_INTERVAL = 8;
export const SFU_WLVC_ENCODE_INTERVAL_MS = 92;
-export const SFU_WLVC_SEND_BUFFER_HIGH_WATER_BYTES = 4 * 1024 * 1024;
-export const SFU_WLVC_SEND_BUFFER_LOW_WATER_BYTES = 1024 * 1024;
-export const SFU_WLVC_SEND_BUFFER_CRITICAL_BYTES = 10 * 1024 * 1024;
+export const SFU_WLVC_SEND_BUFFER_HIGH_WATER_BYTES = 2 * 1024 * 1024;
+export const SFU_WLVC_SEND_BUFFER_LOW_WATER_BYTES = 512 * 1024;
+export const SFU_WLVC_SEND_BUFFER_CRITICAL_BYTES = 6 * 1024 * 1024;
export const SFU_WLVC_BACKPRESSURE_MIN_PAUSE_MS = 350;
export const SFU_WLVC_BACKPRESSURE_MAX_PAUSE_MS = 2500;
export const SFU_WLVC_BACKPRESSURE_HARD_RESET_AFTER_MS = 30_000;
@@ -43,12 +43,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({
rescue: Object.freeze({
maxEncodedBytesPerFrame: 2048 * 1024,
maxKeyframeBytesPerFrame: 2560 * 1024,
- maxWireBytesPerSecond: 3200 * 1024,
+ maxWireBytesPerSecond: 2200 * 1024,
maxEncodeMs: 78,
maxDrawImageMs: 24,
maxReadbackMs: 40,
maxQueueAgeMs: 260,
- maxBufferedBytes: 3 * 1024 * 1024,
+ maxBufferedBytes: 2 * 1024 * 1024,
payloadSoftLimitRatio: 0.94,
minKeyframeRetryMs: 1400,
expectedRecovery: 'hold_rescue_until_socket_low_water',
@@ -56,12 +56,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({
realtime: Object.freeze({
maxEncodedBytesPerFrame: 3072 * 1024,
maxKeyframeBytesPerFrame: 3840 * 1024,
- maxWireBytesPerSecond: 4600 * 1024,
+ maxWireBytesPerSecond: 3000 * 1024,
maxEncodeMs: 92,
maxDrawImageMs: 28,
maxReadbackMs: 40,
maxQueueAgeMs: 300,
- maxBufferedBytes: 4 * 1024 * 1024,
+ maxBufferedBytes: 2560 * 1024,
payloadSoftLimitRatio: 0.93,
minKeyframeRetryMs: 1100,
expectedRecovery: 'downshift_to_rescue_before_critical_buffer',
@@ -69,12 +69,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({
balanced: Object.freeze({
maxEncodedBytesPerFrame: 4608 * 1024,
maxKeyframeBytesPerFrame: 5632 * 1024,
- maxWireBytesPerSecond: 6500 * 1024,
+ maxWireBytesPerSecond: 4200 * 1024,
maxEncodeMs: 120,
maxDrawImageMs: 36,
maxReadbackMs: 52,
maxQueueAgeMs: 360,
- maxBufferedBytes: 6500 * 1024,
+ maxBufferedBytes: 3 * 1024 * 1024,
payloadSoftLimitRatio: 0.91,
minKeyframeRetryMs: 900,
expectedRecovery: 'downshift_to_realtime_before_critical_buffer',
@@ -82,12 +82,12 @@ export const SFU_VIDEO_QUALITY_PROFILE_BUDGETS = Object.freeze({
quality: Object.freeze({
maxEncodedBytesPerFrame: 5632 * 1024,
maxKeyframeBytesPerFrame: 6656 * 1024,
- maxWireBytesPerSecond: 8400 * 1024,
+ maxWireBytesPerSecond: 5200 * 1024,
maxEncodeMs: 150,
maxDrawImageMs: 42,
maxReadbackMs: 64,
maxQueueAgeMs: 420,
- maxBufferedBytes: 8 * 1024 * 1024,
+ maxBufferedBytes: 4 * 1024 * 1024,
payloadSoftLimitRatio: 0.9,
minKeyframeRetryMs: 800,
expectedRecovery: 'downshift_to_balanced_before_critical_buffer',
diff --git a/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts b/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts
index a716f00d8..c1fb3f2f3 100644
--- a/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts
+++ b/demo/video-chat/frontend-vue/src/lib/sfu/outboundFrameBudget.ts
@@ -1,4 +1,6 @@
const SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES = 192 * 1024
+const SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO = 0.25
+const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES = 512 * 1024
export const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS = 160
export const SFU_FRAME_WIRE_BUDGET_WINDOW_MS = 1000
@@ -30,9 +32,10 @@ function normalizedBudgetNumber(value: unknown): number {
export function resolveSfuSendDrainTargetBytes(metrics: Record): number {
const bufferedBudgetBytes = normalizedBudgetNumber(metrics.budget_max_buffered_bytes)
if (bufferedBudgetBytes <= 0) return SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES
+ const profileDrainTargetBytes = Math.floor(bufferedBudgetBytes * SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO)
return Math.max(
SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES,
- Math.floor(bufferedBudgetBytes * 0.5),
+ Math.min(SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES, profileDrainTargetBytes),
)
}
diff --git a/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts b/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts
index 3f8deb603..ca49e2089 100644
--- a/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts
+++ b/demo/video-chat/frontend-vue/src/lib/sfu/sfuClient.ts
@@ -86,6 +86,7 @@ const SFU_FRAME_TRANSPORT_SAMPLE_COOLDOWN_MS = 2000
const SFU_PUBLISHER_FRAME_STALL_CHECK_INTERVAL_MS = 1000
const SFU_PUBLISHER_FRAME_STALL_RESUBSCRIBE_AFTER_MS = 6000
const SFU_PUBLISHER_FRAME_STALL_RECOVERY_COOLDOWN_MS = 5000
+const SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000
interface SendBufferDrainResult {
ok: boolean
@@ -120,6 +121,7 @@ export class SFUClient {
private lastFrameTransportSample: SfuFrameTransportSample | null = null
private publisherFrameHealthById = new Map()
private publisherFrameStallTimer: ReturnType | null = null
+ private connectAttemptInFlight = false
private mediaTransport: SfuWebSocketFallbackMediaTransport
constructor(cb: SFUClientCallbacks) {
@@ -170,6 +172,7 @@ export class SFUClient {
): void {
if (generation !== this.connectGeneration) return
if (index >= candidates.length) {
+ this.connectAttemptInFlight = false
reportClientDiagnostic({
category: 'media',
level: 'error',
@@ -198,38 +201,73 @@ export class SFUClient {
this.ws = ws
let opened = false
let failedOver = false
+ let failoverAfterClose = false
+ let negotiationTimer: ReturnType | null = null
+
+ const clearNegotiationTimer = () => {
+ if (negotiationTimer === null) return
+ clearTimeout(negotiationTimer)
+ negotiationTimer = null
+ }
+
+ const connectNextCandidate = () => {
+ this.connectWithCandidates(candidates, index + 1, query, roomId, generation)
+ }
const failToNextCandidate = () => {
if (generation !== this.connectGeneration) return
if (opened) return
if (failedOver) return
failedOver = true
+ clearNegotiationTimer()
if (this.ws === ws) {
this.ws = null
}
- this.connectWithCandidates(candidates, index + 1, query, roomId, generation)
+ connectNextCandidate()
+ }
+
+ const failToNextCandidateAfterSocketClose = (closeReason = 'failover') => {
+ if (generation !== this.connectGeneration) return
+ if (opened) return
+ if (failedOver) return
+ failedOver = true
+ clearNegotiationTimer()
+ if (this.ws === ws) {
+ this.ws = null
+ }
+ try {
+ ws.close(1000, closeReason)
+ } catch {}
+ if (ws.readyState === WebSocket.CONNECTING || ws.readyState === WebSocket.CLOSING) {
+ failoverAfterClose = true
+ return
+ }
+ connectNextCandidate()
}
const failToNextCandidateAfterAssetVersionProbe = (): void => {
- const assetVersionProbe = handleAssetVersionConnectionFailure()
- if (assetVersionProbe && typeof assetVersionProbe.then === 'function') {
- assetVersionProbe.then((handled) => {
- if (handled) return
+ Promise.resolve(handleAssetVersionConnectionFailure())
+ .then((handled) => {
+ if (handled) {
+ this.connectAttemptInFlight = false
+ return
+ }
failToNextCandidate()
- }).catch(() => {
+ })
+ .catch(() => {
failToNextCandidate()
})
- return
- }
- failToNextCandidate()
}
ws.onopen = () => {
if (generation !== this.connectGeneration) {
+ clearNegotiationTimer()
try { ws.close() } catch {}
return
}
opened = true
+ clearNegotiationTimer()
+ this.connectAttemptInFlight = false
this.disconnectNotified = false
setBackendSfuOrigin(candidates[index] || '')
this.send({ type: 'sfu/join', room_id: roomId, role: 'publisher' })
@@ -259,7 +297,15 @@ export class SFUClient {
ws.onclose = (event) => {
if (generation !== this.connectGeneration) return
- if (handleAssetVersionSocketClose(event)) return
+ clearNegotiationTimer()
+ if (handleAssetVersionSocketClose(event)) {
+ this.connectAttemptInFlight = false
+ return
+ }
+ if (failoverAfterClose) {
+ connectNextCandidate()
+ return
+ }
if (!opened) {
failToNextCandidateAfterAssetVersionProbe()
return
@@ -288,18 +334,40 @@ export class SFUClient {
ws.onerror = () => {
if (generation !== this.connectGeneration) return
if (!opened) {
- failToNextCandidateAfterAssetVersionProbe()
+ // Browsers follow pre-open errors with close; wait for that terminal
+ // event before trying the next origin so CONNECTING sockets do not pile up.
return
}
try {
ws.close()
} catch {}
}
+
+ negotiationTimer = setTimeout(() => {
+ if (generation !== this.connectGeneration) return
+ if (opened || failedOver) return
+ reportClientDiagnostic({
+ category: 'media',
+ level: 'warning',
+ eventType: 'sfu_socket_negotiation_timeout',
+ code: 'sfu_socket_negotiation_timeout',
+ message: 'SFU websocket negotiation timed out before the browser opened the socket.',
+ roomId,
+ payload: {
+ room_id: roomId,
+ candidate_origin: String(candidates[index] || ''),
+ negotiation_timeout_ms: SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS,
+ },
+ })
+ failToNextCandidateAfterSocketClose('negotiation_timeout')
+ }, SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS)
}
connect(session: { userId: string; token: string; name: string }, roomId: string, callId = ''): void {
+ if (this.connectAttemptInFlight) return
this.connectGeneration += 1
this.outboundMediaGeneration += 1
+ this.connectAttemptInFlight = true
this.disconnectNotified = false
this.inboundFrameAssembler.clear()
this.outboundFrameSequenceByTrack.clear()
@@ -311,7 +379,7 @@ export class SFUClient {
const generation = this.connectGeneration
if (this.ws) {
- this.retireSocket(this.ws)
+ this.retireSocket(this.ws, true)
this.ws = null
}
@@ -419,6 +487,7 @@ export class SFUClient {
leave(): void {
this.connectGeneration += 1
this.outboundMediaGeneration += 1
+ this.connectAttemptInFlight = false
this.disconnectNotified = false
this.inboundFrameAssembler.clear()
this.outboundFrameSequenceByTrack.clear()
diff --git a/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp b/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp
index a69e55819..9810b0be6 100644
--- a/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp
+++ b/demo/video-chat/frontend-vue/src/lib/wasm/cpp/codec.cpp
@@ -2,9 +2,48 @@
#include
#include
#include
+#include
namespace wlvc {
+namespace {
+
+size_t plane_count(int width, int height) {
+ return static_cast(width) * static_cast(height);
+}
+
+size_t rgba_offset(size_t pixel_index) {
+ return pixel_index * static_cast(4);
+}
+
+size_t rgba_offset(int row, int col, int width) {
+ return rgba_offset(static_cast(row) * static_cast(width)
+ + static_cast(col));
+}
+
+bool fits_int(size_t value) {
+ return value <= static_cast(std::numeric_limits::max());
+}
+
+int count_to_int(size_t value) {
+ return fits_int(value) ? static_cast(value) : -1;
+}
+
+size_t int16_plane_bytes(size_t value_count) {
+ return value_count * sizeof(int16_t);
+}
+
+size_t float_plane_bytes(size_t value_count) {
+ return value_count * sizeof(float);
+}
+
+size_t rle_max_bytes_for_count(size_t value_count) {
+ return static_cast(RLE_HEADER_BYTES)
+ + value_count * static_cast(RLE_PAIR_BYTES);
+}
+
+} // namespace
+
// ---------------------------------------------------------------------------
// Encoder
// ---------------------------------------------------------------------------
@@ -13,21 +52,23 @@ Encoder::Encoder(const EncoderConfig& cfg) : cfg_(cfg) {
const int w = cfg_.width, h = cfg_.height;
const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w;
const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h;
-
- Y_.resize(w * h);
- U_.resize(uvW * uvH);
- V_.resize(uvW * uvH);
- Ydelta_.resize(w * h);
- Udelta_.resize(uvW * uvH);
- Vdelta_.resize(uvW * uvH);
- prevY_.resize(w * h);
- prevU_.resize(uvW * uvH);
- prevV_.resize(uvW * uvH);
- Yq_.resize(w * h);
- Uq_.resize(uvW * uvH);
- Vq_.resize(uvW * uvH);
+ const size_t y_count = plane_count(w, h);
+ const size_t uv_count = plane_count(uvW, uvH);
+
+ Y_.resize(y_count);
+ U_.resize(uv_count);
+ V_.resize(uv_count);
+ Ydelta_.resize(y_count);
+ Udelta_.resize(uv_count);
+ Vdelta_.resize(uv_count);
+ prevY_.resize(y_count);
+ prevU_.resize(uv_count);
+ prevV_.resize(uv_count);
+ Yq_.resize(y_count);
+ Uq_.resize(uv_count);
+ Vq_.resize(uv_count);
tmp_.resize(std::max(w, h));
- rle_buf_.resize(rle_max_bytes(std::max(w * h, uvW * uvH)));
+ rle_buf_.resize(rle_max_bytes_for_count(std::max(y_count, uv_count)));
}
void Encoder::rgba_to_yuv(const uint8_t* rgba) {
@@ -38,24 +79,29 @@ void Encoder::rgba_to_yuv(const uint8_t* rgba) {
if (cfg_.color_space == kYUV) {
for (int row = 0; row < h; ++row) {
for (int col = 0; col < w; ++col) {
- const int i = (row * w + col) * 4;
+ const size_t i = rgba_offset(row, col, w);
const float r = rgba[i], g = rgba[i + 1], b = rgba[i + 2];
- Y_[row * w + col] = 0.299f * r + 0.587f * g + 0.114f * b - 128.0f;
+ Y_[static_cast(row) * static_cast(w) + static_cast(col)] =
+ 0.299f * r + 0.587f * g + 0.114f * b - 128.0f;
}
}
for (int row = 0; row < uvH; ++row) {
for (int col = 0; col < uvW; ++col) {
- const int i = (row * 2 * w + col * 2) * 4;
+ const size_t i = rgba_offset(static_cast(row) * static_cast(2) * static_cast(w)
+ + static_cast(col) * static_cast(2));
const float r = rgba[i], g = rgba[i + 1], b = rgba[i + 2];
- U_[row * uvW + col] = -0.147f * r - 0.289f * g + 0.436f * b;
- V_[row * uvW + col] = 0.615f * r - 0.515f * g - 0.100f * b;
+ const size_t uv_idx = static_cast(row) * static_cast(uvW)
+ + static_cast(col);
+ U_[uv_idx] = -0.147f * r - 0.289f * g + 0.436f * b;
+ V_[uv_idx] = 0.615f * r - 0.515f * g - 0.100f * b;
}
}
} else {
for (int row = 0; row < h; ++row) {
for (int col = 0; col < w; ++col) {
- const int i = (row * w + col) * 4;
- const int idx = row * w + col;
+ const size_t i = rgba_offset(row, col, w);
+ const size_t idx = static_cast(row) * static_cast(w)
+ + static_cast(col);
Y_[idx] = rgba[i];
U_[idx] = rgba[i + 1];
V_[idx] = rgba[i + 2];
@@ -66,9 +112,17 @@ void Encoder::rgba_to_yuv(const uint8_t* rgba) {
int Encoder::encode(const uint8_t* rgba, double timestamp_us,
uint8_t* out_buf, int out_capacity) {
+ if (out_capacity < 0) return -1;
+
const int w = cfg_.width, h = cfg_.height;
const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w;
const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h;
+ const size_t y_count = plane_count(w, h);
+ const size_t uv_count = plane_count(uvW, uvH);
+ const int y_count_i = count_to_int(y_count);
+ const int uv_count_i = count_to_int(uv_count);
+ if (y_count_i < 0 || uv_count_i < 0) return -1;
+
const bool is_key = (frame_count_ % cfg_.key_frame_interval) == 0;
++frame_count_;
@@ -81,9 +135,9 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us,
float* v_enc = V_.data();
const bool use_temporal_residual = cfg_.motion_estimation && !is_key && frame_count_ > 1;
if (use_temporal_residual) {
- for (int i = 0; i < w * h; ++i)
+ for (size_t i = 0; i < y_count; ++i)
Ydelta_[i] = Y_[i] - prevY_[i];
- for (int i = 0; i < uvW * uvH; ++i) {
+ for (size_t i = 0; i < uv_count; ++i) {
Udelta_[i] = U_[i] - prevU_[i];
Vdelta_[i] = V_[i] - prevV_[i];
}
@@ -91,9 +145,9 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us,
u_enc = Udelta_.data();
v_enc = Vdelta_.data();
}
- std::memcpy(prevY_.data(), Y_.data(), w * h * sizeof(float));
- std::memcpy(prevU_.data(), U_.data(), uvW * uvH * sizeof(float));
- std::memcpy(prevV_.data(), V_.data(), uvW * uvH * sizeof(float));
+ std::memcpy(prevY_.data(), Y_.data(), float_plane_bytes(y_count));
+ std::memcpy(prevU_.data(), U_.data(), float_plane_bytes(uv_count));
+ std::memcpy(prevV_.data(), V_.data(), float_plane_bytes(uv_count));
// Forward DWT
dwt_forward(y_enc, w, h, cfg_.levels, tmp_.data());
@@ -109,19 +163,39 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us,
size_t y_rle_sz = 0;
size_t u_rle_sz = 0;
size_t v_rle_sz = 0;
+ size_t y_rle_off = 0;
+ size_t u_rle_off = 0;
+ size_t v_rle_off = 0;
if (cfg_.entropy_coding == kNone) {
- y_rle_sz = static_cast(w * h * sizeof(int16_t));
- u_rle_sz = static_cast(uvW * uvH * sizeof(int16_t));
- v_rle_sz = static_cast(uvW * uvH * sizeof(int16_t));
+ y_rle_sz = int16_plane_bytes(y_count);
+ u_rle_sz = int16_plane_bytes(uv_count);
+ v_rle_sz = int16_plane_bytes(uv_count);
} else {
- y_rle_sz = rle_encode(Yq_.data(), w * h, rle_buf_.data());
- u_rle_sz = rle_encode(Uq_.data(), uvW * uvH, rle_buf_.data());
- v_rle_sz = rle_encode(Vq_.data(), uvW * uvH, rle_buf_.data());
+ const size_t y_cap = int16_plane_bytes(y_count_i);
+ const size_t u_cap = int16_plane_bytes(uv_count_i);
+ const size_t v_cap = int16_plane_bytes(uv_count_i);
+ if (y_cap > std::numeric_limits::max() - u_cap) return -1;
+ const size_t yu_cap = y_cap + u_cap;
+ if (yu_cap > std::numeric_limits::max() - v_cap) return -1;
+ const size_t total_rle_cap = yu_cap + v_cap;
+ rle_buf_.resize(total_rle_cap);
+
+ y_rle_off = 0;
+ u_rle_off = y_cap;
+ v_rle_off = y_cap + u_cap;
+
+ y_rle_sz = rle_encode(Yq_.data(), y_count_i, rle_buf_.data() + y_rle_off);
+ u_rle_sz = rle_encode(Uq_.data(), uv_count_i, rle_buf_.data() + u_rle_off);
+ v_rle_sz = rle_encode(Vq_.data(), uv_count_i, rle_buf_.data() + v_rle_off);
}
// Pack header + payload
- const int total = kHeaderBytes + static_cast(y_rle_sz + u_rle_sz + v_rle_sz);
- if (total > out_capacity) return -1;
+ const size_t total = static_cast(kHeaderBytes) + y_rle_sz + u_rle_sz + v_rle_sz;
+ if (y_rle_sz > static_cast(std::numeric_limits::max())
+ || u_rle_sz > static_cast(std::numeric_limits::max())
+ || v_rle_sz > static_cast(std::numeric_limits::max())) return -1;
+ if (total > static_cast(std::numeric_limits::max())
+ || total > static_cast(out_capacity)) return -1;
uint8_t* p = out_buf;
auto w32 = [&](uint32_t v) {
@@ -167,11 +241,11 @@ int Encoder::encode(const uint8_t* rgba, double timestamp_us,
std::memcpy(p, Vq_.data(), v_rle_sz);
p += v_rle_sz;
} else {
- const size_t y_bytes = rle_encode(Yq_.data(), w * h, p);
+ const size_t y_bytes = rle_encode(Yq_.data(), y_count_i, p);
p += y_bytes;
- const size_t u_bytes = rle_encode(Uq_.data(), uvW * uvH, p);
+ const size_t u_bytes = rle_encode(Uq_.data(), uv_count_i, p);
p += u_bytes;
- const size_t v_bytes = rle_encode(Vq_.data(), uvW * uvH, p);
+ const size_t v_bytes = rle_encode(Vq_.data(), uv_count_i, p);
p += v_bytes;
}
@@ -182,12 +256,21 @@ int Encoder::max_encoded_bytes() const {
const int w = cfg_.width, h = cfg_.height;
const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w;
const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h;
+ const size_t y_count = plane_count(w, h);
+ const size_t uv_count = plane_count(uvW, uvH);
+ size_t total = 0;
if (cfg_.entropy_coding == kNone) {
- return kHeaderBytes + (w * h + uvW * uvH + uvW * uvH) * static_cast(sizeof(int16_t));
+ total = static_cast(kHeaderBytes)
+ + int16_plane_bytes(y_count)
+ + int16_plane_bytes(uv_count)
+ + int16_plane_bytes(uv_count);
+ } else {
+ total = static_cast(kHeaderBytes)
+ + rle_max_bytes_for_count(y_count)
+ + rle_max_bytes_for_count(uv_count)
+ + rle_max_bytes_for_count(uv_count);
}
- return kHeaderBytes
- + static_cast(rle_max_bytes(w * h))
- + static_cast(rle_max_bytes(uvW * uvH)) * 2;
+ return count_to_int(total);
}
void Encoder::reset() {
@@ -205,16 +288,18 @@ Decoder::Decoder(const DecoderConfig& cfg) : cfg_(cfg) {
const int w = cfg_.width, h = cfg_.height;
const int uvW = cfg_.color_space == kYUV ? (w >> 1) : w;
const int uvH = cfg_.color_space == kYUV ? (h >> 1) : h;
-
- Y_.resize(w * h);
- U_.resize(uvW * uvH);
- V_.resize(uvW * uvH);
- prevY_.resize(w * h);
- prevU_.resize(uvW * uvH);
- prevV_.resize(uvW * uvH);
- Yq_.resize(w * h);
- Uq_.resize(uvW * uvH);
- Vq_.resize(uvW * uvH);
+ const size_t y_count = plane_count(w, h);
+ const size_t uv_count = plane_count(uvW, uvH);
+
+ Y_.resize(y_count);
+ U_.resize(uv_count);
+ V_.resize(uv_count);
+ prevY_.resize(y_count);
+ prevU_.resize(uv_count);
+ prevV_.resize(uv_count);
+ Yq_.resize(y_count);
+ Uq_.resize(uv_count);
+ Vq_.resize(uv_count);
tmp_.resize(std::max(w, h));
}
@@ -261,16 +346,26 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) {
r8(); // blur radius
}
- if (kHeaderBytes + y_bytes + u_bytes + v_bytes > static_cast(enc_size))
+ const size_t payload_end = static_cast(kHeaderBytes)
+ + static_cast(y_bytes)
+ + static_cast(u_bytes)
+ + static_cast(v_bytes);
+ if (payload_end > static_cast(enc_size))
return -1;
+ const size_t y_count = plane_count(w, h);
+ const size_t uv_count = plane_count(uvW, uvH);
+ const int y_count_i = count_to_int(y_count);
+ const int uv_count_i = count_to_int(uv_count);
+ if (y_count_i < 0 || uv_count_i < 0) return -1;
+
int yn = 0;
int un = 0;
int vn = 0;
if (entropyMode == kNone) {
- const size_t y_expected = static_cast(w * h * sizeof(int16_t));
- const size_t u_expected = static_cast(uvW * uvH * sizeof(int16_t));
- const size_t v_expected = static_cast(uvW * uvH * sizeof(int16_t));
+ const size_t y_expected = int16_plane_bytes(y_count);
+ const size_t u_expected = int16_plane_bytes(uv_count);
+ const size_t v_expected = int16_plane_bytes(uv_count);
if (y_bytes != y_expected || u_bytes != u_expected || v_bytes != v_expected) return -1;
std::memcpy(Yq_.data(), p, y_bytes);
p += y_bytes;
@@ -278,19 +373,19 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) {
p += u_bytes;
std::memcpy(Vq_.data(), p, v_bytes);
p += v_bytes;
- yn = w * h;
- un = uvW * uvH;
- vn = uvW * uvH;
+ yn = y_count_i;
+ un = uv_count_i;
+ vn = uv_count_i;
} else {
- yn = rle_decode(p, y_bytes, Yq_.data(), w * h);
+ yn = rle_decode(p, y_bytes, Yq_.data(), y_count_i);
p += y_bytes;
- un = rle_decode(p, u_bytes, Uq_.data(), uvW * uvH);
+ un = rle_decode(p, u_bytes, Uq_.data(), uv_count_i);
p += u_bytes;
- vn = rle_decode(p, v_bytes, Vq_.data(), uvW * uvH);
+ vn = rle_decode(p, v_bytes, Vq_.data(), uv_count_i);
p += v_bytes;
}
- if (yn != w * h || un != uvW * uvH || vn != uvW * uvH) return -1;
+ if (yn != y_count_i || un != uv_count_i || vn != uv_count_i) return -1;
// Dequantise
QuantConfig qcfg = { quality, levels };
@@ -305,22 +400,22 @@ int Decoder::decode(const uint8_t* enc, int enc_size, uint8_t* rgba_out) {
// Temporal residual reconstruction
if (!is_key) {
- for (int i = 0; i < w * h; ++i)
+ for (size_t i = 0; i < y_count; ++i)
Y_[i] += prevY_[i];
}
if (!is_key && (flags & kFrameFlagChromaTemporalResidual)) {
- for (int i = 0; i < uvW * uvH; ++i) {
+ for (size_t i = 0; i < uv_count; ++i) {
U_[i] += prevU_[i];
V_[i] += prevV_[i];
}
}
- std::memcpy(prevY_.data(), Y_.data(), w * h * sizeof(float));
- std::memcpy(prevU_.data(), U_.data(), uvW * uvH * sizeof(float));
- std::memcpy(prevV_.data(), V_.data(), uvW * uvH * sizeof(float));
+ std::memcpy(prevY_.data(), Y_.data(), float_plane_bytes(y_count));
+ std::memcpy(prevU_.data(), U_.data(), float_plane_bytes(uv_count));
+ std::memcpy(prevV_.data(), V_.data(), float_plane_bytes(uv_count));
if (colorSpace == kRGB) {
- for (int i = 0; i < w * h; ++i) {
- const int pi = i * 4;
+ for (size_t i = 0; i < y_count; ++i) {
+ const size_t pi = rgba_offset(i);
rgba_out[pi] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, Y_[i])));
rgba_out[pi + 1] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, U_[i])));
rgba_out[pi + 2] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, V_[i])));
@@ -337,8 +432,10 @@ void Decoder::yuv_to_rgba(uint8_t* rgba, int w, int h,
int uvW, int uvH) {
for (int row = 0; row < h; ++row) {
for (int col = 0; col < w; ++col) {
- const int yi = row * w + col;
- const int uvi = (row >> 1) * uvW + (col >> 1);
+ const size_t yi = static_cast(row) * static_cast(w)
+ + static_cast(col);
+ const size_t uvi = static_cast(row >> 1) * static_cast(uvW)
+ + static_cast(col >> 1);
const float y = Y[yi] + 128.0f;
const float u = U[uvi];
@@ -348,7 +445,7 @@ void Decoder::yuv_to_rgba(uint8_t* rgba, int w, int h,
const float g = y - 0.39465f * u - 0.58060f * v;
const float b = y + 2.03211f * u;
- const int pi = yi * 4;
+ const size_t pi = rgba_offset(yi);
rgba[pi] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, r)));
rgba[pi + 1] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, g)));
rgba[pi + 2] = static_cast(std::fmaxf(0.0f, std::fminf(255.0f, b)));
diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs
index bc9d16954..1029b7336 100644
--- a/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs
+++ b/demo/video-chat/frontend-vue/tests/contract/sfu-browser-ws-send-drain-contract.mjs
@@ -29,6 +29,8 @@ try {
const publisherBackpressureController = read('src/domain/realtime/workspace/callWorkspace/publisherBackpressureController.js');
requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_LOW_WATER_BYTES = 192 * 1024', 'client has an explicit low-water send resume target');
+ requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_PROFILE_DRAIN_RATIO = 0.25', 'client drains to a low fraction of the active profile budget');
+ requireContains(outboundFrameBudget, 'const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_DRAIN_TARGET_BYTES = 512 * 1024', 'client caps send-buffer drain target below one megabyte');
requireContains(outboundFrameBudget, 'export const SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS = 160', 'client does not hide media behind a 500ms browser drain wait');
requireContains(outboundFrameBudget, 'export function resolveSfuSendDrainTargetBytes(metrics', 'client derives drain target from the active profile budget');
requireContains(sfuClient, 'this.waitForSendBufferDrain(drainTargetBufferedBytes, SFU_FRAME_CHUNK_BACKPRESSURE_MAX_WAIT_MS)', 'client waits only to the budgeted low-water target');
diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs
index 082c9f0ab..8b65bbb0d 100644
--- a/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs
+++ b/demo/video-chat/frontend-vue/tests/contract/sfu-motion-backpressure-contract.mjs
@@ -106,9 +106,9 @@ async function main() {
requireContains(runtimeConfig, 'SFU_WLVC_MAX_KEYFRAME_FRAME_BYTES', 'motion payload keyframe cap');
requireContains(runtimeConfig, 'SFU_WLVC_MOTION_DELTA_CADENCE_WINDOW_MS', 'motion payload cadence throttle window');
requireContains(runtimeConfig, 'SFU_WLVC_MOTION_DELTA_PROFILE_DOWNSHIFT_THRESHOLD', 'motion payload repeated-pressure downshift threshold');
- requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 1000;', 'one second send-pressure downgrade window');
- requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 2;', 'two skipped frames trigger quality downgrade');
- requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 2;', 'two send failures trigger quality downgrade');
+ requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_BACKPRESSURE_WINDOW_MS = 750;', 'sub-second send-pressure downgrade window');
+ requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SKIP_THRESHOLD = 1;', 'first skipped frame triggers quality downgrade');
+ requireContains(runtimeConfig, 'export const SFU_AUTO_QUALITY_DOWNGRADE_SEND_FAILURE_THRESHOLD = 1;', 'first send failure triggers quality downgrade');
requireContains(publisherPipeline, 'handleWlvcFramePayloadPressure(encodedPayloadBytes', 'publisher drops oversized frames before send');
requireContains(publisherPipeline, 'encodedPayloadBytes > maxEncodedPayloadBytes', 'publisher compares encoded WLVC payload with cap');
requireContains(sfuPublisherControl, 'sfu_high_motion_payload_pressure', 'publisher controller high-motion pressure reason');
diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs
index 8393e961b..780b58496 100644
--- a/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs
+++ b/demo/video-chat/frontend-vue/tests/contract/sfu-online-acceptance-no-critical-pressure-contract.mjs
@@ -49,8 +49,8 @@ try {
requireContains(pressureGate, "request:client-diagnostics", 'online gate records backend client diagnostics POST bodies');
requireContains(pressureGate, '/api\\/user\\/client-diagnostics', 'online gate watches backend diagnostics submissions');
requireContains(pressureGate, 'remote video frozen', 'online gate blocks remote freeze diagnostics');
- requireContains(pressureGate, 'CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024', 'online gate has a critical bufferedAmount ceiling');
- requireContains(pressureGate, 'MAX_ACCEPTED_BUFFERED_BYTES = 8 * 1024 * 1024', 'online gate enforces the quality profile buffer budget');
+ requireContains(pressureGate, 'CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024', 'online gate has a critical bufferedAmount ceiling');
+ requireContains(pressureGate, 'MAX_ACCEPTED_BUFFERED_BYTES = 4 * 1024 * 1024', 'online gate enforces the quality profile buffer budget');
requireContains(pressureGate, 'socketFailureCount', 'online gate fails on SFU socket close/error during media flow');
requireContains(pressureGate, 'luma > 8', 'online gate fails black remote video');
requireContains(harness, 'highMotionVideo = false', 'media shim keeps high-motion opt-in');
diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs
index 4c1aea7da..a540f94e3 100644
--- a/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs
+++ b/demo/video-chat/frontend-vue/tests/contract/sfu-origin-room-binding-contract.mjs
@@ -29,6 +29,7 @@ const framePayload = read('../../src/lib/sfu/framePayload.ts');
const outboundFrameQueue = read('../../src/lib/sfu/outboundFrameQueue.ts');
const inboundFrameAssembler = read('../../src/lib/sfu/inboundFrameAssembler.ts');
const backendOrigin = read('../../src/support/backendOrigin.js');
+const socketLifecycle = read('../../src/domain/realtime/workspace/callWorkspace/socketLifecycle.js');
const stackEnv = read('../../../.env');
const deployScript = read('../../../scripts/deploy.sh');
@@ -39,6 +40,18 @@ try {
requireContains(sfuClient, 'const candidates = resolveBackendSfuOriginCandidates()', 'SFU origin candidates');
requireContains(sfuClient, "setBackendSfuOrigin(candidates[index] || '')", 'SFU working origin persistence');
requireContains(sfuClient, 'this.connectWithCandidates(candidates, index + 1, query, roomId, generation)', 'SFU failover to next origin');
+ requireContains(sfuClient, 'private connectAttemptInFlight = false', 'SFU client tracks one pending websocket handshake');
+ requireContains(sfuClient, 'if (this.connectAttemptInFlight) return', 'SFU client refuses duplicate pending websocket connect attempts');
+ requireContains(sfuClient, 'this.connectAttemptInFlight = false\n this.disconnectNotified = false', 'SFU client clears pending handshake only after socket open');
+ requireContains(sfuClient, 'Browsers follow pre-open errors with close; wait for that terminal', 'SFU client waits for close before origin failover');
+ requireContains(sfuClient, 'const SFU_WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000', 'SFU client bounds pending websocket negotiation to 5 minutes');
+ requireContains(sfuClient, "failToNextCandidateAfterSocketClose('negotiation_timeout')", 'SFU client closes timed-out websocket negotiation before failover');
+ requireContains(socketLifecycle, 'if (state.connectInFlight && !state.manualSocketClose) return;', 'workspace websocket refuses duplicate pending connect attempts');
+ requireContains(socketLifecycle, 'state.connectInFlight = true;', 'workspace websocket opens a single-flight gate during handshake');
+ requireContains(socketLifecycle, 'The browser will emit close after a failed handshake', 'workspace websocket waits for close before origin failover');
+ requireContains(socketLifecycle, 'const WEBSOCKET_NEGOTIATION_TIMEOUT_MS = 5 * 60 * 1000;', 'workspace websocket bounds pending negotiation to 5 minutes');
+ requireContains(socketLifecycle, "refs.connectionReason.value = 'socket_negotiation_timeout';", 'workspace websocket reports timed-out negotiation');
+ requireContains(socketLifecycle, "failOverToNextOrigin('negotiation_timeout')", 'workspace websocket closes timed-out negotiation before failover');
requireContains(sfuClient, 'room: roomId,', 'legacy room query compatibility');
requireContains(sfuClient, 'room_id: roomId,', 'snake_case room query binding');
diff --git a/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs b/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs
index 2812f34f4..a3e20939d 100644
--- a/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs
+++ b/demo/video-chat/frontend-vue/tests/contract/sfu-production-socket-proxy-budget-contract.mjs
@@ -31,8 +31,8 @@ try {
);
requireContains(probe, 'CONTINUATION_THRESHOLD_BYTES = 65_535', 'probe measures around websocket continuation threshold');
requireContains(probe, 'QUALITY_MAX_PAYLOAD_BYTES = 5632 * 1024', 'probe exercises full quality profile payload budget');
- requireContains(probe, 'QUALITY_MAX_BUFFERED_BYTES = 8 * 1024 * 1024', 'probe enforces quality bufferedAmount budget');
- requireContains(probe, 'CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024', 'probe fails before critical backpressure');
+ requireContains(probe, 'QUALITY_MAX_BUFFERED_BYTES = 4 * 1024 * 1024', 'probe enforces quality bufferedAmount budget');
+ requireContains(probe, 'CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024', 'probe fails before critical backpressure');
requireContains(probe, 'connectSfuSocket', 'probe opens real production SFU sockets');
requireContains(probe, "role: 'publisher'", 'probe opens publisher path');
requireContains(probe, "role: 'subscriber'", 'probe opens subscriber path');
diff --git a/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs b/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs
index 5ab120dc0..5189629a4 100644
--- a/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs
+++ b/demo/video-chat/frontend-vue/tests/e2e/online-sfu-pressure-acceptance.mjs
@@ -17,8 +17,8 @@ const MIN_REMOTE_HEIGHT = 480;
const PRESSURE_DURATION_MS = Math.max(15_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_DURATION_MS || '45000', 10));
const SAMPLE_INTERVAL_MS = Math.max(1_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_SAMPLE_INTERVAL_MS || '2500', 10));
const SLOW_SUBSCRIBER_DURATION_MS = Math.max(5_000, Number.parseInt(process.env.VIDEOCHAT_ONLINE_PRESSURE_SLOW_MS || '12000', 10));
-const CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024;
-const MAX_ACCEPTED_BUFFERED_BYTES = 8 * 1024 * 1024;
+const CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024;
+const MAX_ACCEPTED_BUFFERED_BYTES = 4 * 1024 * 1024;
const MAX_NO_TRANSITION_BUFFERED_BYTES = 1024 * 1024;
const MAX_FINAL_BUFFERED_BYTES = 512 * 1024;
const MAX_TRANSIENT_BLACK_SAMPLES = 1;
diff --git a/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs b/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs
index a52f13f74..7f4a9f27c 100644
--- a/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs
+++ b/demo/video-chat/frontend-vue/tests/e2e/production-socket-proxy-budget.mjs
@@ -9,8 +9,8 @@ const LOCAL_ENV_FILE = path.join(VIDEOCHAT_DIR, '.env.local');
const CONTINUATION_THRESHOLD_BYTES = 65_535;
const QUALITY_MAX_PAYLOAD_BYTES = 5632 * 1024;
-const QUALITY_MAX_BUFFERED_BYTES = 8 * 1024 * 1024;
-const CRITICAL_BUFFERED_BYTES = 10 * 1024 * 1024;
+const QUALITY_MAX_BUFFERED_BYTES = 4 * 1024 * 1024;
+const CRITICAL_BUFFERED_BYTES = 6 * 1024 * 1024;
const DRAIN_LOW_WATER_BYTES = 64 * 1024;
const DEFAULT_TIMEOUT_MS = Math.max(5_000, Number.parseInt(process.env.VIDEOCHAT_PRODUCTION_PROXY_BUDGET_TIMEOUT_MS || '20000', 10));
const FRAME_SIZES = [
@@ -217,7 +217,7 @@ function encodeBinaryEnvelope({ publisherId, publisherUserId, trackId, frameSequ
outgoing_video_quality_profile: 'quality',
budget_max_encoded_bytes_per_frame: QUALITY_MAX_PAYLOAD_BYTES,
budget_max_keyframe_bytes_per_frame: 6656 * 1024,
- budget_max_wire_bytes_per_second: 8400 * 1024,
+ budget_max_wire_bytes_per_second: 5200 * 1024,
budget_max_buffered_bytes: QUALITY_MAX_BUFFERED_BYTES,
binary_continuation_threshold_bytes: CONTINUATION_THRESHOLD_BYTES,
layout_mode: 'full_frame',
diff --git a/demo/video-chat/scripts/check-edge-deployment-decision.sh b/demo/video-chat/scripts/check-edge-deployment-decision.sh
index 91ccd099c..025581761 100755
--- a/demo/video-chat/scripts/check-edge-deployment-decision.sh
+++ b/demo/video-chat/scripts/check-edge-deployment-decision.sh
@@ -86,6 +86,8 @@ require_text "${EDGE_DIR}/edge.php" 'VIDEOCHAT_EDGE_READ_STALL_TIMEOUT_SECONDS'
require_text "${EDGE_DIR}/edge.php" '$written === 0'
require_text "${EDGE_DIR}/edge.php" "\$chunk === ''"
require_text "${EDGE_DIR}/edge.php" 'websocket idle timeout decide'
+require_text "${EDGE_DIR}/edge.php" 'WebSocket tunnels cannot stay half-open'
+require_text "${EDGE_DIR}/edge.php" '$closeWebSocketTunnel()'
require_text "${EDGE_DIR}/edge.php" '$needsBackoff'
require_text "${EDGE_DIR}/edge.php" 'Connection: close'
require_text "${ROOT_DIR}/frontend-vue/src/lib/wasm/wasm-codec.ts" "WASM_MIME_CACHE_BUSTER"
diff --git a/demo/video-chat/scripts/deploy-smoke.sh b/demo/video-chat/scripts/deploy-smoke.sh
index eb412ee21..4bf93e6b5 100755
--- a/demo/video-chat/scripts/deploy-smoke.sh
+++ b/demo/video-chat/scripts/deploy-smoke.sh
@@ -57,7 +57,7 @@ assert_public_health_payload() {
}
$keys = array_keys($payload);
sort($keys);
- if ($keys !== ["service", "status", "time"]) {
+ if ($keys !== ["asset_version", "service", "status", "time"]) {
fwrite(STDERR, "unexpected public health keys: " . implode(",", $keys) . "\n");
exit(1);
}
@@ -65,6 +65,10 @@ assert_public_health_payload() {
fwrite(STDERR, "health status is not ok\n");
exit(1);
}
+ if (!is_string($payload["asset_version"] ?? null) || $payload["asset_version"] === "") {
+ fwrite(STDERR, "health asset_version is missing\n");
+ exit(1);
+ }
'
}
@@ -105,7 +109,7 @@ websocket_upgrade_smoke() {
if [[ "${code}" == "000" ]]; then
local header_code
- header_code="$(awk '/^HTTP\\// {code=$2} END {print code}' "${headers}" | tr -d '\r')"
+ header_code="$(awk '/^HTTP\// {code=$2} END {print code}' "${headers}" | tr -d '\r')"
[[ -n "${header_code}" ]] && code="${header_code}"
fi
diff --git a/demo/video-chat/scripts/deploy.sh b/demo/video-chat/scripts/deploy.sh
index 5cfaed7c8..21ecceb80 100755
--- a/demo/video-chat/scripts/deploy.sh
+++ b/demo/video-chat/scripts/deploy.sh
@@ -56,6 +56,11 @@ Optional environment:
VIDEOCHAT_DEPLOY_SFU_DOMAIN SFU websocket host, default: sfu..
VIDEOCHAT_DEPLOY_TURN_DOMAIN TURN host, default: turn..
VIDEOCHAT_DEPLOY_CDN_DOMAIN Static/CDN asset host, default: cdn..
+ VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS
+ Optional comma-separated hostnames to route to an
+ external HTTP upstream through the King edge.
+ VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM
+ Upstream host:port for VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS.
VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS
Comma-separated frontend dev-server hosts, default:
deploy domain plus api/ws/sfu/turn/cdn hosts.
@@ -177,6 +182,8 @@ refresh_deploy_config() {
DEPLOY_SFU_DOMAIN="${VIDEOCHAT_DEPLOY_SFU_DOMAIN:-}"
DEPLOY_TURN_DOMAIN="${VIDEOCHAT_DEPLOY_TURN_DOMAIN:-}"
DEPLOY_CDN_DOMAIN="${VIDEOCHAT_DEPLOY_CDN_DOMAIN:-}"
+ DEPLOY_EXTERNAL_DOMAINS="${VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS:-}"
+ DEPLOY_EXTERNAL_UPSTREAM="${VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM:-}"
if [[ -n "${DEPLOY_DOMAIN}" ]]; then
DEPLOY_API_DOMAIN="${DEPLOY_API_DOMAIN:-api.${DEPLOY_DOMAIN}}"
@@ -190,12 +197,17 @@ refresh_deploy_config() {
if [[ -z "${DEPLOY_VUE_ALLOWED_HOSTS}" && -n "${DEPLOY_DOMAIN}" ]]; then
DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_DOMAIN},${DEPLOY_API_DOMAIN},${DEPLOY_WS_DOMAIN},${DEPLOY_SFU_DOMAIN},${DEPLOY_TURN_DOMAIN},${DEPLOY_CDN_DOMAIN}"
fi
+ if [[ -n "${DEPLOY_EXTERNAL_DOMAINS}" ]]; then
+ DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_VUE_ALLOWED_HOSTS:+${DEPLOY_VUE_ALLOWED_HOSTS},}${DEPLOY_EXTERNAL_DOMAINS}"
+ fi
export VIDEOCHAT_DEPLOY_API_DOMAIN="${DEPLOY_API_DOMAIN}"
export VIDEOCHAT_DEPLOY_WS_DOMAIN="${DEPLOY_WS_DOMAIN}"
export VIDEOCHAT_DEPLOY_SFU_DOMAIN="${DEPLOY_SFU_DOMAIN}"
export VIDEOCHAT_DEPLOY_TURN_DOMAIN="${DEPLOY_TURN_DOMAIN}"
export VIDEOCHAT_DEPLOY_CDN_DOMAIN="${DEPLOY_CDN_DOMAIN}"
+ export VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS="${DEPLOY_EXTERNAL_DOMAINS}"
+ export VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM="${DEPLOY_EXTERNAL_UPSTREAM}"
export VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS="${DEPLOY_VUE_ALLOWED_HOSTS}"
if [[ -n "${DEPLOY_REFRESH_KNOWN_HOSTS}" ]]; then
export VIDEOCHAT_DEPLOY_REFRESH_KNOWN_HOSTS="${DEPLOY_REFRESH_KNOWN_HOSTS}"
@@ -223,8 +235,11 @@ deploy_refresh_known_hosts_enabled() {
deploy_dns_targets() {
local target seen=""
local legacy_cdn_domain=""
+ local external_domains=()
[[ -n "${DEPLOY_DOMAIN}" ]] && legacy_cdn_domain="cnd.${DEPLOY_DOMAIN}"
- for target in "${DEPLOY_DOMAIN}" "${DEPLOY_API_DOMAIN}" "${DEPLOY_WS_DOMAIN}" "${DEPLOY_SFU_DOMAIN}" "${DEPLOY_TURN_DOMAIN}" "${DEPLOY_CDN_DOMAIN}" "${legacy_cdn_domain}"; do
+ IFS=',' read -r -a external_domains <<< "${DEPLOY_EXTERNAL_DOMAINS}"
+ for target in "${DEPLOY_DOMAIN}" "${DEPLOY_API_DOMAIN}" "${DEPLOY_WS_DOMAIN}" "${DEPLOY_SFU_DOMAIN}" "${DEPLOY_TURN_DOMAIN}" "${DEPLOY_CDN_DOMAIN}" "${legacy_cdn_domain}" "${external_domains[@]}"; do
+ target="${target//[[:space:]]/}"
[[ -n "${target}" ]] || continue
case " ${seen} " in
*" ${target} "*) continue ;;
@@ -304,6 +319,8 @@ persist_current_deploy_config() {
local_env_upsert VIDEOCHAT_DEPLOY_SFU_DOMAIN "${DEPLOY_SFU_DOMAIN}"
local_env_upsert VIDEOCHAT_DEPLOY_TURN_DOMAIN "${DEPLOY_TURN_DOMAIN}"
local_env_upsert VIDEOCHAT_DEPLOY_CDN_DOMAIN "${DEPLOY_CDN_DOMAIN}"
+ local_env_upsert VIDEOCHAT_DEPLOY_EXTERNAL_DOMAINS "${DEPLOY_EXTERNAL_DOMAINS}"
+ local_env_upsert VIDEOCHAT_DEPLOY_EXTERNAL_UPSTREAM "${DEPLOY_EXTERNAL_UPSTREAM}"
local_env_upsert VIDEOCHAT_DEPLOY_VUE_ALLOWED_HOSTS "${DEPLOY_VUE_ALLOWED_HOSTS}"
[[ -n "${VIDEOCHAT_DEPLOY_SSH_KEY:-}" ]] && local_env_upsert VIDEOCHAT_DEPLOY_SSH_KEY "${VIDEOCHAT_DEPLOY_SSH_KEY}"
[[ -n "${VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE:-}" ]] && local_env_upsert VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE "${VIDEOCHAT_DEPLOY_KNOWN_HOSTS_FILE}"
@@ -450,7 +467,7 @@ if command -v ufw >/dev/null 2>&1 && \${SUDO}ufw status | grep -q 'Status: activ
\${SUDO}ufw allow 443/tcp >/dev/null || true
\${SUDO}ufw allow 3478/tcp >/dev/null || true
\${SUDO}ufw allow 3478/udp >/dev/null || true
- \${SUDO}ufw allow 49160:49200/udp >/dev/null || true
+ \${SUDO}ufw allow 49160:49660/udp >/dev/null || true
fi
if command -v firewall-cmd >/dev/null 2>&1 && \${SUDO}firewall-cmd --state >/dev/null 2>&1; then
@@ -458,7 +475,7 @@ if command -v firewall-cmd >/dev/null 2>&1 && \${SUDO}firewall-cmd --state >/dev
\${SUDO}firewall-cmd --permanent --add-service=https >/dev/null || true
\${SUDO}firewall-cmd --permanent --add-port=3478/tcp >/dev/null || true
\${SUDO}firewall-cmd --permanent --add-port=3478/udp >/dev/null || true
- \${SUDO}firewall-cmd --permanent --add-port=49160-49200/udp >/dev/null || true
+ \${SUDO}firewall-cmd --permanent --add-port=49160-49660/udp >/dev/null || true
\${SUDO}firewall-cmd --reload >/dev/null || true
fi
@@ -496,7 +513,7 @@ sync_checkout() {
}
certbot_standalone() {
- local deploy_path_q domain_q email_q api_domain_q ws_domain_q sfu_domain_q turn_domain_q cdn_domain_q legacy_cdn_domain_q
+ local deploy_path_q domain_q email_q api_domain_q ws_domain_q sfu_domain_q turn_domain_q cdn_domain_q legacy_cdn_domain_q external_domains_q
deploy_path_q="$(shell_quote "${DEPLOY_PATH}")"
domain_q="$(shell_quote "${DEPLOY_DOMAIN}")"
email_q="$(shell_quote "${DEPLOY_EMAIL}")"
@@ -506,6 +523,7 @@ certbot_standalone() {
turn_domain_q="$(shell_quote "${DEPLOY_TURN_DOMAIN}")"
cdn_domain_q="$(shell_quote "${DEPLOY_CDN_DOMAIN}")"
legacy_cdn_domain_q="$(shell_quote "cnd.${DEPLOY_DOMAIN}")"
+ external_domains_q="$(shell_quote "${DEPLOY_EXTERNAL_DOMAINS}")"
log "Obtaining/renewing Let's Encrypt cert for ${DEPLOY_DOMAIN}"
remote_bash <ai_next) {
int reuse_addr = 1;
+#ifdef SO_REUSEPORT
+ int reuse_port = 1;
+#endif
listener_fd = socket(cursor->ai_family, cursor->ai_socktype, cursor->ai_protocol);
if (listener_fd < 0) {
@@ -256,6 +276,18 @@ static zend_result king_server_http1_open_listener_socket(
sizeof(reuse_addr)
);
+#ifdef SO_REUSEPORT
+ if (king_server_http1_reuseport_enabled()) {
+ (void) setsockopt(
+ listener_fd,
+ SOL_SOCKET,
+ SO_REUSEPORT,
+ &reuse_port,
+ sizeof(reuse_port)
+ );
+ }
+#endif
+
if (bind(listener_fd, cursor->ai_addr, cursor->ai_addrlen) == 0) {
if (listen(listener_fd, SOMAXCONN) == 0) {
*listener_fd_out = listener_fd;
diff --git a/extension/tests/740-http1-listener-exclusive-bind-contract.phpt b/extension/tests/740-http1-listener-exclusive-bind-contract.phpt
index c198593f4..2b1bcf656 100644
--- a/extension/tests/740-http1-listener-exclusive-bind-contract.phpt
+++ b/extension/tests/740-http1-listener-exclusive-bind-contract.phpt
@@ -11,12 +11,46 @@ if (!is_readable('/proc/net/tcp')) {
return;
}
-$probe = trim((string) shell_exec(
- 'command -v python3 >/dev/null 2>&1'
- . ' && python3 -c ' . escapeshellarg("import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)")
- . ' >/dev/null 2>&1 && printf yes'
-));
-if ($probe !== 'yes') {
+$probe = false;
+$checkPython = proc_open(
+ ['command', '-v', 'python3'],
+ [
+ 1 => ['pipe', 'w'],
+ 2 => ['pipe', 'w'],
+ ],
+ $pythonPipes
+);
+if (is_resource($checkPython)) {
+ stream_get_contents($pythonPipes[1]);
+ stream_get_contents($pythonPipes[2]);
+ foreach ($pythonPipes as $pipe) {
+ fclose($pipe);
+ }
+ $pythonExit = proc_close($checkPython);
+
+ if ($pythonExit === 0) {
+ $checkReusePort = proc_open(
+ ['python3', '-c', "import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)"],
+ [
+ 1 => ['pipe', 'w'],
+ 2 => ['pipe', 'w'],
+ ],
+ $reusePortPipes
+ );
+ if (is_resource($checkReusePort)) {
+ stream_get_contents($reusePortPipes[1]);
+ stream_get_contents($reusePortPipes[2]);
+ foreach ($reusePortPipes as $pipe) {
+ fclose($pipe);
+ }
+ $reusePortExit = proc_close($checkReusePort);
+ if ($reusePortExit === 0) {
+ $probe = true;
+ }
+ }
+ }
+}
+if (!$probe) {
echo "skip python3 with socket.SO_REUSEPORT is required";
}
?>
@@ -75,7 +109,7 @@ finally:
sock.close()
PY;
- $command = 'python3 -c ' . escapeshellarg($script) . ' ' . (int) $port;
+ $command = ['python3', '-c', $script, (string) $port];
$process = proc_open($command, [
1 => ['pipe', 'w'],
2 => ['pipe', 'w'],
@@ -91,8 +125,10 @@ PY;
}
$exitCode = proc_close($process);
- if ($exitCode !== 0 && trim($stdout) === '') {
- throw new RuntimeException('duplicate bind probe failed: ' . trim($stderr));
+ if ($exitCode !== 0) {
+ throw new RuntimeException(
+ 'duplicate bind probe failed (exit=' . $exitCode . '): stderr=' . trim($stderr) . '; stdout=' . trim($stdout)
+ );
}
return trim($stdout);
@@ -124,7 +160,7 @@ var_dump($capture['listen_result']);
var_dump($capture['listen_error']);
?>
--EXPECTF--
-string(%d) "BIND_FAIL errno=%d"
+string(18) "BIND_FAIL errno=98"
int(426)
bool(true)
string(0) ""
diff --git a/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt b/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt
new file mode 100644
index 000000000..2d48c1be7
--- /dev/null
+++ b/extension/tests/741-http1-listener-reuseport-opt-in-contract.phpt
@@ -0,0 +1,132 @@
+--TEST--
+King HTTP/1 one-shot listener enables SO_REUSEPORT only for explicit trusted worker opt-in
+--SKIPIF--
+/dev/null 2>&1'
+ . ' && python3 -c ' . escapeshellarg("import socket; raise SystemExit(0 if hasattr(socket, 'SO_REUSEPORT') else 1)")
+ . ' >/dev/null 2>&1 && printf yes'
+));
+if ($probe !== 'yes') {
+ echo "skip python3 with socket.SO_REUSEPORT is required";
+}
+?>
+--FILE--
+ ['pipe', 'w'],
+ 2 => ['pipe', 'w'],
+ ], $pipes);
+ if (!is_resource($process)) {
+ throw new RuntimeException('failed to launch duplicate bind probe');
+ }
+
+ $stdout = stream_get_contents($pipes[1]);
+ $stderr = stream_get_contents($pipes[2]);
+ foreach ($pipes as $pipe) {
+ fclose($pipe);
+ }
+ $exitCode = proc_close($process);
+
+ if ($exitCode !== 0 && trim($stdout) === '') {
+ throw new RuntimeException('duplicate bind probe failed: ' . trim($stderr));
+ }
+
+ return trim($stdout);
+}
+
+$server = king_server_websocket_wire_start_server('plain', 1, null, [
+ 'KING_HTTP1_ENABLE_REUSEPORT' => '1',
+]);
+$capture = [];
+$response = '';
+
+try {
+ king_http1_wait_for_reuseport_listen_port($server['port']);
+ $bindAttempt = king_http1_reuseport_bind_attempt($server['port']);
+
+ $response = king_server_http1_wire_request_retry(
+ $server['port'],
+ "GET /reuseport-opt-in HTTP/1.1\r\n"
+ . "Host: 127.0.0.1\r\n"
+ . "Connection: close\r\n\r\n"
+ );
+} finally {
+ $capture = king_server_websocket_wire_stop_server($server);
+}
+
+$parsed = king_server_http1_wire_parse_response($response);
+
+var_dump($bindAttempt);
+var_dump($parsed['status']);
+var_dump($capture['listen_result']);
+var_dump($capture['listen_error']);
+?>
+--EXPECT--
+string(7) "BIND_OK"
+int(426)
+bool(true)
+string(0) ""
diff --git a/extension/tests/server_websocket_wire_helper.inc b/extension/tests/server_websocket_wire_helper.inc
index eedce182b..efcc7a119 100644
--- a/extension/tests/server_websocket_wire_helper.inc
+++ b/extension/tests/server_websocket_wire_helper.inc
@@ -2,76 +2,94 @@
/* Test helper for starting the on-wire HTTP/1/WebSocket one-shot listener harness. */
-function king_server_websocket_wire_start_server(string $mode, int $iterations = 1, ?int $fixedPort = null): array
+function king_server_websocket_wire_start_server(string $mode, int $iterations = 1, ?int $fixedPort = null, array $env = []): array
{
- if ($fixedPort !== null) {
- $port = $fixedPort;
- } else {
- $probe = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
- if ($probe === false) {
- throw new RuntimeException("failed to reserve on-wire HTTP/1 test port: $errstr");
+ $maxAttempts = $fixedPort !== null ? 1 : 10;
+
+ for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
+ if ($fixedPort !== null) {
+ $port = $fixedPort;
+ } else {
+ $probe = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
+ if ($probe === false) {
+ throw new RuntimeException("failed to reserve on-wire HTTP/1 test port: $errstr");
+ }
+
+ $serverName = stream_socket_get_name($probe, false);
+ fclose($probe);
+ [, $port] = explode(':', $serverName, 2);
}
- $serverName = stream_socket_get_name($probe, false);
- fclose($probe);
- [, $port] = explode(':', $serverName, 2);
- }
+ $capture = tempnam(sys_get_temp_dir(), 'king-server-websocket-wire-');
+ $extensionPath = dirname(__DIR__) . '/modules/king.so';
+ $commandParts = [
+ escapeshellarg(PHP_BINARY),
+ '-n',
+ '-d',
+ escapeshellarg('extension=' . $extensionPath),
+ '-d',
+ escapeshellarg('king.security_allow_config_override=1'),
+ ];
- $capture = tempnam(sys_get_temp_dir(), 'king-server-websocket-wire-');
- $extensionPath = dirname(__DIR__) . '/modules/king.so';
- $commandParts = [
- escapeshellarg(PHP_BINARY),
- '-n',
- '-d',
- escapeshellarg('extension=' . $extensionPath),
- '-d',
- escapeshellarg('king.security_allow_config_override=1'),
- ];
+ if ($mode === 'cors-allowlist') {
+ $commandParts[] = '-d';
+ $commandParts[] = escapeshellarg(
+ 'king.security_cors_allowed_origins=https://app.king.test, https://admin.king.test'
+ );
+ }
- if ($mode === 'cors-allowlist') {
- $commandParts[] = '-d';
- $commandParts[] = escapeshellarg(
- 'king.security_cors_allowed_origins=https://app.king.test, https://admin.king.test'
- );
- }
+ $commandParts = array_merge($commandParts, [
+ escapeshellarg(__DIR__ . '/server_websocket_wire_server.inc'),
+ escapeshellarg($capture),
+ (string) (int) $port,
+ escapeshellarg($mode),
+ (string) max(1, $iterations),
+ ]);
- $commandParts = array_merge($commandParts, [
- escapeshellarg(__DIR__ . '/server_websocket_wire_server.inc'),
- escapeshellarg($capture),
- (string) (int) $port,
- escapeshellarg($mode),
- (string) max(1, $iterations),
- ]);
+ $command = implode(' ', $commandParts);
- $command = implode(' ', $commandParts);
+ $processEnv = null;
+ if ($env !== []) {
+ $baseEnv = getenv();
+ $processEnv = is_array($baseEnv) ? array_merge($baseEnv, $env) : $env;
+ }
- $process = proc_open($command, [
- 1 => ['pipe', 'w'],
- 2 => ['pipe', 'w'],
- ], $pipes);
+ $process = proc_open($command, [
+ 1 => ['pipe', 'w'],
+ 2 => ['pipe', 'w'],
+ ], $pipes, null, $processEnv);
- if (!is_resource($process)) {
- @unlink($capture);
- throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server');
- }
+ if (!is_resource($process)) {
+ @unlink($capture);
+ if ($fixedPort !== null || $attempt === $maxAttempts) {
+ throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server');
+ }
+ continue;
+ }
+
+ $ready = fgets($pipes[1]);
+ if ($ready === "READY\n") {
+ return [
+ 'process' => $process,
+ 'pipes' => $pipes,
+ 'capture' => $capture,
+ 'port' => (int) $port,
+ ];
+ }
- $ready = fgets($pipes[1]);
- if ($ready !== "READY\n") {
$stderr = stream_get_contents($pipes[2]);
foreach ($pipes as $pipe) {
fclose($pipe);
}
proc_close($process);
@unlink($capture);
- throw new RuntimeException('on-wire HTTP/1 websocket server failed: ' . trim($stderr));
+
+ if ($fixedPort !== null || $attempt === $maxAttempts) {
+ throw new RuntimeException('on-wire HTTP/1 websocket server failed: ' . trim($stderr));
+ }
}
- return [
- 'process' => $process,
- 'pipes' => $pipes,
- 'capture' => $capture,
- 'port' => (int) $port,
- ];
+ throw new RuntimeException('failed to launch on-wire HTTP/1 websocket server');
}
function king_server_websocket_wire_stop_server(array $server): array
@@ -86,9 +104,15 @@ function king_server_websocket_wire_stop_server(array $server): array
$capture = [];
if (is_file($server['capture'])) {
- $capture = json_decode((string) file_get_contents($server['capture']), true);
- if (!is_array($capture)) {
+ $captureRaw = (string) file_get_contents($server['capture']);
+ $decodedCapture = json_decode($captureRaw, true);
+ if (is_array($decodedCapture)) {
+ $capture = $decodedCapture;
+ } else {
$capture = [];
+ if (json_last_error() !== JSON_ERROR_NONE) {
+ $capture['__json_error'] = json_last_error_msg();
+ }
}
@unlink($server['capture']);
}
@@ -134,9 +158,10 @@ function king_server_http1_wire_abort_request(int $port, string $request, int $s
{
$deadline = microtime(true) + 3.0;
$socket = false;
+ $message = 'connection timed out';
if (!function_exists('socket_create')) {
- throw new RuntimeException('socket_create() is required for HTTP/1 abort requests');
+ throw new RuntimeException('The sockets extension is required for HTTP/1 abort requests. Please enable it in php.ini.');
}
do {
@@ -239,7 +264,7 @@ function king_server_websocket_wire_raw_client_connect(int $port, string $path)
$message = 'connection failed';
if (!function_exists('socket_create')) {
- throw new RuntimeException('socket_create() is required for raw websocket client fixtures');
+ throw new RuntimeException('The sockets extension is required for raw websocket client fixtures. Please enable it in php.ini.');
}
do {
@@ -315,7 +340,9 @@ function king_server_websocket_wire_raw_client_send_text($socket, string $payloa
} elseif ($payloadLength <= 65535) {
$frame .= chr(0x80 | 126) . pack('n', $payloadLength);
} else {
- $frame .= chr(0x80 | 127) . pack('N2', 0, $payloadLength);
+ $high = intdiv($payloadLength, 4294967296);
+ $low = $payloadLength % 4294967296;
+ $frame .= chr(0x80 | 127) . pack('N2', $high, $low);
}
$maskedPayload = '';