
Data tracks UniFFI #1034

Draft
ladvoc wants to merge 46 commits into main from ladvoc/data-tracks-uniffi

Conversation

Contributor

@ladvoc ladvoc commented Apr 22, 2026

Summary of changes:

  • Exposes data tracks core functionality through livekit-uniffi
    • This will eventually enable the following clients to share the Rust implementation: Swift, Kotlin, React Native, Flutter
  • Minor changes to the data tracks crate to support this

Resolves CLT-2472

@github-actions
Contributor

Changeset

The following package versions will be affected by this PR:

Package            Bump
livekit            patch
livekit-datatrack  patch
livekit-ffi        patch
livekit-uniffi     minor

@ladvoc ladvoc requested review from 1egoman and pblazej April 22, 2026 09:12
Comment thread livekit-uniffi/src/data_track/local.rs Outdated
///
/// If a signal response type not listed above is provided, the result is an error.
///
pub fn handle_signal_response(&self, res: &[u8]) -> Result<(), HandleSignalResponseError> {
Contributor


I think we should take a closer look at this from the perspective of signaling migration (or "hybrid" signaling if we decide to move some features earlier e.g. data streams, RPC or any other small piece) cc @reenboog

Let's assume that both client language and rust can handle messages, there are 2 general approaches:

  • strict chain of responsibility, i.e. if !rust_handled(signal) { other_handled(signal) }. The main problem with this approach is that there's no central place in Rust to decide that the message was indeed handled, so hypothetically we'd need to pass it to each data track manager, etc. (and build another chain inside Rust)
  • dispatch: the client deserializes the message, knows what to do with it, and passes rust_handle_specific_message down

What I want to avoid is re-wrapping messages that we already deserialize (e.g. in Swift, where we know what to do with them) just to pass them into this handle_signal_response(&self, res: &[u8]); this should be more specific, like fn handle_publish_response etc.

The same may apply to the "receiving" side.

This problem won't exist as we move the signaling to rust ofc, but let's make sure we can do that smoothly.

Contributor


Sorry, there's also a 3rd way: passing everything at the data (raw proto) level to both client and Rust, not caring about overlaps at all; let's call it a message bus.

Contributor


Given each client implementation pulls in the protobufs today anyway and that isn't likely to change, I think I'd vote for 2 (dispatch - client deserializes and knows what to do, passes rust_handle_specific_message down). This was generally what I did on the web (I realize it's not exactly the same set of problems though) and it worked quite well. If there's a desire to eventually do all protobuf serialization / deserialization at the manager level, then I think it's fairly reasonable to then move to 1 or 3 once we're more confident which pattern makes the most sense.

Contributor


For now, at least for Swift, we must deserialize anyway, as the "handlers" are spread across different layers; this problem will disappear/move as we move the signaling to Rust.

Contributor Author


Switched to option 2 in b449306.

@pblazej pblazej requested a review from reenboog April 22, 2026 15:01
Contributor

@pblazej pblazej left a comment


I don't see too much glue code for Swift now, so it's definitely doable; I'm mostly worried about the "intermediate" layer, i.e. some awkward signaling patterns/states, etc.

}

/// Try pushing a frame to subscribers of the track.
pub fn try_push(&self, frame: DataTrackFrame) -> Result<(), PushFrameErrorReason> {
Contributor


This and other "throwing" methods: a separate error type can't be mapped directly to LiveKitError, so we need to "rethrow" in Swift (and probably other languages as well), which is yet another typical problem for such wrappers.

On the other hand, this whole pattern of 1 global error type (+code) is outdated, and we've got counterexamples like StreamError already in the public API.

I'd rather use typed throws for Swift here directly (even if inconsistent with the rest of the SDK) or leave it as-is.

Contributor Author


I don't think UniFFI supports generating Swift functions with typed throws yet, but we will get that for free once it does.

Contributor


Yes, this "problem" may exist in other languages like Kotlin if we use a "centralized error type".

}

/// Unpublishes the track.
pub fn unpublish(&self) {
Contributor


I like the new APIs a lot; the only asymmetry for me is that you cannot surface the fact that an unpublished track is "invalid" just via language constructs ("consuming", scopes, etc.).

I'm thinking of something like:

  public func withDataTrack<T>(
      name: String,
      body: (LocalDataTrack) async throws -> T
  ) async throws -> T {
      let track = try await publishDataTrack(name: name)
      defer { track.unpublish() }
      return try await body(track)
  }

So you can pipe streams e.g.

  // Instead of handing back an object that can go stale
  try await localParticipant.withDataTrack(name: "game-state") { track in
      // track is guaranteed valid here
      for await state in gameStates {
          track.tryPush(frame: state.toFrame())
      }
  }
  // automatically unpublished when scope exits (cancellation, throw, or return)

Or even:

  // Publishing: AsyncSequence → DataTrack (sink)
  try await localParticipant.publishDataTrack(name: "game", sending:
      gameLoop.map { $0.toFrame() }
  )

  // Subscribing: DataTrack → AsyncSequence (source)
  for await frame in stream {
      process(frame)
  }

It's purely additive, so take it with a grain of 🧂 but IMO provides a great DX on top of data tracks if you stick to certain stream-like constructs.

Contributor


Generally makes sense to me! Though IMO, since this is somewhat Swift-specific, maybe it makes sense to expose this in the Swift-specific code above it rather than in the uniffi definition. Also because I don't think sending a closure over uniffi is possible.

Contributor


rather than in the uniffi definition

yes, I'm talking purely about wrapping here.

Contributor Author


Something to be aware of: the client doesn't have sole control over the publish state of a data track (the SFU reserves the right to unpublish at any time without the client initiating it), which is why I didn't do something similar in Rust (e.g., modeling with type state).

Contributor


Ah so if someone "unpublished it for me" it will just throw and break the loop?

#[uniffi::export]
impl DataTrackStream {
/// Returns the next received frame or `None` if the subscription has ended.
pub async fn next(&self) -> Option<DataTrackFrame> {
Contributor

@pblazej pblazej Apr 23, 2026


This could be mapped in a generic way; the impl is trivial in Swift, basically using AsyncStream(unfolding: next), as we do for logs.


use bytes::Bytes;

uniffi::custom_type!(Bytes, Vec<u8>, { remote });
Contributor


General: do we plan to introduce some helpers for encoding/decoding into that or leaving that to experienced users?

Contributor Author


This macro handles the conversion from Bytes to Vec<u8> automatically (under the hood it implements impl uniffi::FfiConverter<crate::UniFfiTag> for Bytes), so Swift can just pass Data as expected wherever the API on the Rust side accepts Bytes.

Contributor


Yeah, I just mean "something higher-level than Data" like Encodable types (JSON etc.)

pub(crate) payload: Bytes,
pub(crate) user_timestamp: Option<u64>,
pub payload: Bytes,
pub user_timestamp: Option<u64>,
Contributor


Nit: shall we expose duration_since_timestamp e.g. for benchmarks?

Contributor

@pblazej pblazej Apr 23, 2026


If this is not a uniffi::Object, we won't get withTimestampNow etc. for free, right? As it basically maps to a raw struct:

public struct DataTrackFrame: Equatable, Hashable, Sendable {
  public var payload: Data
  public var userTimestamp: UInt64?
}

Contributor


Applies to other "DTOs" as well, so good to discuss that now.

Contributor Author

@ladvoc ladvoc Apr 29, 2026


It seems this type of "DTO" is more naturally modeled as a value type on the Swift side. I don't think you can currently have associated functions on a uniffi::Record (even though this makes sense in a Swift context), so I see two options:

  1. Use uniffi::Object
  2. Export standalone helper functions (e.g., fn with_user_timestamp(frame: DataTrackFrame) -> DataTrackFrame) and define an extension DataTrackFrame on the Swift side to make it an associated function

Contributor Author


Good news: UniFFI v0.31.0 added support for methods on records and enums!

Contributor


Yup let's avoid handwritten extensions and just try bumping UniFFI.

Contributor


Edit: tried that locally, and it does not work for #[uniffi::remote(Record)] e.g. DataTrackFrame

/// Returns the next received frame or `None` if the subscription has ended.
pub async fn next(&self) -> Option<DataTrackFrame> {
// TODO: avoid mutex?
self.0.lock().await.next().await
Contributor


cc @reenboog talking about async 😄

Contributor

@pblazej pblazej left a comment


I still don't see any real blockers here; it would be nice to discuss some common cases like errors and records now.

Contributor

pblazej commented Apr 24, 2026

Hm, I see one more gap in the runtime after introducing 90cf95a

UniFFI polls async futures from Swift's thread via continuation callbacks. That thread has no tokio runtime context. When the future hits tokio::time::timeout (line 453 in manager.rs), tokio tries to register a timer with a runtime that doesn't exist on that thread → panic.

The naive Claude solution was:

pub(crate) fn contextualize<F: std::future::Future>(
    future: F,
) -> impl std::future::Future<Output = F::Output> {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct Contextualized<F> {
        inner: Pin<Box<F>>,
    }

    impl<F: std::future::Future> std::future::Future for Contextualized<F> {
        type Output = F::Output;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let _enter = runtime().enter();
            self.inner.as_mut().poll(cx)
        }
    }

    Contextualized { inner: Box::pin(future) }
}

then e.g.

crate::runtime::contextualize(self.input.publish_track(options))
            .await
            .map(LocalDataTrack)

Contributor

reenboog commented Apr 26, 2026

It's even more generic: at some point a reactor might be expected (timers, network, any I/O) and none found. So I think we should wrap all publicly exportable functions with something like this:

// a cancelable version could be introduced as well
pub async fn on_runtime<F, Res>(future: F) -> Res
where
    F: Future<Output = Res> + Send + 'static,
    Res: Send + 'static,
{
    get_runtime()
        .spawn(future)
        .await
        .expect("A background Tokio task panicked")
}

...

// when exporting, wrap with on_runtime
#[uniffi::export]
pub async fn ffi_do_work() -> Result<MyFfiType, FfiErr> {
    on_runtime(async {
        let res = core::do_work(params).await?;
        Ok(res)
    })
    .await
}

I already started drafting this in my work related to signaling.

#[uniffi(flat_error)]
pub enum HandleSignalResponseError {
#[error("Response decoding failed: {0}")]
Decode(prost::DecodeError),
Contributor


thought: Is it worth making a unique error type here rather than using prost::DecodeError, so that the internal protobuf implementation type doesn't leak through the interface?

Contributor Author

@ladvoc ladvoc Apr 29, 2026


Since this uses the #[uniffi(flat_error)] macro (docs), the associated value for each case gets converted to a string via Display for the purposes of crossing the FFI boundary, so none of the inner error types are exposed, but we still get the enum cases.

}

/// Adapts [`DataTrackEncryptionProvider`] to implement [`EncryptionProvider`].
pub(super) struct FfiEncryptionProvider(pub(super) Arc<dyn DataTrackEncryptionProvider>);
Contributor


thought: Do you need a uniffi attr macro of some sort here and on FfiDecryptionProvider?

Contributor Author


No, since this type never crosses the FFI boundary. The current approach for exporting the E2EE traits is a bit awkward; I had to introduce FFI-only wrapper traits since nothing like #[uniffi::remote(Trait)] exists. However, once we move the UniFFI macros to the livekit-datatrack crate, this should no longer be necessary: #[uniffi::export(with_foreign)] can then be applied directly to the trait definitions.

Comment on lines +125 to +126
// TODO: in a follow-up PR, refactor manager to work with cancellation tokens directly, eliminating the
// need for this additional task.
Contributor


(Just a note that the web implementation has quite a bit of logic dealing with this, so if nothing else there are a lot of good test cases to pull in)

Contributor Author


I think this will map nicely; CancellationToken is roughly equivalent to AbortSignal on web.


.manager
.encrypt_data(payload.into(), &self.sender_identity, key_index)
.map_err(|_| dt::EncryptionError)?;
.map_err(|_| dt::EncryptionError::Failed)?;
Contributor


question: Is this a breaking api change to existing rust sdk users?

Contributor Author


No, since this type is never exposed in the public API of the livekit crate.

Contributor Author

ladvoc commented Apr 29, 2026

It's even more generic: at some point, a reactor might be expected (timers, network, any io), and none found. So, I think we should wrap all publicly exportable functions with something like this [...]

@reenboog, is this something you would want to add in the livekit_runtime crate?

Contributor

reenboog commented Apr 30, 2026

is this something you would want to add in the livekit_runtime crate?
@ladvoc, yes, the idea is that all async public functions exported with uniffi should be wrapped with that on_runtime I described earlier, to make them actually run on a runtime instead of piggybacking on whatever each platform provides. No manual enter is required, nor manual polling. I'm wrapping up a small PR to expose a runtime, so that both uniffi-based modules and the legacy (protobuf-based) ones can use that global runtime.

