-
Notifications
You must be signed in to change notification settings - Fork 141
WIP: Delivery timeout #856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -220,10 +220,10 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| loop { | ||
| // Keep serving requests until there are no more consumers. | ||
| // This way we'll clean up the task when the broadcast is no longer needed. | ||
| let track = tokio::select! { | ||
| let (track, delivery_timeout) = tokio::select! { | ||
| _ = broadcast.unused() => break, | ||
| producer = broadcast.requested_track() => match producer { | ||
| Some(producer) => producer, | ||
| Some((track, timeout)) => (track, timeout), | ||
| None => break, | ||
| }, | ||
| _ = self.session.closed() => break, | ||
|
|
@@ -232,18 +232,22 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| let request_id = self.control.next_request_id().await?; | ||
| let mut this = self.clone(); | ||
|
|
||
| // Set this track subscription to carry the delivery timeout | ||
| let mut track_with_timeout = track.clone(); | ||
| track_with_timeout.delivery_timeout = delivery_timeout; | ||
|
|
||
| let mut state = self.state.lock(); | ||
| state.subscribes.insert( | ||
| request_id, | ||
| TrackState { | ||
| producer: track.clone(), | ||
| producer: track_with_timeout.clone(), | ||
| alias: None, | ||
| }, | ||
| ); | ||
|
Comment on lines
+235
to
246
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Find TrackProducer definition
rg -n -A 30 'pub struct TrackProducer' --type rust
echo "---"
# Check for Clone implementation
rg -n 'impl Clone for TrackProducer' --type rust -A 20Repository: moq-dev/moq Length of output: 3727 🏁 Script executed: #!/bin/bash
# Also check if TrackProducer has #[derive(Clone)]
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20Repository: moq-dev/moq Length of output: 899 🏁 Script executed: #!/bin/bash
# Check broadcast.rs at line 214 area to understand initial delivery_timeout setup
rg -n -B 5 -A 5 'delivery_timeout' rs/moq-lite/src/ietf/broadcast.rs | head -40Repository: moq-dev/moq Length of output: 128 🏁 Script executed: #!/bin/bash
# Find broadcast.rs
find . -name 'broadcast.rs' -type f
# Also look for where delivery_timeout is being set
rg -n 'delivery_timeout.*=' --type rust | head -30Repository: moq-dev/moq Length of output: 1598 🏁 Script executed: #!/bin/bash
# Check the context in subscriber.rs around lines 235-246
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '230,250p'Repository: moq-dev/moq Length of output: 814 🏁 Script executed: #!/bin/bash
# Look for the requested_track method in broadcast to see how delivery_timeout is set
rg -n -B 5 -A 10 'fn requested_track\|pub fn requested_track' --type rustRepository: moq-dev/moq Length of output: 37 🏁 Script executed: #!/bin/bash
# Show the tokio::select block in subscriber.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,240p'Repository: moq-dev/moq Length of output: 956 🏁 Script executed: #!/bin/bash
# Check broadcast.rs around line 214 to see context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'Repository: moq-dev/moq Length of output: 1315 🏁 Script executed: #!/bin/bash
# Look for the method that returns/sends the track from broadcast
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50Repository: moq-dev/moq Length of output: 856 🏁 Script executed: #!/bin/bash
# Find the requested_track method definition in broadcast
rg -n -B 5 -A 20 'fn requested_track\|pub fn requested_track' rs/moq-lite/src/model/broadcast.rsRepository: moq-dev/moq Length of output: 37 🏁 Script executed: #!/bin/bash
# Check if there's any additional context about how track is used after assignment
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '235,260p'Repository: moq-dev/moq Length of output: 933 Remove redundant delivery_timeout assignment (lines 236-237). The 🤖 Prompt for AI Agents |
||
|
|
||
| let path = path.to_owned(); | ||
| web_async::spawn(async move { | ||
| if let Err(err) = this.run_subscribe(request_id, path, track).await { | ||
| if let Err(err) = this.run_subscribe(request_id, path, track_with_timeout, delivery_timeout).await { | ||
| tracing::debug!(%err, id = %request_id, "error running subscribe"); | ||
| } | ||
| this.state.lock().subscribes.remove(&request_id); | ||
|
|
@@ -258,6 +262,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| request_id: RequestId, | ||
| broadcast: Path<'_>, | ||
| track: TrackProducer, | ||
| delivery_timeout: Option<u64>, | ||
| ) -> Result<(), Error> { | ||
| self.control.send(ietf::Subscribe { | ||
| request_id, | ||
|
|
@@ -267,6 +272,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| group_order: GroupOrder::Descending, | ||
| // we want largest group | ||
| filter_type: FilterType::LargestObject, | ||
| delivery_timeout, | ||
| })?; | ||
|
|
||
| // TODO we should send a joining fetch, but it's annoying to implement. | ||
|
|
@@ -290,6 +296,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| tracing::warn!(sub_group_id = %group.sub_group_id, "subgroup ID is not supported, stripping"); | ||
| } | ||
|
|
||
| let delivery_timeout; | ||
| let producer = { | ||
| let mut state = self.state.lock(); | ||
| let request_id = match state.aliases.get(&group.track_alias) { | ||
|
|
@@ -300,6 +307,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| } | ||
| }; | ||
| let track = state.subscribes.get_mut(&request_id).ok_or(Error::NotFound)?; | ||
| delivery_timeout = track.producer.delivery_timeout; | ||
|
|
||
| let group = Group { | ||
| sequence: group.group_id, | ||
|
|
@@ -309,14 +317,18 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
|
|
||
| let res = tokio::select! { | ||
| _ = producer.unused() => Err(Error::Cancel), | ||
| res = self.run_group(group, stream, producer.clone()) => res, | ||
| res = self.run_group(group, stream, producer.clone(), delivery_timeout) => res, | ||
| }; | ||
|
|
||
| match res { | ||
| Err(Error::Cancel) | Err(Error::Transport(_)) => { | ||
| tracing::trace!(group = %producer.info.sequence, "group cancelled"); | ||
| producer.abort(Error::Cancel); | ||
| } | ||
| Err(Error::DeliveryTimeout) => { | ||
| tracing::info!(group = %producer.info.sequence, "group delivery timeout"); | ||
| producer.close(); | ||
| } | ||
| Err(err) => { | ||
| tracing::debug!(%err, group = %producer.info.sequence, "group error"); | ||
| producer.abort(err); | ||
|
|
@@ -335,6 +347,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| group: ietf::GroupHeader, | ||
| stream: &mut Reader<S::RecvStream, Version>, | ||
| mut producer: GroupProducer, | ||
| delivery_timeout: Option<u64>, | ||
| ) -> Result<(), Error> { | ||
| while let Some(id_delta) = stream.decode_maybe::<u64>().await? { | ||
| if id_delta != 0 { | ||
|
|
@@ -365,10 +378,14 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
|
|
||
| let res = tokio::select! { | ||
| _ = frame.unused() => Err(Error::Cancel), | ||
| res = self.run_frame(stream, frame.clone()) => res, | ||
| res = self.run_frame(stream, frame.clone(), delivery_timeout) => res, | ||
| }; | ||
|
|
||
| if let Err(err) = res { | ||
| if matches!(err, Error::DeliveryTimeout) { | ||
| // Simply break out of the frame writing loop | ||
| return Err(Error::DeliveryTimeout); | ||
| } | ||
| frame.abort(err.clone()); | ||
| return Err(err); | ||
| } | ||
|
|
@@ -384,11 +401,14 @@ impl<S: web_transport_trait::Session> Subscriber<S> { | |
| &mut self, | ||
| stream: &mut Reader<S::RecvStream, Version>, | ||
| mut frame: FrameProducer, | ||
| _delivery_timeout: Option<u64>, | ||
| ) -> Result<(), Error> { | ||
| let mut remain = frame.info.size; | ||
|
|
||
| tracing::trace!(size = %frame.info.size, "reading frame"); | ||
|
|
||
| // TODO: Implement delivery timeout check here using frame.info.sent_timestamp if available | ||
|
|
||
| while remain > 0 { | ||
| let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?; | ||
| remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate the delivery_timeout presence flag.
Using
u8::decode+== 1silently accepts invalid values. Preferbool::decode(or an explicit match) so invalid encodings are rejected.🛠️ Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents