Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-api: patch
livekit-ffi: patch
---

Support for large RPC messages using data streams - #1013 (@1egoman)
12 changes: 12 additions & 0 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ const REGION_FETCH_TIMEOUT: Duration = Duration::from_secs(3);
const VALIDATE_TIMEOUT: Duration = Duration::from_secs(3);
pub const PROTOCOL_VERSION: u32 = 17;

/// Default value for `ClientInfo.client_protocol` when a participant has not
/// advertised one (treat as v1-only / no data-stream RPC support).
pub const CLIENT_PROTOCOL_DEFAULT: i32 = 0;
/// `ClientInfo.client_protocol` value indicating support for RPC v2 over data streams.
pub const CLIENT_PROTOCOL_DATA_STREAM_RPC: i32 = 1;

/// The client protocol which is sent to other clients and indicates the set of apis that other
/// clients should assume this client supports.
const CLIENT_PROTOCOL_VERSION: i32 = CLIENT_PROTOCOL_DATA_STREAM_RPC;

#[derive(Error, Debug)]
pub enum SignalError {
#[error("ws failure: {0}")]
Expand Down Expand Up @@ -571,6 +581,7 @@ fn create_join_request_param(
os,
os_version,
device_model,
client_protocol: CLIENT_PROTOCOL_VERSION,
..Default::default()
};

Expand Down Expand Up @@ -667,6 +678,7 @@ fn get_livekit_url(
.append_pair("os_version", os_info.version().to_string().as_str())
.append_pair("device_model", device_model.to_string().as_str())
.append_pair("protocol", PROTOCOL_VERSION.to_string().as_str())
.append_pair("client_protocol", CLIENT_PROTOCOL_VERSION.to_string().as_str())
.append_pair("auto_subscribe", if options.auto_subscribe { "1" } else { "0" })
.append_pair("adaptive_stream", if options.adaptive_stream { "1" } else { "0" });

Expand Down
11 changes: 11 additions & 0 deletions livekit/src/room/data_stream/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ impl Stream for ByteStreamReader {
}
}

#[cfg(test)]
impl TextStreamReader {
/// Create a TextStreamReader for testing purposes.
pub(crate) fn new_for_test(
info: TextStreamInfo,
chunk_rx: UnboundedReceiver<StreamResult<Bytes>>,
) -> Self {
Self { info, chunk_rx }
}
}

impl StreamReader for TextStreamReader {
type Output = String;
type Info = TextStreamInfo;
Expand Down
117 changes: 75 additions & 42 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use libwebrtc::{
rtp_transceiver::RtpTransceiver,
RtcError,
};
use livekit_api::signal_client::{SignalOptions, SignalSdkOptions, SIGNAL_CONNECT_TIMEOUT};
use livekit_api::signal_client::{
SignalOptions, SignalSdkOptions, CLIENT_PROTOCOL_DATA_STREAM_RPC, CLIENT_PROTOCOL_DEFAULT,
SIGNAL_CONNECT_TIMEOUT,
};
use livekit_datatrack::{
api::{DataTrackSid, RemoteDataTrack},
backend as dt,
Expand Down Expand Up @@ -67,6 +70,7 @@ pub mod id;
pub mod options;
pub mod participant;
pub mod publication;
pub mod rpc;
pub mod track;
pub(crate) mod utils;

Expand Down Expand Up @@ -329,30 +333,6 @@ pub struct ChatMessage {
pub generated: Option<bool>,
}

#[derive(Debug, Clone)]
pub struct RpcRequest {
pub destination_identity: String,
pub id: String,
pub method: String,
pub payload: String,
pub response_timeout: Duration,
pub version: u32,
}

#[derive(Debug, Clone)]
pub struct RpcResponse {
destination_identity: String,
request_id: String,
payload: Option<String>,
error: Option<proto::RpcError>,
}

#[derive(Debug, Clone)]
pub struct RpcAck {
destination_identity: String,
request_id: String,
}

#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RoomSdkOptions {
Expand Down Expand Up @@ -473,9 +453,11 @@ pub(crate) struct RoomSession {
remote_participants: RwLock<HashMap<ParticipantIdentity, RemoteParticipant>>,
e2ee_manager: E2eeManager,
incoming_stream_manager: IncomingStreamManager,
outgoing_stream_manager: OutgoingStreamManager,
pub(crate) outgoing_stream_manager: OutgoingStreamManager,
local_dt_input: dt::local::ManagerInput,
remote_dt_input: dt::remote::ManagerInput,
pub(crate) rpc_client: rpc::RpcClientManager,
pub(crate) rpc_server: rpc::RpcServerManager,
handle: AsyncMutex<Option<Handle>>,
}

Expand Down Expand Up @@ -554,6 +536,7 @@ impl Room {
pi.joined_at_ms,
e2ee_manager.encryption_type(),
pi.permission,
pi.client_protocol,
);

let dispatcher = Dispatcher::<RoomEvent>::default();
Expand Down Expand Up @@ -688,6 +671,8 @@ impl Room {
outgoing_stream_manager,
local_dt_input,
remote_dt_input,
rpc_client: rpc::RpcClientManager::new(),
rpc_server: rpc::RpcServerManager::new(),
handle: Default::default(),
});
inner.local_participant.set_session(Arc::downgrade(&inner));
Expand Down Expand Up @@ -733,6 +718,7 @@ impl Room {
pi.attributes,
pi.joined_at_ms,
pi.permission,
pi.client_protocol,
)
};
participant.update_info(pi.clone());
Expand All @@ -757,6 +743,7 @@ impl Room {
open_rx,
dispatcher.clone(),
close_rx.resubscribe(),
inner.clone(),
));
let outgoing_stream_handle = livekit_runtime::spawn(outgoing_data_stream_task(
packet_rx,
Expand Down Expand Up @@ -985,25 +972,31 @@ impl RoomSession {
log::warn!("Received RPC request with null caller identity");
return Ok(());
}
let local_participant = self.local_participant.clone();
let session = self.clone();
let caller = caller_identity.unwrap();
livekit_runtime::spawn(async move {
local_participant
.handle_incoming_rpc_request(
caller_identity.unwrap(),
request_id,
method,
payload,
response_timeout,
version,
let transport = rpc::SessionTransport(session.clone());
session
.rpc_server
.handle_request(
rpc::HandleRequestOptions {
caller_identity: caller,
request_id,
method,
payload,
response_timeout,
version,
},
&transport,
)
.await;
});
}
EngineEvent::RpcResponse { request_id, payload, error } => {
self.local_participant.handle_incoming_rpc_response(request_id, payload, error);
self.rpc_client.handle_response(request_id, payload, error);
}
EngineEvent::RpcAck { request_id } => {
self.local_participant.handle_incoming_rpc_ack(request_id);
self.rpc_client.handle_ack(request_id);
}
EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers),
EngineEvent::ConnectionQuality { updates } => {
Expand Down Expand Up @@ -1143,6 +1136,7 @@ impl RoomSession {
pi.attributes,
pi.joined_at_ms,
pi.permission,
pi.client_protocol,
)
};

Expand Down Expand Up @@ -1828,6 +1822,7 @@ impl RoomSession {
attributes: HashMap<String, String>,
joined_at: i64,
permission: Option<proto::ParticipantPermission>,
client_protocol: i32,
) -> RemoteParticipant {
let participant = RemoteParticipant::new(
self.rtc_engine.clone(),
Expand All @@ -1842,6 +1837,7 @@ impl RoomSession {
joined_at,
self.options.auto_subscribe,
permission,
client_protocol,
);

participant.on_track_published({
Expand Down Expand Up @@ -1984,6 +1980,14 @@ impl RoomSession {
self.remote_participants.read().get(identity).cloned()
}

pub(crate) fn get_remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 {
self.remote_participants
.read()
.get(identity)
.map(|p| p.client_protocol())
.unwrap_or(CLIENT_PROTOCOL_DEFAULT)
}

fn get_local_or_remote_participant(
&self,
identity: &ParticipantIdentity,
Expand Down Expand Up @@ -2053,10 +2057,14 @@ impl RoomSession {
}

/// Receives stream readers for newly-opened streams and dispatches room events.
///
/// Intercepts text streams on RPC topics (`lk.rpc_request`, `lk.rpc_response`)
/// and routes them to the RPC managers instead of emitting them as room events.
async fn incoming_data_stream_task(
mut open_rx: UnboundedReceiver<(AnyStreamReader, String)>,
dispatcher: Dispatcher<RoomEvent>,
mut close_rx: broadcast::Receiver<()>,
session: Arc<RoomSession>,
) {
loop {
tokio::select! {
Expand All @@ -2067,11 +2075,36 @@ async fn incoming_data_stream_task(
reader: TakeCell::new(reader),
participant_identity: ParticipantIdentity(identity)
}),
AnyStreamReader::Text(reader) => dispatcher.dispatch(&RoomEvent::TextStreamOpened {
topic: reader.info().topic.clone(),
reader: TakeCell::new(reader),
participant_identity: ParticipantIdentity(identity)
}),
AnyStreamReader::Text(reader) => {
let topic = reader.info().topic.clone();
match topic.as_str() {
rpc::RPC_REQUEST_TOPIC => {
let caller_identity = ParticipantIdentity(identity);
let session = session.clone();
livekit_runtime::spawn(async move {
let transport = rpc::SessionTransport(session.clone());
session.rpc_server.handle_request_stream(
reader,
caller_identity,
&transport,
).await;
});
}
rpc::RPC_RESPONSE_TOPIC => {
let session = session.clone();
livekit_runtime::spawn(async move {
session.rpc_client.handle_response_stream(reader).await;
});
}
_ => {
dispatcher.dispatch(&RoomEvent::TextStreamOpened {
topic,
reader: TakeCell::new(reader),
participant_identity: ParticipantIdentity(identity)
});
}
}
}
}
},
_ = close_rx.recv() => {
Expand Down
Loading
Loading