From 7d82c5bbc78d1b52a2f4ff1e751792f7626880ea Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 10 Apr 2026 00:24:22 -0700 Subject: [PATCH 1/6] first pass at dynacast --- examples/local_video/src/publisher.rs | 20 +++- examples/local_video/src/subscriber.rs | 1 + libwebrtc/src/native/rtp_parameters.rs | 96 +++++++++++++++---- libwebrtc/src/rtp_parameters.rs | 31 ++++++ livekit/src/room/mod.rs | 85 ++++++++++++++++ .../room/participant/remote_participant.rs | 12 ++- livekit/src/room/track/local_video_track.rs | 84 ++++++++++++++++ livekit/src/rtc_engine/mod.rs | 6 ++ livekit/src/rtc_engine/rtc_session.rs | 13 +++ 9 files changed, 324 insertions(+), 24 deletions(-) diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index dc56f5969..e131baf80 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -75,6 +75,10 @@ struct Args { /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) #[arg(long, default_value_t = false)] h265: bool, + + /// Enable dynacast (pause unused simulcast layers based on subscriber demand) + #[arg(long, default_value_t = false)] + dynacast: bool, } fn list_cameras() -> Result<()> { @@ -137,6 +141,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity); let mut room_options = RoomOptions::default(); room_options.auto_subscribe = true; + room_options.dynacast = args.dynacast; let (room, _) = Room::connect(&url, &token, room_options).await?; let room = std::sync::Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); @@ -419,11 +424,24 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let secs = last_fps_log.elapsed().as_secs_f64(); let fps_est = frames as f64 / secs; let n = frames.max(1) as f64; + let layers = track.publishing_layers(); + let layers_str = if layers.is_empty() { + "n/a".to_string() + } else { + layers + .iter() + .map(|(rid, quality, active)| { + format!("{}({})={}", rid, quality, if *active { "ON" } else { "off" }) + }) + .collect::>() + .join(", ") + }; info!( - "Publishing video: {}x{}, ~{:.1} fps | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}", + "Publishing video: {}x{}, ~{:.1} fps | layers: [{}] | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}", width, height, fps_est, + layers_str, sum_get_ms / n, sum_decode_ms / n, sum_convert_ms / n, diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 255f9f867..54acf0004 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -502,6 +502,7 @@ impl eframe::App for VideoApp { let resp = ui.selectable_label(is_selected, label); if resp.clicked() { if let Some(ref pub_remote) = sc.publication { + info!("Requesting layer: {:?}", q); pub_remote.set_video_quality(q); sc.requested_quality = Some(q); } diff --git a/libwebrtc/src/native/rtp_parameters.rs b/libwebrtc/src/native/rtp_parameters.rs index 866447d93..cce96b976 100644 --- a/libwebrtc/src/native/rtp_parameters.rs +++ b/libwebrtc/src/native/rtp_parameters.rs @@ -39,7 +39,13 @@ impl From for RtpParameters { Self { codecs: value.codecs.into_iter().map(Into::into).collect(), header_extensions: value.header_extensions.into_iter().map(Into::into).collect(), + encodings: value.encodings.into_iter().map(Into::into).collect(), rtcp: value.rtcp.into(), + transaction_id: value.transaction_id, + mid: value.mid, + has_degradation_preference: value.has_degradation_preference, + // Safety: DegradationPreference is #[repr(i32)] + degradation_preference: unsafe { std::mem::transmute(value.degradation_preference) }, } } } @@ -51,13 +57,40 @@ impl From for RtpCodecParameters { payload_type: value.payload_type as u8, clock_rate: value.has_clock_rate.then_some(value.clock_rate as u64), channels: value.has_num_channels.then_some(value.num_channels as u16), + name: value.name, + // Safety: MediaType, RtcpFeedbackType, RtcpFeedbackMessageType are #[repr(i32)] + kind: unsafe { std::mem::transmute(value.kind) }, + has_max_ptime: value.has_max_ptime, + max_ptime: value.max_ptime, + has_ptime: value.has_ptime, + ptime: value.ptime, + rtcp_feedback: value + .rtcp_feedback + .into_iter() + .map(|f| CodecFeedback { + feedback_type: unsafe { std::mem::transmute(f.feedback_type) }, + has_message_type: f.has_message_type, + message_type: unsafe { std::mem::transmute(f.message_type) }, + }) + .collect(), + parameters: value + .parameters + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect(), } } } impl From for RtcpParameters { fn from(value: sys_rp::ffi::RtcpParameters) -> Self { - Self { cname: value.cname, reduced_size: value.reduced_size } + Self { + cname: value.cname, + reduced_size: value.reduced_size, + mux: value.mux, + has_ssrc: value.has_ssrc, + ssrc: value.ssrc, + } } } @@ -72,6 +105,8 @@ impl From for RtpEncodingParameters { scale_resolution_down_by: value .has_scale_resolution_down_by .then_some(value.scale_resolution_down_by), + has_ssrc: value.has_ssrc, + ssrc: value.ssrc, } } } @@ -139,21 +174,26 @@ impl From for sys_rp::ffi::RtpExtension { impl From for sys_rp::ffi::RtpParameters { fn from(value: RtpParameters) -> Self { + // Safety: DegradationPreference is #[repr(i32)] + let degradation_preference: sys_rp::ffi::DegradationPreference = + unsafe { std::mem::transmute(value.degradation_preference) }; Self { codecs: value.codecs.into_iter().map(Into::into).collect(), header_extensions: value.header_extensions.into_iter().map(Into::into).collect(), - encodings: Vec::new(), + encodings: value.encodings.into_iter().map(Into::into).collect(), rtcp: value.rtcp.into(), - transaction_id: "".to_string(), - mid: "".to_string(), - has_degradation_preference: false, - degradation_preference: sys_rp::ffi::DegradationPreference::Balanced, + transaction_id: value.transaction_id, + mid: value.mid, + has_degradation_preference: value.has_degradation_preference, + degradation_preference, } } } impl From for sys_rp::ffi::RtpCodecParameters { fn from(value: RtpCodecParameters) -> Self { + // Safety: MediaType, RtcpFeedbackType, RtcpFeedbackMessageType are all #[repr(i32)] + let kind: sys_webrtc::ffi::MediaType = unsafe { std::mem::transmute(value.kind) }; Self { payload_type: value.payload_type as i32, mime_type: value.mime_type, @@ -161,14 +201,32 @@ impl From for sys_rp::ffi::RtpCodecParameters { clock_rate: value.clock_rate.unwrap_or_default() as i32, has_num_channels: value.channels.is_some(), num_channels: value.channels.unwrap_or_default() as i32, - name: "".to_string(), - kind: sys_rp::ffi::MediaType::Audio, - has_max_ptime: false, - max_ptime: 0, - has_ptime: false, - ptime: 0, - rtcp_feedback: Vec::new(), - parameters: Vec::new(), + name: value.name, + kind, + has_max_ptime: value.has_max_ptime, + max_ptime: value.max_ptime, + has_ptime: value.has_ptime, + ptime: value.ptime, + rtcp_feedback: value + .rtcp_feedback + .into_iter() + .map(|f| { + let feedback_type: sys_rp::ffi::RtcpFeedbackType = + unsafe { std::mem::transmute(f.feedback_type) }; + let message_type: sys_rp::ffi::RtcpFeedbackMessageType = + unsafe { std::mem::transmute(f.message_type) }; + sys_rp::ffi::RtcpFeedback { + feedback_type, + has_message_type: f.has_message_type, + message_type, + } + }) + .collect(), + parameters: value + .parameters + .into_iter() + .map(|(key, value)| sys_rp::ffi::StringKeyValue { key, value }) + .collect(), } } } @@ -178,9 +236,9 @@ impl From for sys_rp::ffi::RtcpParameters { Self { cname: value.cname, reduced_size: value.reduced_size, - has_ssrc: false, - ssrc: 0, - mux: false, + has_ssrc: value.has_ssrc, + ssrc: value.ssrc, + mux: value.mux, } } } @@ -205,8 +263,8 @@ impl From for sys_rp::ffi::RtpEncodingParameters { num_temporal_layers: 0, has_scalability_mode: false, scalability_mode: "".to_string(), - has_ssrc: false, - ssrc: 0, + has_ssrc: value.has_ssrc, + ssrc: value.ssrc, } } } diff --git a/libwebrtc/src/rtp_parameters.rs b/libwebrtc/src/rtp_parameters.rs index 559b2d504..df634ddbe 100644 --- a/libwebrtc/src/rtp_parameters.rs +++ b/libwebrtc/src/rtp_parameters.rs @@ -33,7 +33,22 @@ pub struct RtpHeaderExtensionParameters { pub struct RtpParameters { pub codecs: Vec, pub header_extensions: Vec, + pub encodings: Vec, pub rtcp: RtcpParameters, + /// Opaque token used by WebRTC to pair getParameters/setParameters calls. + /// Must be preserved when round-tripping through set_parameters(). + pub(crate) transaction_id: String, + pub(crate) mid: String, + pub(crate) has_degradation_preference: bool, + pub(crate) degradation_preference: i32, +} + +/// Mirrors webrtc_sys RtcpFeedback for round-trip fidelity. +#[derive(Debug, Clone, Default)] +pub(crate) struct CodecFeedback { + pub feedback_type: i32, + pub has_message_type: bool, + pub message_type: i32, } #[derive(Debug, Clone, Default)] @@ -42,12 +57,23 @@ pub struct RtpCodecParameters { pub mime_type: String, // read-only pub clock_rate: Option, pub channels: Option, + pub(crate) name: String, + pub(crate) kind: i32, + pub(crate) has_max_ptime: bool, + pub(crate) max_ptime: i32, + pub(crate) has_ptime: bool, + pub(crate) ptime: i32, + pub(crate) rtcp_feedback: Vec, + pub(crate) parameters: Vec<(String, String)>, } #[derive(Debug, Clone, Default)] pub struct RtcpParameters { pub cname: String, pub reduced_size: bool, + pub(crate) mux: bool, + pub(crate) has_ssrc: bool, + pub(crate) ssrc: u32, } #[derive(Debug, Clone)] @@ -58,6 +84,9 @@ pub struct RtpEncodingParameters { pub priority: Priority, pub rid: String, pub scale_resolution_down_by: Option, + /// Preserved for round-trip fidelity with WebRTC's getParameters/setParameters. + pub has_ssrc: bool, + pub ssrc: u32, } #[derive(Debug, Clone)] @@ -89,6 +118,8 @@ impl Default for RtpEncodingParameters { priority: Priority::Low, rid: String::default(), scale_resolution_down_by: None, + has_ssrc: false, + ssrc: 0, } } } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 0035237b6..4e6f779a8 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1030,6 +1030,9 @@ impl RoomSession { EngineEvent::TrackMuted { sid, muted } => { self.handle_server_initiated_mute_track(sid, muted); } + EngineEvent::SubscribedQualityUpdate { update } => { + self.handle_subscribed_quality_update(update); + } EngineEvent::LocalDataTrackInput(event) => { _ = self.local_dt_input.send(event); } @@ -1814,6 +1817,88 @@ impl RoomSession { log::warn!("Track not found in mute request: {}", sid_for_log); } + #[allow(deprecated)] + fn handle_subscribed_quality_update(&self, update: proto::SubscribedQualityUpdate) { + if !self.options.dynacast { + return; + } + + let track_sid: TrackSid = match update.track_sid.clone().try_into() { + Ok(sid) => sid, + Err(_) => { + log::warn!( + "dynacast: invalid track sid in subscribed quality update: {}", + update.track_sid + ); + return; + } + }; + + let publication = match self.local_participant.get_track_publication(&track_sid) { + Some(pub_) => pub_, + None => { + log::warn!( + "dynacast: local track publication not found for sid {}", + track_sid + ); + return; + } + }; + + let video_track = match publication.track() { + Some(LocalTrack::Video(vt)) => vt, + _ => { + log::debug!( + "dynacast: track {} is not a local video track, ignoring quality update", + track_sid + ); + return; + } + }; + + let qualities: Vec = if !update.subscribed_codecs.is_empty() { + let codec = publication.publish_options().video_codec.as_str().to_lowercase(); + log::info!( + "dynacast: SFU quality update for {}: subscribed_codecs={:?}, looking for codec '{}'", + track_sid, + update.subscribed_codecs.iter().map(|sc| { + let qs: Vec = sc.qualities.iter().map(|q| { + format!("{:?}={}", proto::VideoQuality::try_from(q.quality).unwrap_or(proto::VideoQuality::High), q.enabled) + }).collect(); + format!("{}:[{}]", sc.codec, qs.join(", ")) + }).collect::>().join("; "), + codec, + ); + update + .subscribed_codecs + .iter() + .find(|sc| sc.codec.to_lowercase() == codec) + .map(|sc| sc.qualities.clone()) + .unwrap_or_else(|| { + log::warn!("dynacast: codec '{}' not found in subscribed_codecs, falling back to first", codec); + update + .subscribed_codecs + .first() + .map(|sc| sc.qualities.clone()) + .unwrap_or_default() + }) + } else { + let qs: Vec = update.subscribed_qualities.iter().map(|q| { + format!("{:?}={}", proto::VideoQuality::try_from(q.quality).unwrap_or(proto::VideoQuality::High), q.enabled) + }).collect(); + log::info!( + "dynacast: SFU quality update for {} (legacy): [{}]", + track_sid, + qs.join(", "), + ); + update.subscribed_qualities.clone() + }; + + if let Err(e) = video_track.set_publishing_layers(&qualities) { + log::error!("dynacast: failed to set publishing layers for {}: {}", track_sid, e); + } + } + /// Create a new participant /// Also add it to the participants list fn create_participant( diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index eea7e5672..bf6a2febd 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -456,18 +456,22 @@ impl RemoteParticipant { let rtc_engine = rtc_engine.clone(); livekit_runtime::spawn(async move { let tsid: String = publication.sid().into(); - let quality = match quality { + let proto_quality = match quality { VideoQuality::Low => proto::VideoQuality::Low, VideoQuality::Medium => proto::VideoQuality::Medium, VideoQuality::High => proto::VideoQuality::High, - } - .into(); + }; let update_track_settings = proto::UpdateTrackSettings { track_sids: vec![tsid.clone()], - quality, + quality: proto_quality.into(), ..Default::default() }; + log::info!( + "subscriber: sending UpdateTrackSettings to SFU: track={}, quality={:?}", + tsid, + proto_quality, + ); rtc_engine .send_request(proto::signal_request::Message::TrackSetting( update_track_settings, diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index c7c26649b..a39b83cab 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -146,4 +146,88 @@ impl LocalVideoTrack { pub(crate) fn update_info(&self, info: proto::TrackInfo) { super::update_info(&self.inner, &Track::LocalVideo(self.clone()), info); } + + /// Returns a snapshot of each simulcast layer's RID, quality label, and active state. + /// Useful for diagnostics / HUD display. Returns an empty vec if no transceiver is set. + pub fn publishing_layers(&self) -> Vec<(String, String, bool)> { + let Some(transceiver) = self.transceiver() else { + return Vec::new(); + }; + let params = transceiver.sender().parameters(); + params + .encodings + .iter() + .map(|e| { + let quality = crate::options::video_quality_for_rid(&e.rid) + .unwrap_or(proto::VideoQuality::High); + (e.rid.clone(), format!("{:?}", quality), e.active) + }) + .collect() + } + + /// Toggle simulcast encoding layers on/off based on subscriber demand. + /// Used by dynacast: the SFU tells us which quality levels are needed, + /// and we set `encoding.active` accordingly on the RTP sender. + pub(crate) fn set_publishing_layers( + &self, + qualities: &[proto::SubscribedQuality], + ) -> RoomResult<()> { + let transceiver = self.transceiver().ok_or_else(|| { + RoomError::Internal("cannot set publishing layers: no transceiver".into()) + })?; + + let sender = transceiver.sender(); + let mut params = sender.parameters(); + + let mut any_enabled = false; + let mut changed = false; + for encoding in &mut params.encodings { + let quality = crate::options::video_quality_for_rid(&encoding.rid) + .unwrap_or(proto::VideoQuality::High); + + let should_active = qualities + .iter() + .find(|q| q.quality == quality as i32) + .map(|q| q.enabled) + .unwrap_or(false); + + if should_active { + any_enabled = true; + } + + if encoding.active != should_active { + changed = true; + encoding.active = should_active; + } + } + + if !any_enabled { + for encoding in &mut params.encodings { + encoding.active = false; + } + } + + let layers: Vec = params + .encodings + .iter() + .map(|e| { + let quality = crate::options::video_quality_for_rid(&e.rid) + .unwrap_or(proto::VideoQuality::High); + let state = if e.active { "ON" } else { "off" }; + format!("{}({:?})={}", e.rid, quality, state) + }) + .collect(); + + if changed { + log::info!("dynacast: layers changed -> [{}]", layers.join(", ")); + } else { + log::debug!("dynacast: layers unchanged [{}]", layers.join(", ")); + } + + sender.set_parameters(params).map_err(|e| { + RoomError::Internal(format!("failed to set sender parameters: {}", e)) + })?; + + Ok(()) + } } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 1ad21f7de..8b16a94d3 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -192,6 +192,9 @@ pub enum EngineEvent { sid: String, muted: bool, }, + SubscribedQualityUpdate { + update: proto::SubscribedQualityUpdate, + }, LocalDataTrackInput(dt::local::InputEvent), RemoteDataTrackInput(dt::remote::InputEvent), } @@ -638,6 +641,9 @@ impl EngineInner { SessionEvent::TrackMuted { sid, muted } => { let _ = self.engine_tx.send(EngineEvent::TrackMuted { sid, muted }); } + SessionEvent::SubscribedQualityUpdate { update } => { + let _ = self.engine_tx.send(EngineEvent::SubscribedQualityUpdate { update }); + } SessionEvent::LocalDataTrackInput(event) => { let _ = self.engine_tx.send(EngineEvent::LocalDataTrackInput(event)); } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 3b181fae5..5744064cd 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -205,6 +205,9 @@ pub enum SessionEvent { sid: String, muted: bool, }, + SubscribedQualityUpdate { + update: proto::SubscribedQualityUpdate, + }, LocalDataTrackInput(dt::local::InputEvent), RemoteDataTrackInput(dt::remote::InputEvent), } @@ -1209,6 +1212,16 @@ impl SessionInner { self.handle_media_sections_requirement(req)?; } } + #[allow(deprecated)] + proto::signal_response::Message::SubscribedQualityUpdate(update) => { + log::debug!( + "received subscribed quality update for track {}: {:?}", + update.track_sid, + update.subscribed_codecs + ); + let _ = + self.emitter.send(SessionEvent::SubscribedQualityUpdate { update }); + } _ => {} } From 6dd468e80d122679c032408f3da65f46939224f2 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 10 Apr 2026 00:32:13 -0700 Subject: [PATCH 2/6] first pass at dynacast --- livekit/src/rtc_engine/rtc_session.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 5744064cd..0aabbb0cb 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1212,7 +1212,6 @@ impl SessionInner { self.handle_media_sections_requirement(req)?; } } - #[allow(deprecated)] proto::signal_response::Message::SubscribedQualityUpdate(update) => { log::debug!( "received subscribed quality update for track {}: {:?}", From 464f3ba92a23e9a0297cfeaeb256b03b38b58a34 Mon Sep 17 00:00:00 2001 From: David Chen Date: Fri, 10 Apr 2026 17:20:33 -0700 Subject: [PATCH 3/6] add e2e test --- libwebrtc/src/native/rtp_parameters.rs | 6 +- livekit/src/room/mod.rs | 20 ++- livekit/src/room/track/local_video_track.rs | 6 +- livekit/src/rtc_engine/rtc_session.rs | 3 +- livekit/tests/dynacast_test.rs | 164 ++++++++++++++++++++ 5 files changed, 182 insertions(+), 17 deletions(-) create mode 100644 livekit/tests/dynacast_test.rs diff --git a/libwebrtc/src/native/rtp_parameters.rs b/libwebrtc/src/native/rtp_parameters.rs index cce96b976..571ae1b8c 100644 --- a/libwebrtc/src/native/rtp_parameters.rs +++ b/libwebrtc/src/native/rtp_parameters.rs @@ -73,11 +73,7 @@ impl From for RtpCodecParameters { message_type: unsafe { std::mem::transmute(f.message_type) }, }) .collect(), - parameters: value - .parameters - .into_iter() - .map(|kv| (kv.key, kv.value)) - .collect(), + parameters: value.parameters.into_iter().map(|kv| (kv.key, kv.value)).collect(), } } } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 4e6f779a8..f1a6b8eed 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1837,10 +1837,7 @@ impl RoomSession { let publication = match self.local_participant.get_track_publication(&track_sid) { Some(pub_) => pub_, None => { - log::warn!( - "dynacast: local track publication not found for sid {}", - track_sid - ); + log::warn!("dynacast: local track publication not found for sid {}", track_sid); return; } }; @@ -1883,9 +1880,18 @@ impl RoomSession { .unwrap_or_default() }) } else { - let qs: Vec = update.subscribed_qualities.iter().map(|q| { - format!("{:?}={}", proto::VideoQuality::try_from(q.quality).unwrap_or(proto::VideoQuality::High), q.enabled) - }).collect(); + let qs: Vec = update + .subscribed_qualities + .iter() + .map(|q| { + format!( + "{:?}={}", + proto::VideoQuality::try_from(q.quality) + .unwrap_or(proto::VideoQuality::High), + q.enabled + ) + }) + .collect(); log::info!( "dynacast: SFU quality update for {} (legacy): [{}]", track_sid, diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index a39b83cab..df41778ed 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -224,9 +224,9 @@ impl LocalVideoTrack { log::debug!("dynacast: layers unchanged [{}]", layers.join(", ")); } - sender.set_parameters(params).map_err(|e| { - RoomError::Internal(format!("failed to set sender parameters: {}", e)) - })?; + sender + .set_parameters(params) + .map_err(|e| RoomError::Internal(format!("failed to set sender parameters: {}", e)))?; Ok(()) } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 0aabbb0cb..678be451a 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1218,8 +1218,7 @@ impl SessionInner { update.track_sid, update.subscribed_codecs ); - let _ = - self.emitter.send(SessionEvent::SubscribedQualityUpdate { update }); + let _ = self.emitter.send(SessionEvent::SubscribedQualityUpdate { update }); } _ => {} } diff --git a/livekit/tests/dynacast_test.rs b/livekit/tests/dynacast_test.rs new file mode 100644 index 000000000..5bef7dfaa --- /dev/null +++ b/livekit/tests/dynacast_test.rs @@ -0,0 +1,164 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(feature = "__lk-e2e-test")] +use { + anyhow::{anyhow, Result}, + common::{ + test_rooms_with_options, + video::{SolidColorParams, SolidColorTrack}, + TestRoomOptions, + }, + livekit::{options::VideoCodec, prelude::*, track::VideoQuality}, + std::{sync::Arc, time::Duration}, + tokio::time::{self, timeout}, +}; + +mod common; + +/// Extracts the `LocalVideoTrack` from the publisher's first video track publication. +#[cfg(feature = "__lk-e2e-test")] +fn publisher_video_track(room: &Room) -> Result { + for pub_ in room.local_participant().track_publications().values() { + if let Some(LocalTrack::Video(vt)) = pub_.track() { + return Ok(vt); + } + } + Err(anyhow!("No local video track publication found")) +} + +/// Polls `publishing_layers()` until the `check` predicate returns true, or times out. +#[cfg(feature = "__lk-e2e-test")] +async fn wait_for_layers( + track: &LocalVideoTrack, + label: &str, + max_wait: Duration, + check: impl Fn(&[(String, String, bool)]) -> bool, +) -> Result> { + let deadline = tokio::time::Instant::now() + max_wait; + loop { + let layers = track.publishing_layers(); + log::info!("dynacast test [{}]: layers = {:?}", label, layers); + if check(&layers) { + return Ok(layers); + } + if tokio::time::Instant::now() >= deadline { + return Err(anyhow!( + "dynacast test [{}]: timed out waiting for expected layer state, last = {:?}", + label, + layers + )); + } + time::sleep(Duration::from_millis(250)).await; + } +} + +/// Verifies that dynacast toggles publisher simulcast layers in response to subscriber quality +/// requests. +/// +/// 1. Publisher connects with `dynacast: true` and publishes a simulcast VP8 track. +/// 2. Subscriber receives the track -- baseline expects all layers active. +/// 3. Subscriber requests LOW quality via `set_video_quality` -- the SFU should send a +/// `SubscribedQualityUpdate` that disables the higher layers. +/// 4. Subscriber requests HIGH quality again -- all layers should re-activate. +/// +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_dynacast() -> Result<()> { + let mut pub_room_opts = RoomOptions::default(); + pub_room_opts.dynacast = true; + let pub_options = TestRoomOptions { room: pub_room_opts, ..Default::default() }; + let sub_options = TestRoomOptions::default(); + + let mut rooms = test_rooms_with_options([pub_options, sub_options]).await?; + let (pub_room, _pub_events) = rooms.remove(0); + let (_sub_room, mut sub_events) = rooms.remove(0); + + let pub_room = Arc::new(pub_room); + let solid_params = SolidColorParams { width: 1280, height: 720, luma: 128 }; + let mut solid_track = SolidColorTrack::new(pub_room.clone(), solid_params); + solid_track.publish(VideoCodec::VP8, true).await?; + + let sub_publication: RemoteTrackPublication = timeout(Duration::from_secs(15), async { + loop { + let Some(event) = sub_events.recv().await else { + return Err(anyhow!("Event channel closed before TrackSubscribed")); + }; + if let RoomEvent::TrackSubscribed { publication, .. } = event { + return Ok(publication); + } + } + }) + .await??; + + let pub_video_track = publisher_video_track(&pub_room)?; + + // --- Baseline: all simulcast layers should be active after initial subscription --- + let layers = wait_for_layers( + &pub_video_track, + "baseline", + Duration::from_secs(15), + |layers| layers.len() > 1 && layers.iter().all(|(_, _, active)| *active), + ) + .await?; + log::info!("dynacast baseline layers: {:?}", layers); + assert!(layers.len() > 1, "expected multiple simulcast layers, got {}", layers.len()); + + // --- Request LOW quality: SFU should tell publisher to deactivate Medium and High --- + log::info!("dynacast test: requesting LOW quality"); + sub_publication.set_video_quality(VideoQuality::Low); + + let layers = wait_for_layers( + &pub_video_track, + "after LOW request", + Duration::from_secs(30), + |layers| { + let low_active = layers.iter().any(|(_, q, a)| q == "Low" && *a); + let high_inactive = layers.iter().filter(|(_, q, _)| q != "Low").all(|(_, _, a)| !*a); + low_active && high_inactive + }, + ) + .await?; + log::info!("dynacast layers after LOW request: {:?}", layers); + assert!( + layers.iter().any(|(_, q, a)| q == "Low" && *a), + "expected Low layer to be active, got {:?}", + layers + ); + assert!( + layers.iter().filter(|(_, q, _)| q != "Low").all(|(_, _, a)| !*a), + "expected Medium and High layers to be inactive, got {:?}", + layers + ); + + // --- Request HIGH quality: all layers should become active again --- + log::info!("dynacast test: requesting HIGH quality"); + sub_publication.set_video_quality(VideoQuality::High); + + let layers = wait_for_layers( + &pub_video_track, + "after HIGH request", + Duration::from_secs(30), + |layers| layers.len() > 1 && layers.iter().all(|(_, _, active)| *active), + ) + .await?; + log::info!("dynacast layers after HIGH request: {:?}", layers); + assert!( + layers.iter().all(|(_, _, active)| *active), + "expected all layers active after HIGH request, got {:?}", + layers + ); + + Ok(()) +} From f7a226504bdc39075ca7b0b2c196a6444f4a0e55 Mon Sep 17 00:00:00 2001 From: David Chen Date: Sat, 11 Apr 2026 17:43:41 -0700 Subject: [PATCH 4/6] add changeset --- .changeset/add_dynacast_support.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/add_dynacast_support.md diff --git a/.changeset/add_dynacast_support.md b/.changeset/add_dynacast_support.md new file mode 100644 index 000000000..446d5e87d --- /dev/null +++ b/.changeset/add_dynacast_support.md @@ -0,0 +1,7 @@ +--- +libwebrtc: patch +livekit: patch +livekit-ffi: patch +--- + +Add dynacast support - #1003 (@chenosaurus) From 825881cf3f03f2ca304d48af75c81e55dacd1e14 Mon Sep 17 00:00:00 2001 From: David Chen Date: Sat, 11 Apr 2026 21:54:41 -0700 Subject: [PATCH 5/6] fix log --- livekit/src/room/participant/remote_participant.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index bf6a2febd..80d8efe59 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -456,21 +456,21 @@ impl RemoteParticipant { let rtc_engine = rtc_engine.clone(); livekit_runtime::spawn(async move { let tsid: String = publication.sid().into(); - let proto_quality = match quality { + let quality = match quality { VideoQuality::Low => proto::VideoQuality::Low, VideoQuality::Medium => proto::VideoQuality::Medium, VideoQuality::High => proto::VideoQuality::High, - }; + }.into(); let update_track_settings = proto::UpdateTrackSettings { track_sids: vec![tsid.clone()], - quality: proto_quality.into(), + quality, ..Default::default() }; log::info!( "subscriber: sending UpdateTrackSettings to SFU: track={}, quality={:?}", tsid, - proto_quality, + proto::VideoQuality::try_from(quality), ); rtc_engine .send_request(proto::signal_request::Message::TrackSetting( From a0e25a0b2b727631ed67f111bfa33596fb7485ab Mon Sep 17 00:00:00 2001 From: David Chen Date: Sat, 11 Apr 2026 21:55:27 -0700 Subject: [PATCH 6/6] cleanup --- livekit/src/room/participant/remote_participant.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index 80d8efe59..4ee712d0d 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -460,7 +460,8 @@ impl RemoteParticipant { VideoQuality::Low => proto::VideoQuality::Low, VideoQuality::Medium => proto::VideoQuality::Medium, VideoQuality::High => proto::VideoQuality::High, - }.into(); + } + .into(); let update_track_settings = proto::UpdateTrackSettings { track_sids: vec![tsid.clone()], quality,