diff --git a/Package.swift b/Package.swift index 48749d3a7..081b4b5d6 100644 --- a/Package.swift +++ b/Package.swift @@ -20,7 +20,7 @@ let package = Package( dependencies: [ // LK-Prefixed Dynamic WebRTC XCFramework .package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "144.7559.03"), - .package(url: "https://github.com/livekit/livekit-uniffi-xcframework.git", exact: "0.0.6"), + .package(name: "LiveKitUniFFI", path: "../../rust-sdks/livekit-uniffi/packages/swift/LiveKitUniFFI"), .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"), // Only used for DocC generation .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"), @@ -34,7 +34,7 @@ let package = Package( name: "LiveKit", dependencies: [ .product(name: "LiveKitWebRTC", package: "webrtc-xcframework"), - .product(name: "LiveKitUniFFI", package: "livekit-uniffi-xcframework"), + .product(name: "LiveKitUniFFI", package: "LiveKitUniFFI"), .product(name: "SwiftProtobuf", package: "swift-protobuf"), "LKObjCHelpers", ], diff --git a/Package@swift-6.2.swift b/Package@swift-6.2.swift index 82fc5ff87..0dab812df 100644 --- a/Package@swift-6.2.swift +++ b/Package@swift-6.2.swift @@ -21,7 +21,7 @@ let package = Package( dependencies: [ // LK-Prefixed Dynamic WebRTC XCFramework .package(url: "https://github.com/livekit/webrtc-xcframework.git", exact: "144.7559.03"), - .package(url: "https://github.com/livekit/livekit-uniffi-xcframework.git", exact: "0.0.6"), + .package(name: "LiveKitUniFFI", path: "../../rust-sdks/livekit-uniffi/packages/swift/LiveKitUniFFI"), .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0"), // Only used for DocC generation .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.3.0"), @@ -35,7 +35,7 @@ let package = Package( name: "LiveKit", dependencies: [ .product(name: "LiveKitWebRTC", package: "webrtc-xcframework"), - .product(name: "LiveKitUniFFI", package: "livekit-uniffi-xcframework"), + .product(name: "LiveKitUniFFI", package: "LiveKitUniFFI"), .product(name: "SwiftProtobuf", package: "swift-protobuf"), "LKObjCHelpers", ], diff --git a/Sources/LiveKit/Core/Room+DataTrack.swift b/Sources/LiveKit/Core/Room+DataTrack.swift new file mode 100644 index 000000000..dd9b46328 --- /dev/null +++ b/Sources/LiveKit/Core/Room+DataTrack.swift @@ -0,0 +1,135 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +internal import LiveKitUniFFI +internal import LiveKitWebRTC + +// MARK: - Data Track Manager Properties + +extension Room { + func setupDataTrackManagers() { + let localBridge = LocalDataTrackBridge(room: self) + localDataTrackManager = LocalDataTrackManager( + delegate: localBridge, + encryptionProvider: nil // TODO: E2EE bridge in Phase 1f + ) + + let remoteBridge = RemoteDataTrackBridge(room: self) + remoteDataTrackManager = RemoteDataTrackManager( + delegate: remoteBridge, + decryptionProvider: nil // TODO: E2EE bridge in Phase 1f + ) + } + + func cleanUpDataTrackManagers() { + localDataTrackManager = nil + remoteDataTrackManager = nil + } +} + +// MARK: - Subscriber Data Track Channel + +extension Room { + func configureSubscriberDataTrackChannel(_ dataChannel: LKRTCDataChannel) { + log("Setting subscriber data track channel") + subscriberDataTrackChannel = dataChannel + dataChannel.delegate = subscriberDataTrackChannelDelegate + } +} + +// MARK: - Local Data Track Bridge + +final class LocalDataTrackBridge: LocalDataTrackManagerDelegate, @unchecked Sendable { + private weak var room: Room? + + init(room: Room) { + self.room = room + } + + func onSignalRequest(request: Data) { + guard let room else { return } + guard let signalRequest = try? Livekit_SignalRequest(serializedBytes: request) else { + room.log("Failed to decode data track signal request", .warning) + return + } + Task { + try? await room.signalClient.sendRequest(signalRequest) + } + } + + func onPacketsAvailable(packets: [Data]) { + guard let room, let channel = room.publisherDataTrackChannel else { return } + for packet in packets { + let buffer = RTC.createDataBuffer(data: packet) + DispatchQueue.liveKitWebRTC.sync { + channel.sendData(buffer) + } + } + } +} + +// MARK: - Remote Data Track Bridge + +final class RemoteDataTrackBridge: RemoteDataTrackManagerDelegate, @unchecked Sendable { + private weak var room: Room? + + init(room: Room) { + self.room = room + } + + func onSignalRequest(request: Data) { + guard let room else { return } + guard let signalRequest = try? Livekit_SignalRequest(serializedBytes: request) else { + room.log("Failed to decode data track signal request", .warning) + return + } + Task { + try? await room.signalClient.sendRequest(signalRequest) + } + } + + func onTrackPublished(track: RemoteDataTrack) { + guard let room else { return } + room.dataTrackDelegates.notify(label: { "room.didPublishDataTrack" }) { + $0.room(room, didPublishDataTrack: track) + } + } + + func onTrackUnpublished(sid: String) { + guard let room else { return } + room.dataTrackDelegates.notify(label: { "room.didUnpublishDataTrack" }) { + $0.room(room, didUnpublishDataTrack: sid) + } + } +} + +// MARK: - Subscriber Data Track Channel Delegate + +final class SubscriberDataTrackChannelDelegate: NSObject, LKRTCDataChannelDelegate, @unchecked Sendable { + private weak var room: Room? + + init(room: Room) { + self.room = room + } + + func dataChannelDidChangeState(_: LKRTCDataChannel) {} + + func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) { + room?.remoteDataTrackManager?.handlePacketReceived(packet: buffer.data) + } +} diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 148fe423d..867b2e8d7 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -44,6 +44,11 @@ extension Room { publisherDataChannel.reset() subscriberDataChannel.reset() + // Clean up data track channels and managers + publisherDataTrackChannel = nil + subscriberDataTrackChannel = nil + cleanUpDataTrackManagers() + await _state.transport?.close() // Reset publish state @@ -178,8 +183,13 @@ extension Room { publisherDataChannel.set(reliable: reliableDataChannel) publisherDataChannel.set(lossy: lossyDataChannel) + // Data track channel (unordered, unreliable — DTP handles its own sequencing) + publisherDataTrackChannel = await publisher.dataChannel(for: LKRTCDataChannel.Labels.dataTrack, + configuration: RTC.createDataChannelConfiguration(ordered: false, maxRetransmits: 0)) + log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))") log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))") + log("dataChannel.\(String(describing: publisherDataTrackChannel?.label)) : \(String(describing: publisherDataTrackChannel?.channelId))") let subscriber = isSinglePC ? nil : try Transport(config: rtcConfiguration, target: .subscriber, @@ -447,9 +457,14 @@ extension Room { if case .quick = mode { try await quickReconnectSequence() self.log("[Connect] Quick reconnect succeeded for attempt \(currentAttempt)") + // Resend data track subscription state after quick reconnect + self.remoteDataTrackManager?.resendSubscriptionUpdates() } else if case .full = mode { try await fullReconnectSequence() self.log("[Connect] Full reconnect succeeded for attempt \(currentAttempt)") + // Republish data tracks after full reconnect + self.localDataTrackManager?.republishTracks() + self.remoteDataTrackManager?.resendSubscriptionUpdates() } } catch { self.log("[Connect] Reconnect mode: \(mode) failed with error: \(error)", .error) diff --git a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift index 0039ea3e7..a3b958ae3 100644 --- a/Sources/LiveKit/Core/Room+SignalClientDelegate.swift +++ b/Sources/LiveKit/Core/Room+SignalClientDelegate.swift @@ -406,6 +406,20 @@ extension Room: SignalClientDelegate { } } + func signalClient(_: SignalClient, didReceiveRawResponse data: Data) async { + // Each manager has specific handler methods per message type. + // Try them all — they return UnsupportedType for messages they don't handle. + try? localDataTrackManager?.handleSfuRequestResponse(res: data) + try? localDataTrackManager?.handleSfuPublishResponse(res: data) + try? remoteDataTrackManager?.handleSubscriberHandles(res: data) + if let identity = localParticipant.identity?.stringValue { + try? remoteDataTrackManager?.handleSfuParticipantUpdate( + res: data, + localParticipantIdentity: identity + ) + } + } + func signalClient(_: SignalClient, didReceiveMediaSectionsRequirement requirement: Livekit_MediaSectionsRequirement) async { guard case let .publisherOnly(publisher) = _state.transport else { return } diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index a8f6a11c1..bcc5b390d 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -111,6 +111,7 @@ extension Room: TransportDelegate { switch dataChannel.label { case LKRTCDataChannel.Labels.reliable: subscriberDataChannel.set(reliable: dataChannel) case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel) + case LKRTCDataChannel.Labels.dataTrack: configureSubscriberDataTrackChannel(dataChannel) default: log("Unknown data channel label \(dataChannel.label)", .warning) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 04a6bd4d5..ffcb9b000 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -19,6 +19,9 @@ import Combine import Foundation +internal import LiveKitUniFFI +internal import LiveKitWebRTC + #if canImport(Network) import Network #endif @@ -128,6 +131,15 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { self?.e2eeManager?.dataChannelEncryptionType ?? .none } + // MARK: - Data Tracks + + var localDataTrackManager: LocalDataTrackManager? + var remoteDataTrackManager: RemoteDataTrackManager? + var publisherDataTrackChannel: LKRTCDataChannel? + var subscriberDataTrackChannel: LKRTCDataChannel? + lazy var subscriberDataTrackChannelDelegate = SubscriberDataTrackChannelDelegate(room: self) + let dataTrackDelegates = MulticastDelegate(label: "DataTrackDelegate") + // MARK: - PreConnect lazy var preConnectBuffer = PreConnectAudioBuffer(room: self) @@ -433,6 +445,8 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable { // Final check if cancelled, don't fire connected events try Task.checkCancellation() + setupDataTrackManagers() + _state.mutate { $0.connectedUrl = finalUrl $0.connectionState = .connected diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 83010dfbc..01bb6fe30 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -289,6 +289,12 @@ private extension SignalClient { return } + // Forward serialized protobuf bytes to delegate for data track managers. + // Re-serialize if the message arrived as JSON (string) rather than binary. + if let rawData = try? response.serializedData() { + _delegate.notifyDetached { await $0.signalClient(self, didReceiveRawResponse: rawData) } + } + Task.detached { let alwaysProcess = switch response.message { case .join, .reconnect, .leave: true @@ -416,6 +422,10 @@ extension SignalClient { // MARK: - Send methods extension SignalClient { + func sendRequest(_ request: Livekit_SignalRequest) async throws { + try await _sendRequest(request) + } + func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws { let r = Livekit_SignalRequest.with { $0.offer = offer.toPBType(offerId: offerId) diff --git a/Sources/LiveKit/Extensions/RTCDataChannel+Util.swift b/Sources/LiveKit/Extensions/RTCDataChannel+Util.swift index bc98e1fda..1e44e7553 100644 --- a/Sources/LiveKit/Extensions/RTCDataChannel+Util.swift +++ b/Sources/LiveKit/Extensions/RTCDataChannel+Util.swift @@ -22,6 +22,7 @@ extension LKRTCDataChannel { enum Labels { static let reliable = "_reliable" static let lossy = "_lossy" + static let dataTrack = "_data_track" } func toLKInfoType() -> Livekit_DataChannelInfo { diff --git a/Sources/LiveKit/Participant/LocalParticipant+DataTrack.swift b/Sources/LiveKit/Participant/LocalParticipant+DataTrack.swift new file mode 100644 index 000000000..aa99b5a49 --- /dev/null +++ b/Sources/LiveKit/Participant/LocalParticipant+DataTrack.swift @@ -0,0 +1,82 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +internal import LiveKitUniFFI + +// MARK: - Data Track Publishing + +extension LocalParticipant { + /// Publishes a data track with the given name. + /// + /// - Parameter name: Track name visible to other participants. Must be unique per publisher. + /// - Returns: A ``LocalDataTrack`` that can be used to push frames to subscribers. + /// - Throws: ``PublishError`` if the track cannot be published. + func publishDataTrack(name: String) async throws -> LocalDataTrack { + guard let manager = _room?.localDataTrackManager else { + throw LiveKitError(.invalidState, message: "Not connected to a room") + } + return try await manager.publishTrack(options: DataTrackOptions(name: name)) + } + + /// Publishes a data track for the duration of the given closure, then unpublishes automatically. + /// + /// - Parameters: + /// - name: Track name visible to other participants. + /// - body: Closure that receives the published track. The track is unpublished when the closure returns or throws. + /// - Returns: The value returned by `body`. + func withDataTrack(name: String, body: (LocalDataTrack) async throws -> T) async throws -> T { + let track = try await publishDataTrack(name: name) + return try await withTaskCancellationHandler { + defer { track.unpublish() } + return try await body(track) + } onCancel: { + track.unpublish() + } + } +} + +// MARK: - Frame Drop Policy + +enum FrameDropPolicy { + /// Propagate the error to the caller. + case `throw` + /// Silently skip the frame. + case drop +} + +// MARK: - Sending AsyncSequence to a Track + +extension LocalDataTrack { + /// Sends frames from the source until it ends or the track is unpublished. + func send( + contentsOf source: S, + onQueueFull: FrameDropPolicy = .drop + ) async throws where S.Element == DataTrackFrame { + for try await frame in source { + guard isPublished() else { break } + do { + try tryPush(frame: frame) + } catch PushFrameErrorReason.QueueFull { + switch onQueueFull { + case .throw: throw PushFrameErrorReason.QueueFull + case .drop: continue + } + } + } + } +} diff --git a/Sources/LiveKit/Protocols/RoomDelegate.swift b/Sources/LiveKit/Protocols/RoomDelegate.swift index badca9434..e7b386199 100644 --- a/Sources/LiveKit/Protocols/RoomDelegate.swift +++ b/Sources/LiveKit/Protocols/RoomDelegate.swift @@ -16,6 +16,8 @@ import Foundation +internal import LiveKitUniFFI + /// ``RoomDelegate`` receives room events as well as ``Participant`` events. /// /// > Important: The thread which the delegate will be called on, is not guranteed to be the `main` thread. @@ -320,3 +322,21 @@ public protocol RoomDelegate: AnyObject, Sendable { @objc(room:publication:didUpdateE2EEState:) optional func room(_ room: Room, publication: TrackPublication, didUpdateE2EEState: E2EEState) } + +// MARK: - Data Track Delegate + +/// Delegate for receiving data track events from a ``Room``. +/// +/// Data track types are not Objective-C compatible, so these callbacks +/// are delivered through a separate Swift-only protocol. +protocol DataTrackDelegate: AnyObject, Sendable { + /// A remote participant published a data track. + func room(_ room: Room, didPublishDataTrack track: RemoteDataTrack) + /// A remote participant unpublished a data track. + func room(_ room: Room, didUnpublishDataTrack sid: String) +} + +extension DataTrackDelegate { + func room(_: Room, didPublishDataTrack _: RemoteDataTrack) {} + func room(_: Room, didUnpublishDataTrack _: String) {} +} diff --git a/Sources/LiveKit/Protocols/SignalClientDelegate.swift b/Sources/LiveKit/Protocols/SignalClientDelegate.swift index 9d66d0fb1..58030a3f2 100644 --- a/Sources/LiveKit/Protocols/SignalClientDelegate.swift +++ b/Sources/LiveKit/Protocols/SignalClientDelegate.swift @@ -38,4 +38,5 @@ protocol SignalClientDelegate: AnyObject, Sendable { func signalClient(_ signalClient: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason, regions: Livekit_RegionSettings?) async func signalClient(_ signalClient: SignalClient, didSubscribeTrack trackSid: Track.Sid) async func signalClient(_ signalClient: SignalClient, didReceiveMediaSectionsRequirement requirement: Livekit_MediaSectionsRequirement) async + func signalClient(_ signalClient: SignalClient, didReceiveRawResponse data: Data) async } diff --git a/Sources/LiveKit/Support/Async/AsyncPolling.swift b/Sources/LiveKit/Support/Async/AsyncPolling.swift new file mode 100644 index 000000000..f8a529aec --- /dev/null +++ b/Sources/LiveKit/Support/Async/AsyncPolling.swift @@ -0,0 +1,35 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A type that produces values asynchronously via a `next()` poll method. +/// +/// Conforming types get a `values` property that wraps the poll in an `AsyncStream`, +/// enabling `for await` iteration: +/// ```swift +/// for await frame in stream.values { +/// process(frame) +/// } +/// ``` +protocol AsyncPolling: Sendable { + associatedtype Element: Sendable + func next() async -> Element? +} + +extension AsyncPolling { + var values: AsyncStream { + AsyncStream(unfolding: next) + } +} diff --git a/Sources/LiveKit/Types/DataTrackFrame+Extensions.swift b/Sources/LiveKit/Types/DataTrackFrame+Extensions.swift new file mode 100644 index 000000000..fa32b13d2 --- /dev/null +++ b/Sources/LiveKit/Types/DataTrackFrame+Extensions.swift @@ -0,0 +1,37 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +internal import LiveKitUniFFI + +extension DataTrackFrame { + /// Creates a frame with the current Unix timestamp in milliseconds. + static func now(payload: Data) -> DataTrackFrame { + DataTrackFrame( + payload: payload, + userTimestamp: UInt64(Date().timeIntervalSince1970 * 1000) + ) + } + + /// Time elapsed since the frame's timestamp, if present. + var latency: TimeInterval? { + guard let ts = userTimestamp else { return nil } + let now = UInt64(Date().timeIntervalSince1970 * 1000) + guard now >= ts else { return nil } + return TimeInterval(now - ts) / 1000.0 + } +} diff --git a/Sources/LiveKit/Types/DataTrackTypes.swift b/Sources/LiveKit/Types/DataTrackTypes.swift new file mode 100644 index 000000000..e0b5ad857 --- /dev/null +++ b/Sources/LiveKit/Types/DataTrackTypes.swift @@ -0,0 +1,35 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import LiveKitUniFFI + +// Internal type aliases for data track types from LiveKitUniFFI. +// Tests access these via `@testable import LiveKit` or direct `import LiveKitUniFFI`. +typealias DataTrackFrame = LiveKitUniFFI.DataTrackFrame +typealias DataTrackInfo = LiveKitUniFFI.DataTrackInfo +typealias DataTrackOptions = LiveKitUniFFI.DataTrackOptions +typealias LocalDataTrack = LiveKitUniFFI.LocalDataTrack +typealias RemoteDataTrack = LiveKitUniFFI.RemoteDataTrack +typealias DataTrackStream = LiveKitUniFFI.DataTrackStream +typealias PushFrameErrorReason = LiveKitUniFFI.PushFrameErrorReason +typealias PublishDataTrackError = LiveKitUniFFI.PublishError +typealias DataTrackSubscribeError = LiveKitUniFFI.DataTrackSubscribeError + +// MARK: - AsyncPolling Conformance + +extension LiveKitUniFFI.DataTrackStream: AsyncPolling { + typealias Element = LiveKitUniFFI.DataTrackFrame +} diff --git a/Tests/LiveKitCoreTests/DataTrack/DataTrackTests.swift b/Tests/LiveKitCoreTests/DataTrack/DataTrackTests.swift new file mode 100644 index 000000000..7993548bd --- /dev/null +++ b/Tests/LiveKitCoreTests/DataTrack/DataTrackTests.swift @@ -0,0 +1,260 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +@testable import LiveKit +import LiveKitUniFFI +import Testing +#if canImport(LiveKitTestSupport) +import LiveKitTestSupport +#endif + +@Suite(.serialized, .tags(.dataTrack, .e2e)) +struct DataTrackTests { + // MARK: - Publish and Receive + + @Test + func publishAndReceive() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + RoomTestingOptions(canSubscribe: true), + ]) { rooms in + let publisherRoom = rooms[0] + let subscriberRoom = rooms[1] + + // Start watching before publishing to avoid race condition + let watcher = DataTrackWatcher(expectedName: "test") + subscriberRoom.dataTrackDelegates.add(delegate: watcher) + + let track = try await publisherRoom.localParticipant.publishDataTrack(name: "test") + #expect(track.isPublished()) + + let remoteTrack = try await watcher.waitForTrack() + #expect(remoteTrack.info().name == "test") + + let stream = try await remoteTrack.subscribe() + + let payload = Data(repeating: 0xAB, count: 1024) + let frameCount = 10 + for _ in 0 ..< frameCount { + try track.tryPush(frame: .now(payload: payload)) + } + + var received = 0 + for await frame in stream.values { + #expect(frame.payload == payload) + received += 1 + if received >= frameCount - 1 { break } + } + #expect(received >= frameCount - 1, "Expected at least \(frameCount - 1) frames, got \(received)") + } + } + + // MARK: - Publish Duplicate Name + + @Test + func publishDuplicateName() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + ]) { rooms in + let room = rooms[0] + let first = try await room.localParticipant.publishDataTrack(name: "dup") + #expect(first.isPublished()) + do { + _ = try await room.localParticipant.publishDataTrack(name: "dup") + Issue.record("Expected duplicate name error") + } catch { + // Any error is acceptable — DuplicateName or similar + } + } + } + + // MARK: - Publish Unauthorized + + @Test + func publishUnauthorized() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: false), + ]) { rooms in + let room = rooms[0] + do { + _ = try await room.localParticipant.publishDataTrack(name: "unauth") + Issue.record("Expected PublishError.NotAllowed") + } catch is PublishError { + // Expected + } + } + } + + // MARK: - Published State + + @Test + func publishedState() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + ]) { rooms in + let room = rooms[0] + let track = try await room.localParticipant.publishDataTrack(name: "state-test") + #expect(track.isPublished()) + + track.unpublish() + await track.waitForUnpublish() + #expect(!track.isPublished()) + } + } + + // MARK: - Frame Timestamp + + @Test + func frameTimestamp() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + RoomTestingOptions(canSubscribe: true), + ]) { rooms in + let publisherRoom = rooms[0] + let subscriberRoom = rooms[1] + + let watcher = DataTrackWatcher(expectedName: "ts-test") + subscriberRoom.dataTrackDelegates.add(delegate: watcher) + + let track = try await publisherRoom.localParticipant.publishDataTrack(name: "ts-test") + let remoteTrack = try await watcher.waitForTrack() + let stream = try await remoteTrack.subscribe() + + let payload = Data([1, 2, 3]) + try track.tryPush(frame: .now(payload: payload)) + + guard let frame = await stream.next() else { + Issue.record("Expected a frame") + return + } + + #expect(frame.userTimestamp != nil) + if let latency = frame.latency { + #expect(latency < 5.0, "Latency should be under 5 seconds, was \(latency)") + } + } + } + + // MARK: - Large Frames (Multi-Packet) + + @Test + func publishLargeFrames() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + RoomTestingOptions(canSubscribe: true), + ]) { rooms in + let publisherRoom = rooms[0] + let subscriberRoom = rooms[1] + + let watcher = DataTrackWatcher(expectedName: "large") + subscriberRoom.dataTrackDelegates.add(delegate: watcher) + + let track = try await publisherRoom.localParticipant.publishDataTrack(name: "large") + let remoteTrack = try await watcher.waitForTrack() + let stream = try await remoteTrack.subscribe() + + // 196KB payload — requires DTP packetization across multiple packets + let payload = Data((0 ..< 196 * 1024).map { UInt8($0 % 256) }) + let frameCount = 3 + for _ in 0 ..< frameCount { + try track.tryPush(frame: DataTrackFrame(payload: payload, userTimestamp: nil)) + try? await Task.sleep(nanoseconds: 100_000_000) // 100ms between large frames + } + + var received = 0 + for await frame in stream.values { + #expect(frame.payload == payload, "Payload mismatch on frame \(received)") + received += 1 + if received >= frameCount { break } + } + #expect(received >= frameCount) + } + } + + // MARK: - Resubscribe + + @Test + func resubscribe() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + RoomTestingOptions(canSubscribe: true), + ]) { rooms in + let publisherRoom = rooms[0] + let subscriberRoom = rooms[1] + + let watcher = DataTrackWatcher(expectedName: "resub") + subscriberRoom.dataTrackDelegates.add(delegate: watcher) + + let track = try await publisherRoom.localParticipant.publishDataTrack(name: "resub") + let remoteTrack = try await watcher.waitForTrack() + + let payload = Data([0xDE, 0xAD]) + + // First subscription + do { + let stream = try await remoteTrack.subscribe() + try track.tryPush(frame: DataTrackFrame(payload: payload, userTimestamp: nil)) + + guard let frame = await stream.next() else { + Issue.record("No frame on first subscription") + return + } + #expect(frame.payload == payload) + } + // Stream dropped — unsubscribes + + // Small delay to let unsubscribe propagate + try await Task.sleep(nanoseconds: 500_000_000) + + // Second subscription + do { + let stream = try await remoteTrack.subscribe() + try track.tryPush(frame: DataTrackFrame(payload: payload, userTimestamp: nil)) + + guard let frame = await stream.next() else { + Issue.record("No frame on second subscription") + return + } + #expect(frame.payload == payload) + } + } + } + + // MARK: - Many Tracks + + @Test + func publishManyTracks() async throws { + try await TestEnvironment.withRooms([ + RoomTestingOptions(canPublishData: true), + ]) { rooms in + let room = rooms[0] + let count = 64 // Conservative vs Rust's 256 — faster CI + + var tracks: [LocalDataTrack] = [] + for i in 0 ..< count { + let track = try await room.localParticipant.publishDataTrack(name: "track-\(i)") + tracks.append(track) + } + + #expect(tracks.count == count) + for (i, track) in tracks.enumerated() { + #expect(track.info().name == "track-\(i)") + #expect(track.isPublished()) + } + } + } +} diff --git a/Tests/LiveKitCoreTests/Tags.swift b/Tests/LiveKitCoreTests/Tags.swift index f43a51639..c210a662b 100644 --- a/Tests/LiveKitCoreTests/Tags.swift +++ b/Tests/LiveKitCoreTests/Tags.swift @@ -35,4 +35,6 @@ extension Tag { @Tag static var media: Self /// End-to-end encryption tests. @Tag static var e2ee: Self + /// Data track publish/subscribe tests. + @Tag static var dataTrack: Self } diff --git a/Tests/LiveKitTestSupport/Room+DataTrack.swift b/Tests/LiveKitTestSupport/Room+DataTrack.swift new file mode 100644 index 000000000..f34731d44 --- /dev/null +++ b/Tests/LiveKitTestSupport/Room+DataTrack.swift @@ -0,0 +1,62 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +@testable import LiveKit +import LiveKitUniFFI + +/// Watches for a remote data track to be published. Register as a delegate +/// on `room.dataTrackDelegates` **before** the track is published to avoid races. +public final class DataTrackWatcher: DataTrackDelegate, @unchecked Sendable { + public let expectedName: String + private let continuation: AsyncStream.Continuation + private let stream: AsyncStream + + public init(expectedName: String) { + self.expectedName = expectedName + let (stream, continuation) = AsyncStream.makeStream(of: RemoteDataTrack.self) + self.stream = stream + self.continuation = continuation + } + + /// Waits for the expected track to appear, with timeout. + public func waitForTrack(timeout: TimeInterval = 15) async throws -> RemoteDataTrack { + let deadline = Date().addingTimeInterval(timeout) + for await track in stream { + return track + } + throw LiveKitError(.timedOut, message: "Timed out waiting for data track '\(expectedName)'") + } + + // MARK: - DataTrackDelegate + + public func room(_: Room, didPublishDataTrack track: RemoteDataTrack) { + if track.info().name == expectedName { + continuation.yield(track) + continuation.finish() + } + } +} + +/// Convenience for simple cases — registers watcher, returns track. +public extension Room { + func waitForDataTrack(name: String, timeout: TimeInterval = 15) async throws -> RemoteDataTrack { + let watcher = DataTrackWatcher(expectedName: name) + dataTrackDelegates.add(delegate: watcher) + defer { dataTrackDelegates.remove(delegate: watcher) } + return try await watcher.waitForTrack(timeout: timeout) + } +}