diff --git a/rs/hang/src/model/broadcast.rs b/rs/hang/src/model/broadcast.rs index ca6e671c7..d78518e5d 100644 --- a/rs/hang/src/model/broadcast.rs +++ b/rs/hang/src/model/broadcast.rs @@ -75,12 +75,12 @@ pub struct BroadcastConsumer { impl BroadcastConsumer { pub fn new(inner: moq_lite::BroadcastConsumer) -> Self { - let catalog = inner.subscribe_track(&Catalog::default_track()).into(); + let catalog = inner.subscribe_track(&Catalog::default_track(), None).into(); Self { inner, catalog } } pub fn subscribe(&self, track: &moq_lite::Track, latency: std::time::Duration) -> TrackConsumer { - TrackConsumer::new(self.inner.subscribe_track(track), latency) + TrackConsumer::new(self.inner.subscribe_track(track, None), latency) } } diff --git a/rs/libmoq/src/consume.rs b/rs/libmoq/src/consume.rs index 93241e1e0..7c4618ff8 100644 --- a/rs/libmoq/src/consume.rs +++ b/rs/libmoq/src/consume.rs @@ -182,7 +182,7 @@ impl Consume { let track = consume.broadcast.subscribe_track(&moq_lite::Track { name: rendition.clone(), priority: video.priority, - }); + }, None); let track = TrackConsumer::new(track, latency); let channel = oneshot::channel(); @@ -216,7 +216,7 @@ impl Consume { let track = consume.broadcast.subscribe_track(&moq_lite::Track { name: rendition.clone(), priority: audio.priority, - }); + }, None); let track = TrackConsumer::new(track, latency); let channel = oneshot::channel(); diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index f1a2ce50a..f0644ba45 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> { Some(announce) = origin.announced() => match announce { (path, Some(broadcast)) => { tracing::info!(broadcast = %path, "broadcast is online, subscribing to track"); - let track = broadcast.subscribe_track(&track); + let track = broadcast.subscribe_track(&track, None); clock = Some(clock::Subscriber::new(track)); } (path, None) => { diff --git a/rs/moq-lite/src/error.rs 
b/rs/moq-lite/src/error.rs index 6c9a6d4ad..ee70c1079 100644 --- a/rs/moq-lite/src/error.rs +++ b/rs/moq-lite/src/error.rs @@ -66,6 +66,9 @@ pub enum Error { #[error("unauthorized")] Unauthorized, + #[error("delivery timeout exceeded")] + DeliveryTimeout, + #[error("unexpected message")] UnexpectedMessage, @@ -105,6 +108,7 @@ impl Error { Self::TooLarge => 18, Self::TooManyParameters => 19, Self::InvalidRole => 20, + Self::DeliveryTimeout => 21, Self::App(app) => *app + 64, } } diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 9c0224cc9..e611e322f 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -93,7 +93,7 @@ impl Publisher { priority: msg.subscriber_priority, }; - let track = broadcast.subscribe_track(&track); + let track = broadcast.subscribe_track(&track, msg.delivery_timeout); let (tx, rx) = oneshot::channel(); let mut subscribes = self.subscribes.lock(); @@ -109,9 +109,10 @@ impl Publisher { let request_id = msg.request_id; let subscribes = self.subscribes.clone(); let version = self.version; + let delivery_timeout = msg.delivery_timeout; web_async::spawn(async move { - if let Err(err) = Self::run_track(session, track, request_id, rx, version).await { + if let Err(err) = Self::run_track(session, track, request_id, delivery_timeout, rx, version).await { control .send(ietf::PublishDone { request_id, @@ -149,6 +150,7 @@ impl Publisher { session: S, mut track: TrackConsumer, request_id: RequestId, + delivery_timeout: Option<u64>, mut cancel: oneshot::Receiver<()>, version: Version, ) -> Result<(), Error> { @@ -212,6 +214,7 @@ impl Publisher { msg, track.info.priority, group, + delivery_timeout, version, )); @@ -241,6 +244,7 @@ impl Publisher { msg: ietf::GroupHeader, priority: u8, mut group: GroupConsumer, + delivery_timeout: Option<u64>, version: Version, ) -> Result<(), Error> { // TODO add a way to open in priority order.
@@ -257,6 +261,39 @@ impl Publisher { tracing::trace!(?msg, "sending group header"); + let result = Self::run_group_inner(&mut stream, &msg, &mut group, delivery_timeout).await; + + // Handle errors specially + match result { + Err(Error::DeliveryTimeout) => { + tracing::warn!(sequence = %msg.group_id, "group delivery timeout - resetting stream"); + // Reset the stream on delivery timeout + stream.abort(&Error::DeliveryTimeout); + return Err(Error::DeliveryTimeout); + } + Err(Error::Cancel) => { + tracing::debug!(sequence = %msg.group_id, "group cancelled"); + return Err(Error::Cancel); + } + Err(err) => { + tracing::debug!(?err, sequence = %msg.group_id, "group error"); + return Err(err); + } + Ok(()) => { + stream.finish()?; + stream.closed().await?; + tracing::debug!(sequence = %msg.group_id, "finished group"); + Ok(()) + } + } + } + + async fn run_group_inner( + stream: &mut Writer, + msg: &ietf::GroupHeader, + group: &mut GroupConsumer, + delivery_timeout: Option<u64>, + ) -> Result<(), Error> { loop { let frame = tokio::select! { biased; @@ -292,6 +329,20 @@ impl Publisher { chunk = frame.read_chunk() => chunk, }; + // Check delivery timeout before writing chunk + if let Some(timeout) = delivery_timeout { + let now = tokio::time::Instant::now(); + if now.duration_since(frame.arrival_time).as_millis() > timeout as u128 { + tracing::warn!( + "Delivery timeout exceeded. Arrival: {:?}, now: {:?}. Dropping group {}", + frame.arrival_time, + now, + group.info.sequence + ); + return Err(Error::DeliveryTimeout); + } + } + match chunk? { Some(mut chunk) => stream.write_all(&mut chunk).await?, None => break, @@ -300,13 +351,6 @@ impl Publisher { } } - stream.finish()?; - - // Wait until everything is acknowledged by the peer so we can still cancel the stream.
- stream.closed().await?; - - tracing::debug!(sequence = %msg.group_id, "finished group"); - Ok(()) } diff --git a/rs/moq-lite/src/ietf/subscribe.rs index b552b2535..d0352ada6 100644 --- a/rs/moq-lite/src/ietf/subscribe.rs +++ b/rs/moq-lite/src/ietf/subscribe.rs @@ -43,6 +43,7 @@ pub struct Subscribe<'a> { pub subscriber_priority: u8, pub group_order: GroupOrder, pub filter_type: FilterType, + pub delivery_timeout: Option<u64>, } impl Message for Subscribe<'_> { @@ -79,6 +80,12 @@ // Ignore parameters, who cares. let _params = Parameters::decode(r, version)?; + let delivery_timeout = if u8::decode(r, version)? == 1 { + Some(u64::decode(r, version)?) + } else { + None + }; + Ok(Self { request_id, track_namespace, @@ -86,6 +93,7 @@ subscriber_priority, group_order, filter_type, + delivery_timeout, }) } @@ -104,6 +112,13 @@ self.filter_type.encode(w, version); 0u8.encode(w, version); // no parameters + + if let Some(timeout) = self.delivery_timeout { + 1u8.encode(w, version); + timeout.encode(w, version); + } else { + 0u8.encode(w, version); + } } } @@ -283,6 +298,7 @@ mod tests { subscriber_priority: 128, group_order: GroupOrder::Descending, filter_type: FilterType::LargestObject, + delivery_timeout: None, }; let encoded = encode_message(&msg); @@ -303,6 +319,7 @@ subscriber_priority: 255, group_order: GroupOrder::Descending, filter_type: FilterType::LargestObject, + delivery_timeout: None, }; let encoded = encode_message(&msg); diff --git a/rs/moq-lite/src/ietf/subscriber.rs index b8ace558d..a4cee60f3 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -220,10 +220,10 @@ impl Subscriber { loop { // Keep serving requests until there are no more consumers. // This way we'll clean up the task when the broadcast is no longer needed.
- let track = tokio::select! { + let (track, delivery_timeout) = tokio::select! { _ = broadcast.unused() => break, producer = broadcast.requested_track() => match producer { - Some(producer) => producer, + Some((track, timeout)) => (track, timeout), None => break, }, _ = self.session.closed() => break, }; @@ -232,18 +232,22 @@ let request_id = self.control.next_request_id().await?; let mut this = self.clone(); + // Set this track subscription to carry the delivery timeout + let mut track_with_timeout = track.clone(); + track_with_timeout.delivery_timeout = delivery_timeout; + let mut state = self.state.lock(); state.subscribes.insert( request_id, TrackState { - producer: track.clone(), + producer: track_with_timeout.clone(), alias: None, }, ); let path = path.to_owned(); web_async::spawn(async move { - if let Err(err) = this.run_subscribe(request_id, path, track).await { + if let Err(err) = this.run_subscribe(request_id, path, track_with_timeout, delivery_timeout).await { tracing::debug!(%err, id = %request_id, "error running subscribe"); } this.state.lock().subscribes.remove(&request_id); @@ -258,6 +262,7 @@ request_id: RequestId, broadcast: Path<'_>, track: TrackProducer, + delivery_timeout: Option<u64>, ) -> Result<(), Error> { self.control.send(ietf::Subscribe { request_id, @@ -267,6 +272,7 @@ group_order: GroupOrder::Descending, // we want largest group filter_type: FilterType::LargestObject, + delivery_timeout, })?; // TODO we should send a joining fetch, but it's annoying to implement.
@@ -290,6 +296,7 @@ tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, stripping"); } + let delivery_timeout; let producer = { let mut state = self.state.lock(); let request_id = match state.aliases.get(&group.track_alias) { @@ -300,6 +307,7 @@ } }; let track = state.subscribes.get_mut(&request_id).ok_or(Error::NotFound)?; + delivery_timeout = track.producer.delivery_timeout; let group = Group { sequence: group.group_id, @@ -309,7 +317,7 @@ let res = tokio::select! { _ = producer.unused() => Err(Error::Cancel), - res = self.run_group(group, stream, producer.clone()) => res, + res = self.run_group(group, stream, producer.clone(), delivery_timeout) => res, }; match res { @@ -317,6 +325,10 @@ tracing::trace!(group = %producer.info.sequence, "group cancelled"); producer.abort(Error::Cancel); } + Err(Error::DeliveryTimeout) => { + tracing::info!(group = %producer.info.sequence, "group delivery timeout"); + producer.close(); + } Err(err) => { tracing::debug!(%err, group = %producer.info.sequence, "group error"); producer.abort(err); @@ -335,6 +347,7 @@ group: ietf::GroupHeader, stream: &mut Reader, mut producer: GroupProducer, + delivery_timeout: Option<u64>, ) -> Result<(), Error> { while let Some(id_delta) = stream.decode_maybe::<u64>().await? { if id_delta != 0 { @@ -365,10 +378,14 @@ let res = tokio::select!
{ _ = frame.unused() => Err(Error::Cancel), - res = self.run_frame(stream, frame.clone()) => res, + res = self.run_frame(stream, frame.clone(), delivery_timeout) => res, }; if let Err(err) = res { + if matches!(err, Error::DeliveryTimeout) { + // Simply break out of the frame writing loop + return Err(Error::DeliveryTimeout); + } frame.abort(err.clone()); return Err(err); } @@ -384,11 +401,14 @@ &mut self, stream: &mut Reader, mut frame: FrameProducer, + _delivery_timeout: Option<u64>, ) -> Result<(), Error> { let mut remain = frame.info.size; tracing::trace!(size = %frame.info.size, "reading frame"); + // TODO: Implement delivery timeout check here using frame.info.sent_timestamp if available + while remain > 0 { let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?; remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?; diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index d32b3aca8..de8583fab 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -189,7 +189,7 @@ impl Publisher { }; let broadcast = consumer.ok_or(Error::NotFound)?; - let track = broadcast.subscribe_track(&track); + let track = broadcast.subscribe_track(&track, subscribe.delivery_timeout); // TODO wait until track.info() to get the *real* priority @@ -265,10 +265,11 @@ }; let priority = priority.insert(track.info.priority, sequence); + let delivery_timeout = subscribe.delivery_timeout; // Spawn a task to serve this group, ignoring any errors because they don't really matter. // TODO add some logging at least. - let handle = Box::pin(Self::serve_group(session.clone(), msg, priority, group, version)); + let handle = Box::pin(Self::serve_group(session.clone(), msg, priority, group, delivery_timeout, version)); // Terminate the old group if it's still running.
if let Some(old_sequence) = old_sequence.take() { @@ -296,6 +297,7 @@ msg: lite::Group, mut priority: PriorityHandle, mut group: GroupConsumer, + delivery_timeout: Option<u64>, version: Version, ) -> Result<(), Error> { // TODO add a way to open in priority order. @@ -309,6 +311,39 @@ stream.encode(&lite::DataType::Group).await?; stream.encode(&msg).await?; + let result = Self::serve_group_inner(&mut stream, &mut priority, &mut group, delivery_timeout).await; + + // Handle errors specially + match result { + Err(Error::DeliveryTimeout) => { + tracing::warn!(sequence = %msg.sequence, "group delivery timeout - resetting stream"); + // Reset the stream on delivery timeout + stream.abort(&Error::DeliveryTimeout); + return Err(Error::DeliveryTimeout); + } + Err(Error::Cancel) => { + tracing::debug!(sequence = %msg.sequence, "group cancelled"); + return Err(Error::Cancel); + } + Err(err) => { + tracing::debug!(?err, sequence = %msg.sequence, "group error"); + return Err(err); + } + Ok(()) => { + stream.finish()?; + stream.closed().await?; + tracing::debug!(sequence = %msg.sequence, "finished group"); + Ok(()) + } + } + } + + async fn serve_group_inner( + stream: &mut Writer, + priority: &mut PriorityHandle, + group: &mut GroupConsumer, + delivery_timeout: Option<u64>, + ) -> Result<(), Error> { loop { let frame = tokio::select! { biased; @@ -342,6 +377,20 @@ } }; + // Check delivery timeout before writing chunk + if let Some(timeout) = delivery_timeout { + let now = tokio::time::Instant::now(); + if now.duration_since(frame.arrival_time).as_millis() > timeout as u128 { + tracing::warn!( + "Delivery timeout exceeded. Arrival: {:?}, now: {:?}. Dropping group {}", + frame.arrival_time, + now, + group.info.sequence + ); + return Err(Error::DeliveryTimeout); + } + } + match chunk?
{ Some(mut chunk) => stream.write_all(&mut chunk).await?, None => break, @@ -351,11 +400,6 @@ } tracing::trace!(size = %frame.info.size, "wrote frame"); } - stream.finish()?; - stream.closed().await?; - - tracing::debug!(sequence = %msg.sequence, "finished group"); - Ok(()) } } diff --git a/rs/moq-lite/src/lite/subscribe.rs index 36052f2b8..db2b3a4ca 100644 --- a/rs/moq-lite/src/lite/subscribe.rs +++ b/rs/moq-lite/src/lite/subscribe.rs @@ -15,6 +15,7 @@ pub struct Subscribe<'a> { pub broadcast: Path<'a>, pub track: Cow<'a, str>, pub priority: u8, + pub delivery_timeout: Option<u64>, } impl Message for Subscribe<'_> { @@ -23,12 +24,18 @@ let id = u64::decode(r, version)?; let broadcast = Path::decode(r, version)?; let track = Cow::<str>::decode(r, version)?; let priority = u8::decode(r, version)?; + let delivery_timeout = if u8::decode(r, version)? == 1 { + Some(u64::decode(r, version)?) + } else { + None + }; Ok(Self { id, broadcast, track, priority, + delivery_timeout, }) } @@ -37,6 +44,12 @@ self.broadcast.encode(w, version); self.track.encode(w, version); self.priority.encode(w, version); + if let Some(timeout) = self.delivery_timeout { + 1u8.encode(w, version); + timeout.encode(w, version); + } else { + 0u8.encode(w, version); + } } } diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9054ca9bd..c061a2f4a 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -153,27 +153,31 @@ loop { // Keep serving requests until there are no more consumers. // This way we'll clean up the task when the broadcast is no longer needed. - let track = tokio::select! { + let (track, delivery_timeout) = tokio::select!
{ _ = broadcast.unused() => break, producer = broadcast.requested_track() => match producer { - Some(producer) => producer, + Some((track, timeout)) => (track, timeout), None => break, }, _ = self.session.closed() => break, }; + // Set this track subscription to carry the delivery timeout + let mut track_with_timeout = track.clone(); + track_with_timeout.delivery_timeout = delivery_timeout; + let id = self.next_id.fetch_add(1, atomic::Ordering::Relaxed); let mut this = self.clone(); let path = path.clone(); web_async::spawn(async move { - this.run_subscribe(id, path, track).await; + this.run_subscribe(id, path, track_with_timeout, delivery_timeout).await; this.subscribes.lock().remove(&id); }); } } - async fn run_subscribe(&mut self, id: u64, broadcast: Path<'_>, track: TrackProducer) { + async fn run_subscribe(&mut self, id: u64, broadcast: Path<'_>, track: TrackProducer, delivery_timeout: Option<u64>) { self.subscribes.lock().insert(id, track.clone()); let msg = lite::Subscribe { @@ -181,6 +185,7 @@ broadcast: broadcast.to_owned(), track: (&track.info.name).into(), priority: track.info.priority, + delivery_timeout, }; tracing::info!(id, broadcast = %self.log_path(&broadcast), track = %track.info.name, "subscribe started"); @@ -238,9 +243,11 @@ pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let hdr: lite::Group = stream.decode().await?; + let delivery_timeout; let group = { let mut subs = self.subscribes.lock(); let track = subs.get_mut(&hdr.subscribe).ok_or(Error::Cancel)?; + delivery_timeout = track.delivery_timeout; let group = Group { sequence: hdr.sequence }; track.create_group(group).ok_or(Error::Old)? @@ -248,7 +255,7 @@ }; let res = tokio::select!
{ _ = group.unused() => Err(Error::Cancel), - res = self.run_group(stream, group.clone()) => res, + res = self.run_group(stream, group.clone(), delivery_timeout) => res, }; match res { @@ -256,6 +263,10 @@ tracing::trace!(group = %group.info.sequence, "group cancelled"); group.abort(Error::Cancel); } + Err(Error::DeliveryTimeout) => { + tracing::info!(group = %group.info.sequence, "group delivery timeout"); + group.close(); + } Err(err) => { tracing::debug!(%err, group = %group.info.sequence, "group error"); group.abort(err); @@ -273,16 +284,21 @@ &mut self, stream: &mut Reader, mut group: GroupProducer, + delivery_timeout: Option<u64>, ) -> Result<(), Error> { while let Some(size) = stream.decode_maybe::<u64>().await? { let frame = group.create_frame(Frame { size }); let res = tokio::select! { _ = frame.unused() => Err(Error::Cancel), - res = self.run_frame(stream, frame.clone()) => res, + res = self.run_frame(stream, frame.clone(), delivery_timeout) => res, }; if let Err(err) = res { + if matches!(err, Error::DeliveryTimeout) { + // Simply break out of the frame writing loop + return Err(Error::DeliveryTimeout); + } frame.abort(err.clone()); return Err(err); } @@ -297,11 +313,14 @@ &mut self, stream: &mut Reader, mut frame: FrameProducer, + _delivery_timeout: Option<u64>, ) -> Result<(), Error> { let mut remain = frame.info.size; tracing::trace!(size = %frame.info.size, "reading frame"); + // TODO: Implement delivery timeout check here using frame.info.sent_timestamp if available + const MAX_CHUNK: usize = 1024 * 1024; // 1 MiB while remain > 0 { let chunk = stream diff --git a/rs/moq-lite/src/model/broadcast.rs index c8affe410..6d83c9072 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -44,8 +44,8 @@ pub struct BroadcastProducer { state: Lock, closed: watch::Sender, requested: ( - async_channel::Sender<TrackProducer>, - async_channel::Receiver<TrackProducer>, +
async_channel::Sender<(TrackProducer, Option<u64>)>, + async_channel::Receiver<(TrackProducer, Option<u64>)>, ), cloned: Arc, } @@ -70,7 +70,7 @@ } /// Return the next requested track. - pub async fn requested_track(&mut self) -> Option<TrackProducer> { + pub async fn requested_track(&mut self) -> Option<(TrackProducer, Option<u64>)> { self.requested.1.recv().await.ok() } @@ -145,7 +145,7 @@ impl Drop for BroadcastProducer { self.requested.0.close(); // Drain any remaining requests. - while let Ok(producer) = self.requested.1.try_recv() { + while let Ok((producer, _)) = self.requested.1.try_recv() { producer.abort(Error::Cancel); } @@ -175,6 +175,7 @@ .now_or_never() .expect("should not have blocked") .expect("should be a request") + .0 } pub fn assert_no_request(&mut self) { @@ -187,11 +188,11 @@ pub struct BroadcastConsumer { state: Lock, closed: watch::Receiver, - requested: async_channel::Sender<TrackProducer>, + requested: async_channel::Sender<(TrackProducer, Option<u64>)>, } impl BroadcastConsumer { - pub fn subscribe_track(&self, track: &Track) -> TrackConsumer { + pub fn subscribe_track(&self, track: &Track, delivery_timeout: Option<u64>) -> TrackConsumer { let mut state = self.state.lock(); // Return any explictly published track. @@ -206,12 +207,15 @@ // Otherwise we have never seen this track before and need to create a new producer. let track = track.clone().produce(); - let producer = track.producer; + let mut producer = track.producer; let consumer = track.consumer; + // Set the delivery timeout on the producer + producer.delivery_timeout = delivery_timeout; + // Insert the producer into the lookup so we will deduplicate requests. // This is not a subscriber so it doesn't count towards "used" subscribers.
- match self.requested.try_send(producer.clone()) { + match self.requested.try_send((producer.clone(), delivery_timeout)) { Ok(()) => {} Err(_) => { // If the BroadcastProducer is closed, immediately close the track. @@ -276,14 +280,14 @@ mod test { let consumer = producer.consume(); - let mut track1_sub = consumer.subscribe_track(&track1.producer.info); + let mut track1_sub = consumer.subscribe_track(&track1.producer.info, None); track1_sub.assert_group(); let mut track2 = Track::new("track2").produce(); producer.insert_track(track2.consumer); let consumer2 = producer.consume(); - let mut track2_consumer = consumer2.subscribe_track(&track2.producer.info); + let mut track2_consumer = consumer2.subscribe_track(&track2.producer.info, None); track2_consumer.assert_no_group(); track2.producer.append_group(); @@ -315,7 +319,7 @@ mod test { let consumer3 = producer.consume(); producer.assert_used(); - let track1 = consumer3.subscribe_track(&Track::new("track1")); + let track1 = consumer3.subscribe_track(&Track::new("track1"), None); // It doesn't matter if a subscription is alive, we only care about the broadcast handle. // TODO is this the right behavior? @@ -337,8 +341,8 @@ mod test { track1.producer.append_group(); producer.insert_track(track1.consumer); - let mut track1c = consumer.subscribe_track(&track1.producer.info); - let track2 = consumer.subscribe_track(&Track::new("track2")); + let mut track1c = consumer.subscribe_track(&track1.producer.info, None); + let track2 = consumer.subscribe_track(&Track::new("track2"), None); drop(producer); consumer.assert_closed(); @@ -374,12 +378,12 @@ mod test { let consumer = producer.consume(); let consumer2 = consumer.clone(); - let mut track1 = consumer.subscribe_track(&Track::new("track1")); + let mut track1 = consumer.subscribe_track(&Track::new("track1"), None); track1.assert_not_closed(); track1.assert_no_group(); // Make sure we deduplicate requests while track1 is still active. 
- let mut track2 = consumer2.subscribe_track(&Track::new("track1")); + let mut track2 = consumer2.subscribe_track(&Track::new("track1"), None); track2.assert_is_clone(&track1); // Get the requested track, and there should only be one. @@ -395,13 +399,13 @@ mod test { track2.assert_group(); // Make sure that tracks are cancelled when the producer is dropped. - let track4 = consumer.subscribe_track(&Track::new("track2")); + let track4 = consumer.subscribe_track(&Track::new("track2"), None); drop(producer); // Make sure the track is errored, not closed. track4.assert_error(); - let track5 = consumer2.subscribe_track(&Track::new("track3")); + let track5 = consumer2.subscribe_track(&Track::new("track3"), None); track5.assert_error(); } @@ -410,7 +414,7 @@ mod test { let mut broadcast = Broadcast::produce(); // Subscribe to a track that doesn't exist - this creates a request - let consumer1 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer1 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"), None); // Get the requested track producer let producer1 = broadcast.producer.assert_request(); @@ -422,7 +426,7 @@ mod test { ); // Making a new consumer will keep the producer alive - let consumer2 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer2 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"), None); consumer2.assert_is_clone(&consumer1); // Drop the consumer subscription @@ -449,7 +453,7 @@ mod test { tokio::time::sleep(std::time::Duration::from_millis(1)).await; // Now the cleanup task should have run and we can subscribe again to the unknown track. 
- let consumer3 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer3 = broadcast.consumer.subscribe_track(&Track::new("unknown_track"), None); let producer2 = broadcast.producer.assert_request(); // Drop the consumer, now the producer should be unused diff --git a/rs/moq-lite/src/model/frame.rs index 23c2b29f9..f7a884498 100644 --- a/rs/moq-lite/src/model/frame.rs +++ b/rs/moq-lite/src/model/frame.rs @@ -102,6 +102,7 @@ impl FrameProducer { info: self.info.clone(), state: self.state.subscribe(), index: 0, + arrival_time: tokio::time::Instant::now(), } } @@ -132,6 +133,9 @@ pub struct FrameConsumer { // The number of frames we've read. // NOTE: Cloned readers inherit this offset, but then run in parallel. index: usize, + + // Track when the frame arrived for delivery timeout checks + pub arrival_time: tokio::time::Instant, } impl FrameConsumer { diff --git a/rs/moq-lite/src/model/track.rs index b90078474..7d33389a4 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -53,6 +53,7 @@ struct TrackState { pub struct TrackProducer { pub info: Track, state: watch::Sender, + pub delivery_timeout: Option<u64>, } impl TrackProducer { @@ -60,6 +61,7 @@ Self { info, state: Default::default(), + delivery_timeout: None, } } diff --git a/rs/moq-relay/src/web.rs index 525a79baf..d52c77c37 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -278,7 +278,7 @@ async fn serve_fetch( // NOTE: The auth token is already scoped to the broadcast. let broadcast = origin.consume_broadcast("").ok_or(StatusCode::NOT_FOUND)?; - let mut track = broadcast.subscribe_track(&track); + let mut track = broadcast.subscribe_track(&track, None); let Ok(group) = track.next_group().await else { return Err(StatusCode::INTERNAL_SERVER_ERROR.into());