Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rs/hang/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ pub struct BroadcastConsumer {

impl BroadcastConsumer {
pub fn new(inner: moq_lite::BroadcastConsumer) -> Self {
let catalog = inner.subscribe_track(&Catalog::default_track()).into();
let catalog = inner.subscribe_track(&Catalog::default_track(), None).into();
Self { inner, catalog }
}

pub fn subscribe(&self, track: &moq_lite::Track, latency: std::time::Duration) -> TrackConsumer {
TrackConsumer::new(self.inner.subscribe_track(track), latency)
TrackConsumer::new(self.inner.subscribe_track(track, None), latency)
}
}

Expand Down
4 changes: 2 additions & 2 deletions rs/libmoq/src/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl Consume {
let track = consume.broadcast.subscribe_track(&moq_lite::Track {
name: rendition.clone(),
priority: video.priority,
});
}, None);
let track = TrackConsumer::new(track, latency);

let channel = oneshot::channel();
Expand Down Expand Up @@ -216,7 +216,7 @@ impl Consume {
let track = consume.broadcast.subscribe_track(&moq_lite::Track {
name: rendition.clone(),
priority: audio.priority,
});
}, None);
let track = TrackConsumer::new(track, latency);

let channel = oneshot::channel();
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> {
Some(announce) = origin.announced() => match announce {
(path, Some(broadcast)) => {
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let track = broadcast.subscribe_track(&track);
let track = broadcast.subscribe_track(&track, None);
clock = Some(clock::Subscriber::new(track));
}
(path, None) => {
Expand Down
4 changes: 4 additions & 0 deletions rs/moq-lite/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub enum Error {
#[error("unauthorized")]
Unauthorized,

#[error("delivery timeout exceeded")]
DeliveryTimeout,

#[error("unexpected message")]
UnexpectedMessage,

Expand Down Expand Up @@ -105,6 +108,7 @@ impl Error {
Self::TooLarge => 18,
Self::TooManyParameters => 19,
Self::InvalidRole => 20,
Self::DeliveryTimeout => 21,
Self::App(app) => *app + 64,
}
}
Expand Down
62 changes: 53 additions & 9 deletions rs/moq-lite/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
priority: msg.subscriber_priority,
};

let track = broadcast.subscribe_track(&track);
let track = broadcast.subscribe_track(&track, msg.delivery_timeout);

let (tx, rx) = oneshot::channel();
let mut subscribes = self.subscribes.lock();
Expand All @@ -109,9 +109,10 @@ impl<S: web_transport_trait::Session> Publisher<S> {
let request_id = msg.request_id;
let subscribes = self.subscribes.clone();
let version = self.version;
let delivery_timeout = msg.delivery_timeout;

web_async::spawn(async move {
if let Err(err) = Self::run_track(session, track, request_id, rx, version).await {
if let Err(err) = Self::run_track(session, track, request_id, delivery_timeout, rx, version).await {
control
.send(ietf::PublishDone {
request_id,
Expand Down Expand Up @@ -149,6 +150,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
session: S,
mut track: TrackConsumer,
request_id: RequestId,
delivery_timeout: Option<u64>,
mut cancel: oneshot::Receiver<()>,
version: Version,
) -> Result<(), Error> {
Expand Down Expand Up @@ -212,6 +214,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
msg,
track.info.priority,
group,
delivery_timeout,
version,
));

Expand Down Expand Up @@ -241,6 +244,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
msg: ietf::GroupHeader,
priority: u8,
mut group: GroupConsumer,
delivery_timeout: Option<u64>,
version: Version,
) -> Result<(), Error> {
// TODO add a way to open in priority order.
Expand All @@ -257,6 +261,39 @@ impl<S: web_transport_trait::Session> Publisher<S> {

tracing::trace!(?msg, "sending group header");

let result = Self::run_group_inner(&mut stream, &msg, &mut group, delivery_timeout).await;

// Handle errors specially
match result {
Err(Error::DeliveryTimeout) => {
tracing::warn!(sequence = %msg.group_id, "group delivery timeout - resetting stream");
// Reset the stream on delivery timeout
stream.abort(&Error::DeliveryTimeout);
return Err(Error::DeliveryTimeout);
}
Err(Error::Cancel) => {
tracing::debug!(sequence = %msg.group_id, "group cancelled");
return Err(Error::Cancel);
}
Err(err) => {
tracing::debug!(?err, sequence = %msg.group_id, "group error");
return Err(err);
}
Ok(()) => {
stream.finish()?;
stream.closed().await?;
tracing::debug!(sequence = %msg.group_id, "finished group");
Ok(())
}
}
}

async fn run_group_inner(
stream: &mut Writer<S::SendStream, Version>,
msg: &ietf::GroupHeader,
group: &mut GroupConsumer,
delivery_timeout: Option<u64>,
) -> Result<(), Error> {
loop {
let frame = tokio::select! {
biased;
Expand Down Expand Up @@ -292,6 +329,20 @@ impl<S: web_transport_trait::Session> Publisher<S> {
chunk = frame.read_chunk() => chunk,
};

// Check delivery timeout before writing chunk
if let Some(timeout) = delivery_timeout {
let now = tokio::time::Instant::now();
if now.duration_since(frame.arrival_time).as_millis() > timeout as u128 {
tracing::warn!(
"Delivery timeout exceeded. Arrival: {:?}, now: {:?}. Dropping group {}",
frame.arrival_time,
now,
group.info.sequence
);
return Err(Error::DeliveryTimeout);
}
}

match chunk? {
Some(mut chunk) => stream.write_all(&mut chunk).await?,
None => break,
Expand All @@ -300,13 +351,6 @@ impl<S: web_transport_trait::Session> Publisher<S> {
}
}

stream.finish()?;

// Wait until everything is acknowledged by the peer so we can still cancel the stream.
stream.closed().await?;

tracing::debug!(sequence = %msg.group_id, "finished group");

Ok(())
}

Expand Down
17 changes: 17 additions & 0 deletions rs/moq-lite/src/ietf/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct Subscribe<'a> {
pub subscriber_priority: u8,
pub group_order: GroupOrder,
pub filter_type: FilterType,
pub delivery_timeout: Option<u64>,
}

impl Message for Subscribe<'_> {
Expand Down Expand Up @@ -79,13 +80,20 @@ impl Message for Subscribe<'_> {
// Ignore parameters, who cares.
let _params = Parameters::decode(r, version)?;

let delivery_timeout = if u8::decode(r, version)? == 1 {
Some(u64::decode(r, version)?)
} else {
None
};
Comment on lines +83 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Validate the delivery_timeout presence flag.

Using u8::decode + == 1 silently accepts invalid values. Prefer bool::decode (or an explicit match) so invalid encodings are rejected.

🛠️ Proposed fix
-		let delivery_timeout = if u8::decode(r, version)? == 1 {
-			Some(u64::decode(r, version)?)
-		} else {
-			None
-		};
+		let has_timeout = bool::decode(r, version)?;
+		let delivery_timeout = if has_timeout {
+			Some(u64::decode(r, version)?)
+		} else {
+			None
+		};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let delivery_timeout = if u8::decode(r, version)? == 1 {
Some(u64::decode(r, version)?)
} else {
None
};
let has_timeout = bool::decode(r, version)?;
let delivery_timeout = if has_timeout {
Some(u64::decode(r, version)?)
} else {
None
};
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscribe.rs` around lines 83-87, the delivery_timeout
presence flag is currently decoded with `u8::decode(...) == 1`, which silently
accepts invalid values. Change this to `bool::decode(r, version)` to validate the
flag (or decode the u8, explicitly match 0/1, and return an Err for any other
value), then conditionally decode the u64 only when the flag is true. Update the
code around delivery_timeout (referencing delivery_timeout, u8::decode,
bool::decode, r, version) so that invalid encodings are rejected instead of any
non-1 value being treated as absent.


Ok(Self {
request_id,
track_namespace,
track_name,
subscriber_priority,
group_order,
filter_type,
delivery_timeout,
})
}

Expand All @@ -104,6 +112,13 @@ impl Message for Subscribe<'_> {

self.filter_type.encode(w, version);
0u8.encode(w, version); // no parameters

if let Some(timeout) = self.delivery_timeout {
1u8.encode(w, version);
timeout.encode(w, version);
} else {
0u8.encode(w, version);
}
}
}

Expand Down Expand Up @@ -283,6 +298,7 @@ mod tests {
subscriber_priority: 128,
group_order: GroupOrder::Descending,
filter_type: FilterType::LargestObject,
delivery_timeout: None,
};

let encoded = encode_message(&msg);
Expand All @@ -303,6 +319,7 @@ mod tests {
subscriber_priority: 255,
group_order: GroupOrder::Descending,
filter_type: FilterType::LargestObject,
delivery_timeout: None,
};

let encoded = encode_message(&msg);
Expand Down
32 changes: 26 additions & 6 deletions rs/moq-lite/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
loop {
// Keep serving requests until there are no more consumers.
// This way we'll clean up the task when the broadcast is no longer needed.
let track = tokio::select! {
let (track, delivery_timeout) = tokio::select! {
_ = broadcast.unused() => break,
producer = broadcast.requested_track() => match producer {
Some(producer) => producer,
Some((track, timeout)) => (track, timeout),
None => break,
},
_ = self.session.closed() => break,
Expand All @@ -232,18 +232,22 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
let request_id = self.control.next_request_id().await?;
let mut this = self.clone();

// Set this track subscription to carry the delivery timeout
let mut track_with_timeout = track.clone();
track_with_timeout.delivery_timeout = delivery_timeout;

let mut state = self.state.lock();
state.subscribes.insert(
request_id,
TrackState {
producer: track.clone(),
producer: track_with_timeout.clone(),
alias: None,
},
);
Comment on lines +235 to 246
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find TrackProducer definition
rg -n -A 30 'pub struct TrackProducer' --type rust

echo "---"

# Check for Clone implementation
rg -n 'impl Clone for TrackProducer' --type rust -A 20

Repository: moq-dev/moq

Length of output: 3727


🏁 Script executed:

#!/bin/bash
# Also check if TrackProducer has #[derive(Clone)]
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20

Repository: moq-dev/moq

Length of output: 899


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs at line 214 area to understand initial delivery_timeout setup
rg -n -B 5 -A 5 'delivery_timeout' rs/moq-lite/src/ietf/broadcast.rs | head -40

Repository: moq-dev/moq

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Find broadcast.rs
find . -name 'broadcast.rs' -type f

# Also look for where delivery_timeout is being set
rg -n 'delivery_timeout.*=' --type rust | head -30

Repository: moq-dev/moq

Length of output: 1598


🏁 Script executed:

#!/bin/bash
# Check the context in subscriber.rs around lines 235-246
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '230,250p'

Repository: moq-dev/moq

Length of output: 814


🏁 Script executed:

#!/bin/bash
# Look for the requested_track method in broadcast to see how delivery_timeout is set
rg -n -B 5 -A 10 'fn requested_track\|pub fn requested_track' --type rust

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Show the tokio::select block in subscriber.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,240p'

Repository: moq-dev/moq

Length of output: 956


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs around line 214 to see context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'

Repository: moq-dev/moq

Length of output: 1315


🏁 Script executed:

#!/bin/bash
# Look for the method that returns/sends the track from broadcast
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50

Repository: moq-dev/moq

Length of output: 856


🏁 Script executed:

#!/bin/bash
# Find the requested_track method definition in broadcast
rg -n -B 5 -A 20 'fn requested_track\|pub fn requested_track' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check if there's any additional context about how track is used after assignment
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '235,260p'

Repository: moq-dev/moq

Length of output: 933


Remove redundant delivery_timeout assignment (lines 236-237).

The track received from broadcast.requested_track() is a clone of the producer created at broadcast.rs:209-214, which already has delivery_timeout set. Since TrackProducer derives Clone, the field is preserved. The assignment at line 237 is redundant and can be removed; use track directly instead of creating track_with_timeout.

🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 235-246, the code
redundantly clones `track` into `track_with_timeout` and reassigns
`delivery_timeout`; remove the extra clone and assignment and insert the
original `track` directly into the subscription state. Specifically, in the
block around `let mut state = self.state.lock();` and
`state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already carries
`delivery_timeout`) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.


let path = path.to_owned();
web_async::spawn(async move {
if let Err(err) = this.run_subscribe(request_id, path, track).await {
if let Err(err) = this.run_subscribe(request_id, path, track_with_timeout, delivery_timeout).await {
tracing::debug!(%err, id = %request_id, "error running subscribe");
}
this.state.lock().subscribes.remove(&request_id);
Expand All @@ -258,6 +262,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
request_id: RequestId,
broadcast: Path<'_>,
track: TrackProducer,
delivery_timeout: Option<u64>,
) -> Result<(), Error> {
self.control.send(ietf::Subscribe {
request_id,
Expand All @@ -267,6 +272,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
group_order: GroupOrder::Descending,
// we want largest group
filter_type: FilterType::LargestObject,
delivery_timeout,
})?;

// TODO we should send a joining fetch, but it's annoying to implement.
Expand All @@ -290,6 +296,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, stripping");
}

let delivery_timeout;
let producer = {
let mut state = self.state.lock();
let request_id = match state.aliases.get(&group.track_alias) {
Expand All @@ -300,6 +307,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
}
};
let track = state.subscribes.get_mut(&request_id).ok_or(Error::NotFound)?;
delivery_timeout = track.producer.delivery_timeout;

let group = Group {
sequence: group.group_id,
Expand All @@ -309,14 +317,18 @@ impl<S: web_transport_trait::Session> Subscriber<S> {

let res = tokio::select! {
_ = producer.unused() => Err(Error::Cancel),
res = self.run_group(group, stream, producer.clone()) => res,
res = self.run_group(group, stream, producer.clone(), delivery_timeout) => res,
};

match res {
Err(Error::Cancel) | Err(Error::Transport(_)) => {
tracing::trace!(group = %producer.info.sequence, "group cancelled");
producer.abort(Error::Cancel);
}
Err(Error::DeliveryTimeout) => {
tracing::info!(group = %producer.info.sequence, "group delivery timeout");
producer.close();
}
Err(err) => {
tracing::debug!(%err, group = %producer.info.sequence, "group error");
producer.abort(err);
Expand All @@ -335,6 +347,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
group: ietf::GroupHeader,
stream: &mut Reader<S::RecvStream, Version>,
mut producer: GroupProducer,
delivery_timeout: Option<u64>,
) -> Result<(), Error> {
while let Some(id_delta) = stream.decode_maybe::<u64>().await? {
if id_delta != 0 {
Expand Down Expand Up @@ -365,10 +378,14 @@ impl<S: web_transport_trait::Session> Subscriber<S> {

let res = tokio::select! {
_ = frame.unused() => Err(Error::Cancel),
res = self.run_frame(stream, frame.clone()) => res,
res = self.run_frame(stream, frame.clone(), delivery_timeout) => res,
};

if let Err(err) = res {
if matches!(err, Error::DeliveryTimeout) {
// Simply break out of the frame writing loop
return Err(Error::DeliveryTimeout);
}
frame.abort(err.clone());
return Err(err);
}
Expand All @@ -384,11 +401,14 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
&mut self,
stream: &mut Reader<S::RecvStream, Version>,
mut frame: FrameProducer,
_delivery_timeout: Option<u64>,
) -> Result<(), Error> {
let mut remain = frame.info.size;

tracing::trace!(size = %frame.info.size, "reading frame");

// TODO: Implement delivery timeout check here using frame.info.sent_timestamp if available

while remain > 0 {
let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?;
remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?;
Expand Down
Loading
Loading