Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 55 additions & 23 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl SignalClient {
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
add_track_requests: Vec<proto::AddTrackRequest>,
) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> {
let handle_success = |inner: Arc<SignalInner>, join_response, stream_events| {
let (emitter, events) = mpsc::unbounded_channel();
Expand All @@ -169,7 +170,15 @@ impl SignalClient {
(Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events)
};

match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()).await {
match SignalInner::connect(
url,
token,
options.clone(),
publisher_offer.clone(),
add_track_requests.clone(),
)
.await
{
Ok((inner, join_response, stream_events)) => {
Ok(handle_success(inner, join_response, stream_events))
}
Expand All @@ -183,8 +192,14 @@ impl SignalClient {

for url in urls.iter() {
log::info!("fallback connection to: {}", url);
match SignalInner::connect(url, token, options.clone(), publisher_offer.clone())
.await
match SignalInner::connect(
url,
token,
options.clone(),
publisher_offer.clone(),
add_track_requests.clone(),
)
.await
{
Ok((inner, join_response, stream_events)) => {
return Ok(handle_success(inner, join_response, stream_events))
Expand Down Expand Up @@ -269,6 +284,7 @@ impl SignalInner {
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
add_track_requests: Vec<proto::AddTrackRequest>,
) -> SignalResult<(
Arc<Self>,
proto::JoinResponse,
Expand All @@ -277,8 +293,16 @@ impl SignalInner {
// Try v1 path first if single_peer_connection is enabled
let use_v1_path = options.single_peer_connection;
// For initial connection: reconnect=false, reconnect_reason=None, participant_sid=""
let lk_url =
get_livekit_url(url, &options, use_v1_path, false, None, "", publisher_offer.as_ref())?;
let lk_url = get_livekit_url(
url,
&options,
use_v1_path,
false,
None,
"",
publisher_offer.as_ref(),
&add_track_requests,
)?;
// Try to connect to the SignalClient
let (stream, mut events, single_pc_mode_active) =
match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await {
Expand Down Expand Up @@ -309,7 +333,7 @@ impl SignalInner {

if use_v1_path && is_not_found {
let lk_url_v0 =
get_livekit_url(url, &options, false, false, None, "", None)?;
get_livekit_url(url, &options, false, false, None, "", None, &[])?;
log::warn!("v1 path not found (404), falling back to v0 path");
match SignalStream::connect(
lk_url_v0.clone(),
Expand Down Expand Up @@ -414,6 +438,7 @@ impl SignalInner {
None,
sid,
None,
&[],
)
.unwrap();

Expand Down Expand Up @@ -574,6 +599,7 @@ fn create_join_request_param(
os_version: String,
device_model: String,
publisher_offer: Option<&proto::SessionDescription>,
add_track_requests: &[proto::AddTrackRequest],
) -> String {
let connection_settings = proto::ConnectionSettings {
auto_subscribe: options.auto_subscribe,
Expand All @@ -596,6 +622,7 @@ fn create_join_request_param(
connection_settings: Some(connection_settings),
reconnect,
publisher_offer: publisher_offer.cloned(),
add_track_requests: add_track_requests.to_vec(),
..Default::default()
};

Expand All @@ -612,21 +639,22 @@ fn create_join_request_param(
// Serialize JoinRequest to bytes
let join_request_bytes = join_request.encode_to_vec();

// Use gzip compression when publisher offer is included (SDP makes payload large)
let (compressed_bytes, compression) = if publisher_offer.is_some() {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
if encoder.write_all(&join_request_bytes).is_ok() {
if let Ok(compressed) = encoder.finish() {
(compressed, proto::wrapped_join_request::Compression::Gzip as i32)
// Use gzip compression when publisher offer or add_track_requests are included
let (compressed_bytes, compression) =
if publisher_offer.is_some() || !add_track_requests.is_empty() {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
if encoder.write_all(&join_request_bytes).is_ok() {
if let Ok(compressed) = encoder.finish() {
(compressed, proto::wrapped_join_request::Compression::Gzip as i32)
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
};
};

let wrapped_join_request =
proto::WrappedJoinRequest { join_request: compressed_bytes, compression };
Expand Down Expand Up @@ -654,6 +682,7 @@ fn get_livekit_url(
reconnect_reason: Option<i32>,
participant_sid: &str,
publisher_offer: Option<&proto::SessionDescription>,
add_track_requests: &[proto::AddTrackRequest],
) -> SignalResult<url::Url> {
let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?;

Expand Down Expand Up @@ -692,6 +721,7 @@ fn get_livekit_url(
os_info.version().to_string(),
device_model.to_string(),
publisher_offer,
add_track_requests,
);
lk_url.query_pairs_mut().append_pair("join_request", &join_request_param);
} else {
Expand Down Expand Up @@ -796,39 +826,41 @@ mod tests {
fn livekit_url_test() {
let io = SignalOptions::default();

assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None).is_err());
assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None, &[]).is_err());
assert_eq!(
get_livekit_url("https://localhost:7880", &io, false, false, None, "", None)
get_livekit_url("https://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("http://localhost:7880", &io, false, false, None, "", None)
get_livekit_url("http://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"ws"
);
assert_eq!(
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None)
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None)
get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"ws"
);
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err());
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None, &[])
.is_err());
}

#[test]
fn validate_url_test() {
let io = SignalOptions::default();
let lk_url =
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap();
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap();
let validate_url = get_validate_url(lk_url);

// Should be /rtc/validate, not /rtc/rtc/validate
Expand Down
1 change: 1 addition & 0 deletions livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use crate::{
PushFrameError, PushFrameErrorReason, RemoteDataTrack,
},
id::*,
options::TrackPublishOptions,
participant::{
ConnectionQuality, DisconnectReason, LocalParticipant, Participant, PerformRpcData,
RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData,
Expand Down
114 changes: 111 additions & 3 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::e2ee::EncryptionType;
use bmrng::unbounded::UnboundedRequestReceiver;
use futures_util::{Stream, StreamExt};
use libwebrtc::prelude::RtpEncodingParameters;
use libwebrtc::{
native::frame_cryptor::EncryptionState,
prelude::{
Expand All @@ -31,6 +33,7 @@ use livekit_datatrack::{
use livekit_protocol::observer::Dispatcher;
use livekit_protocol::{self as proto, encryption};
use livekit_runtime::JoinHandle;
use options::TrackPublishOptions;
use parking_lot::RwLock;
pub use proto::DisconnectReason;
use proto::{promise::Promise, SignalTarget};
Expand All @@ -41,6 +44,7 @@ use tokio::sync::{
mpsc::{self, UnboundedReceiver},
oneshot, Mutex as AsyncMutex,
};
use track::LocalTrack;
pub use utils::take_cell::TakeCell;

pub use self::{
Expand All @@ -55,8 +59,8 @@ use crate::{
prelude::*,
registered_audio_filter_plugins,
rtc_engine::{
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, PrePublishTrack,
RtcEngine, SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
},
};

Expand Down Expand Up @@ -394,6 +398,8 @@ pub struct RoomOptions {
pub single_peer_connection: bool,
/// Timeout for each individual signal connection attempt
pub connect_timeout: Duration,
/// Tracks to publish immediately upon joining. Only effective when `single_peer_connection` is true.
pub publish_tracks: Vec<(LocalTrack, TrackPublishOptions)>,
}

impl Default for RoomOptions {
Expand All @@ -416,6 +422,7 @@ impl Default for RoomOptions {
sdk_options: RoomSdkOptions::default(),
single_peer_connection: false,
connect_timeout: SIGNAL_CONNECT_TIMEOUT,
publish_tracks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -519,14 +526,30 @@ impl Room {
signal_options.adaptive_stream = options.adaptive_stream;
signal_options.single_peer_connection = options.single_peer_connection;
signal_options.connect_timeout = options.connect_timeout;
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(

if !options.publish_tracks.is_empty() && !options.single_peer_connection {
return Err(RoomError::Internal(
"publish_tracks requires single_peer_connection to be enabled".into(),
));
}
let encryption_type = e2ee_manager.encryption_type();
let pre_publish_tracks: Vec<PrePublishTrack> = options
.publish_tracks
.iter()
.map(|(track, opts)| {
Self::build_pre_publish_track(track.clone(), opts.clone(), encryption_type)
})
.collect();

let (rtc_engine, join_response, engine_events, pre_publish_receivers) = RtcEngine::connect(
url,
token,
EngineOptions {
rtc_config: options.rtc_config.clone(),
signal_options,
join_retries: options.join_retries,
single_peer_connection: options.single_peer_connection,
publish_tracks: pre_publish_tracks.clone(),
},
Some(e2ee_manager.clone()),
)
Expand Down Expand Up @@ -788,9 +811,94 @@ impl Room {
};
inner.handle.lock().await.replace(handle);

let mut receiver_map: HashMap<String, oneshot::Receiver<proto::TrackInfo>> =
pre_publish_receivers.into_iter().collect();
for pt in pre_publish_tracks {
let Some(rx) = receiver_map.remove(&pt.request.cid) else {
log::warn!("no receiver for pre-published track {}", pt.track.name());
continue;
};
match rtc_engine.wait_track_published_by_cid(pt.request.cid.clone(), rx).await {
Ok(track_info) => {
let publication =
LocalTrackPublication::new(track_info.clone(), pt.track.clone());
pt.track.update_info(track_info);
publication.set_track(Some(pt.track.clone().into()));
publication.update_publish_options(pt.options);
inner.local_participant.add_publication(TrackPublication::Local(publication));
pt.track.enable();
log::debug!("pre-published track completed: {}", pt.track.name());
}
Err(err) => {
log::warn!(
"failed to complete pre-published track {}: {:?}",
pt.track.name(),
err
);
}
}
}

Ok((Self { inner }, events))
}

fn build_pre_publish_track(
track: LocalTrack,
opts: TrackPublishOptions,
encryption_type: EncryptionType,
) -> PrePublishTrack {
let disable_red = encryption_type != EncryptionType::None || !opts.red;

let mut req = proto::AddTrackRequest {
cid: track.rtc_track().id(),
name: track.name(),
r#type: proto::TrackType::from(track.kind()) as i32,
muted: track.is_muted(),
source: proto::TrackSource::from(opts.source) as i32,
disable_dtx: !opts.dtx,
disable_red,
encryption: proto::encryption::Type::from(encryption_type) as i32,
stream: opts.stream.clone(),
..Default::default()
};

if opts.preconnect_buffer {
req.audio_features.push(proto::AudioTrackFeature::TfPreconnectBuffer as i32);
}

let encodings = match &track {
LocalTrack::Video(video_track) => {
let resolution = video_track.rtc_source().video_resolution();
req.width = resolution.width;
req.height = resolution.height;

let encodings = options::compute_video_encodings(req.width, req.height, &opts);
req.layers =
options::video_layers_from_encodings(req.width, req.height, &encodings);

if opts.simulcast && encodings.len() > 1 {
req.simulcast_codecs = vec![proto::SimulcastCodec {
codec: opts.video_codec.as_str().to_string(),
cid: track.rtc_track().id(),
layers: req.layers.clone(),
..Default::default()
}];
}
encodings
}
LocalTrack::Audio(_) => {
let audio_encoding =
opts.audio_encoding.as_ref().unwrap_or(&options::audio::MUSIC.encoding);
vec![RtpEncodingParameters {
max_bitrate: Some(audio_encoding.max_bitrate),
..Default::default()
}]
}
};

PrePublishTrack { track, options: opts, encodings, request: req }
}

pub async fn close(&self) -> RoomResult<()> {
self.inner.close(DisconnectReason::ClientInitiated).await
}
Expand Down
Loading