Conversation
Changeset

The following package versions will be affected by this PR:
```rust
///
/// If a signal response type not listed above is provided, the result is an error.
///
pub fn handle_signal_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> {
```
I think we should take a closer look at this from the perspective of signaling migration (or "hybrid" signaling if we decide to move some features earlier e.g. data streams, RPC or any other small piece) cc @reenboog
Let's assume that both the client language and Rust can handle messages; there are 2 general approaches:

1. Strict chain of responsibility, i.e. `if !rust_handled(signal) { other_handled(signal) }` - the main problem with this approach is there's no central place in Rust to decide that the message was indeed handled, so hypothetically we'd need to pass it to each data track manager, etc. (and build another chain inside Rust).
2. Dispatch - the client deserializes and knows what to do, and passes `rust_handle_specific_message` down.

What I wanna avoid is re-wrapping messages that we already deserialize e.g. in Swift (and know what to do with them) just to pass them into this `handle_signal_response(&self, res: &[u8])`; this should be more specific, like `fn handle_publish_response` etc.
The same may apply to the "receiving" side.
This problem won't exist as we move the signaling to rust ofc, but let's make sure we can do that smoothly.
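As an illustration only, here is a minimal std-Rust sketch of the shape option 2 ("dispatch") could take; every name here (`SignalResponse`, `DataTrackManager`, `handle_publish_response`, `dispatch`) is hypothetical, not the actual SDK API:

```rust
// Hypothetical sketch of the "dispatch" approach: the client decodes the
// signal envelope itself and routes each message to a narrow, typed Rust
// entry point instead of a single `handle_signal_response(&[u8])`.
enum SignalResponse {
    Publish { track_sid: String },
    Leave,
}

struct DataTrackManager;

impl DataTrackManager {
    // One specific entry point per message Rust actually handles.
    fn handle_publish_response(&self, track_sid: &str) {
        println!("rust handled publish for {track_sid}");
    }
}

// The client-side dispatcher (in Swift this would live in the wrapper layer);
// returns whether Rust handled the message.
fn dispatch(manager: &DataTrackManager, msg: SignalResponse) -> bool {
    match msg {
        SignalResponse::Publish { track_sid } => {
            manager.handle_publish_response(&track_sid);
            true
        }
        // Everything else stays in the client layer.
        SignalResponse::Leave => false,
    }
}

fn main() {
    let manager = DataTrackManager;
    let handled = dispatch(&manager, SignalResponse::Publish { track_sid: "TR_x".into() });
    assert!(handled);
}
```

The point is that no already-deserialized message has to be re-encoded to bytes just to cross into Rust.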
Sorry, there's also a 3rd way - passing everything at the data (raw proto) level to both the client and Rust, not caring about overlaps at all; let's call it a message bus.
Given each client implementation pulls in the protobufs today anyway and that isn't likely to change, I think I'd vote for 2 (dispatch - client deserializes and knows what to do, passes rust_handle_specific_message down). This was generally what I did on the web (I realize it's not exactly the same set of problems though) and it worked quite well. If there's a desire to eventually do all protobuf serialization / deserialization at the manager level, then I think it's fairly reasonable to then move to 1 or 3 once we're more confident which pattern makes the most sense.
For now, at least for Swift, we must deserialize anyway, as the "handlers" are spread across different layers; this problem will disappear/move as we move the signaling to Rust.
pblazej left a comment
I don't see too much glue code for Swift now, it's definitely doable; I'm mostly worried about the "intermediate" layer, i.e. some awkward signaling patterns/states etc.
```rust
}

/// Try pushing a frame to subscribers of the track.
pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameErrorReason> {
```
This and other "throwing" methods: a separate error type can't be mapped directly to `LiveKitError`, so we need to "rethrow" in Swift (and probably in other languages as well), which is yet another typical problem for such wrappers.
On the other hand, this whole pattern of 1 global error type (+ code) is outdated, and we've got counterexamples like `StreamError` already in the public API.
I'd rather use typed throws for Swift here directly (even if inconsistent with the rest of the SDK) or leave it as-is.
I don't think UniFFI supports generating Swift functions with typed throws yet, but we will get that for free once it does.
Yes, this "problem" may exist in other languages like Kotlin if we use a "centralized error type".
```rust
}

/// Unpublishes the track.
pub fn unpublish(&self) {
```
I like the new APIs a lot; the only asymmetry for me is that you cannot surface the fact that an unpublished track is "invalid" just via language constructs ("consuming", scopes, etc.).
I'm thinking of sth like:

```swift
public func withDataTrack<T>(
    name: String,
    body: (LocalDataTrack) async throws -> T
) async throws -> T {
    let track = try await publishDataTrack(name: name)
    defer { track.unpublish() }
    return try await body(track)
}
```

So you can pipe streams, e.g.
```swift
// Instead of handing back an object that can go stale
try await localParticipant.withDataTrack(name: "game-state") { track in
    // track is guaranteed valid here
    for await state in gameStates {
        track.tryPush(frame: state.toFrame())
    }
}
// automatically unpublished when scope exits (cancellation, throw, or return)
```

Or even:
```swift
// Publishing: AsyncSequence → DataTrack (sink)
try await localParticipant.publishDataTrack(name: "game", sending:
    gameLoop.map { $0.toFrame() }
)

// Subscribing: DataTrack → AsyncSequence (source)
for await frame in stream {
    process(frame)
}
```

It's purely additive, so take it with a grain of 🧂 but IMO it provides a great DX on top of data tracks if you stick to certain stream-like constructs.
Generally makes sense to me! Though IMO, since this is somewhat Swift-specific, maybe it makes sense to expose this in the Swift-specific code above this layer rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.
> rather than in the uniffi definition

Yes, I'm talking purely about wrapping here.
Something to be aware of is the client doesn't have sole control over the publish state of a data track (the SFU reserves the right to unpublish at any time without the client initiating it) which is why I didn't do something similar in Rust (e.g., modeling with type state).
Ah so if someone "unpublished it for me" it will just throw and break the loop?
```rust
#[uniffi::export]
impl DataTrackStream {
    /// Returns the next received frame or `None` if the subscription has ended.
    pub async fn next(&self) -> Option<DataTrackFrame> {
```
This could be mapped in a generic way; the impl is trivial in Swift, basically using `AsyncStream(unfolding: next)`, as we do for logs.
```rust
use bytes::Bytes;

uniffi::custom_type!(Bytes, Vec<u8>, { remote });
```
General: do we plan to introduce some helpers for encoding/decoding into that, or leave that to experienced users?
This macro handles the conversion from `Bytes` to `Vec<u8>` automatically (under the hood it implements `impl uniffi::FfiConverter<crate::UniFfiTag> for Bytes`), so Swift can just pass `Data` as expected wherever the API on the Rust side accepts `Bytes`.
Yeah, I just mean "something higher-level than `Data`", like `Encodable` types (JSON etc.)
```diff
- pub(crate) payload: Bytes,
- pub(crate) user_timestamp: Option<u64>,
+ pub payload: Bytes,
+ pub user_timestamp: Option<u64>,
```
Nit: shall we expose `duration_since_timestamp`, e.g. for benchmarks?
If this is not a `uniffi::Object` we won't get `withTimestampNow` etc. for free, right? As it basically maps to a raw struct:

```swift
public struct DataTrackFrame: Equatable, Hashable, Sendable {
    public var payload: Data
    public var userTimestamp: UInt64?
}
```

Applies to other "DTOs" as well, so good to discuss that now.
It seems this type of "DTO" is more naturally modeled as a value type on the Swift side. I don't think you can currently have associated functions on a `uniffi::Record` (even though this makes sense in a Swift context), so I see two options:

1. Use `uniffi::Object`
2. Export standalone helper functions (e.g., `fn with_user_timestamp(frame: DataTrackFrame) -> DataTrackFrame`) and define an `extension DataTrackFrame` on the Swift side to make it an associated function
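To make option 2 concrete, here is a compilable std-only sketch; the names are illustrative and `Vec<u8>` stands in for `Bytes`, so this is not the actual crate code:

```rust
// Hypothetical record type, mirroring the flattened Swift struct above.
#[derive(Debug, Clone, PartialEq)]
pub struct DataTrackFrame {
    pub payload: Vec<u8>,
    pub user_timestamp: Option<u64>,
}

/// Standalone helper that would be exported over the FFI; on the Swift side
/// an `extension DataTrackFrame` would wrap it as an associated function.
pub fn with_user_timestamp(mut frame: DataTrackFrame, user_timestamp: u64) -> DataTrackFrame {
    frame.user_timestamp = Some(user_timestamp);
    frame
}

fn main() {
    let frame = DataTrackFrame { payload: vec![1, 2, 3], user_timestamp: None };
    let stamped = with_user_timestamp(frame, 42);
    println!("{:?}", stamped.user_timestamp); // prints Some(42)
}
```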
Good news: UniFFI v0.31.0 added support for methods on records and enums!
Yup let's avoid handwritten extensions and just try bumping UniFFI.
Edit: tried that locally, and it does not work for `#[uniffi::remote(Record)]`, e.g. `DataTrackFrame`.
```rust
/// Returns the next received frame or `None` if the subscription has ended.
pub async fn next(&self) -> Option<DataTrackFrame> {
    // TODO: avoid mutex?
    self.0.lock().await.next().await
```
pblazej left a comment
I still don't see any real blockers here, would be nice to discuss some common cases like errors and records now.
Hm, I see one more gap in the runtime after introducing 90cf95a: UniFFI polls async futures from Swift's thread via continuation callbacks. That thread has no tokio runtime context, so the future panics as soon as it hits anything that needs that context. The naive claude solution was:

```rust
pub(crate) fn contextualize<F: std::future::Future>(
    future: F,
) -> impl std::future::Future<Output = F::Output> {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct Contextualized<F> {
        inner: Pin<Box<F>>,
    }

    impl<F: std::future::Future> std::future::Future for Contextualized<F> {
        type Output = F::Output;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Enter the tokio runtime context on every poll.
            let _enter = runtime().enter();
            self.inner.as_mut().poll(cx)
        }
    }

    Contextualized { inner: Box::pin(future) }
}
```

then e.g.

```rust
crate::runtime::contextualize(self.input.publish_track(options))
    .await
    .map(LocalDataTrack)
```
It's even more generic: at some point, a reactor might be expected (timers, network, any IO), and none found. So, I think we should wrap all publicly exportable functions with something like this:

```rust
// a cancelable version could be introduced as well
pub async fn on_runtime<F, Res>(future: F) -> Res
where
    F: Future<Output = Res> + Send + 'static,
    Res: Send + 'static,
{
    get_runtime()
        .spawn(future)
        .await
        .expect("A background Tokio task panicked")
}

// ...

// when exporting, wrap with on_runtime
#[uniffi::export]
pub async fn ffi_do_work() -> Result<MyFfiType, FfiErr> {
    on_runtime(async {
        let res = core::do_work(params).await?;
        Ok(res)
    })
    .await
}
```

I already started drafting this in my work related to signaling.
```rust
#[uniffi(flat_error)]
pub enum HandleSignalResponseError {
    #[error("Response decoding failed: {0}")]
    Decode(prost::DecodeError),
```
thought: Is it worth making a unique error type here rather than using `prost::DecodeError`, so that the internal protobuf implementation type doesn't leak through the interface?
Since this uses the `#[uniffi(flat_error)]` macro (docs), the associated value for each case gets converted to a string via `Display` for the purposes of crossing the FFI boundary, so none of the inner error types are exposed, but we still get the enum cases.
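To see what that flattening amounts to, here's a minimal std-only sketch; a `String` stands in for `prost::DecodeError` so it compiles on its own, and this is not the generated code:

```rust
use std::fmt;

// Sketch: with a flat error, only the case name plus this Display string
// cross the FFI boundary; the inner error value does not.
#[derive(Debug)]
enum HandleSignalResponseError {
    Decode(String), // stand-in for prost::DecodeError
}

impl fmt::Display for HandleSignalResponseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            HandleSignalResponseError::Decode(inner) => {
                write!(f, "Response decoding failed: {inner}")
            }
        }
    }
}

fn main() {
    let err = HandleSignalResponseError::Decode("invalid wire type".into());
    // All a Swift caller sees of the inner error:
    println!("{err}"); // prints "Response decoding failed: invalid wire type"
}
```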
```rust
}

/// Adapts [`DataTrackEncryptionProvider`] to implement [`EncryptionProvider`].
pub(super) struct FfiEncryptionProvider(pub(super) Arc<dyn DataTrackEncryptionProvider>);
```
thought: Do you need a uniffi attr macro of some sort here and on `FfiDecryptionProvider`?
No, since this type never crosses the FFI boundary. The current approach for exporting the E2EE traits is a bit awkward; I had to introduce FFI-only wrapper traits since nothing like `#[uniffi::remote(Trait)]` exists. However, once we move the UniFFI macros to the livekit-datatrack crate, this should no longer be necessary: `#[uniffi::export(with_foreign)]` can be applied directly to the trait definitions then.
```rust
// TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the
// need for this additional task.
```
(Just a note that the web implementation has quite a bit of logic dealing with this, so if nothing else there are a lot of good test cases to pull in)
I think this will map nicely; `CancellationToken` is roughly equivalent to `AbortSignal` on the web.
```diff
  .manager
  .encrypt_data(payload.into(), &self.sender_identity, key_index)
- .map_err(|_| dt::EncryptionError)?;
+ .map_err(|_| dt::EncryptionError::Failed)?;
```
question: Is this a breaking API change for existing Rust SDK users?
No, since this type is never exposed in the public API of the livekit crate.

@reenboog, is this something you would want to add in the livekit_runtime crate?
Summary of changes:

- livekit-uniffi

Resolves CLT-2472