-
Notifications
You must be signed in to change notification settings - Fork 138
WIP: Delivery timeout #856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughDelivery timeout support has been added throughout the system. A new 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-lite/src/lite/subscribe.rs (1)
27-52: Reject invalid delivery_timeout flag values.
Right now any non-1flag becomesNone, silently accepting malformed data. This should error on invalid values to preserve protocol correctness.🛠️ Proposed fix (validate flag with bool::decode / encode)
- let delivery_timeout = if u8::decode(r, version)? == 1 { - Some(u64::decode(r, version)?) - } else { - None - }; + let has_timeout = bool::decode(r, version)?; + let delivery_timeout = if has_timeout { + Some(u64::decode(r, version)?) + } else { + None + }; @@ - if let Some(timeout) = self.delivery_timeout { - 1u8.encode(w, version); - timeout.encode(w, version); - } else { - 0u8.encode(w, version); - } + let has_timeout = self.delivery_timeout.is_some(); + has_timeout.encode(w, version); + if let Some(timeout) = self.delivery_timeout { + timeout.encode(w, version); + }rs/moq-lite/src/lite/subscriber.rs (1)
312-323: Delivery timeout isn’t enforced in run_frame yet.
_delivery_timeoutis unused and the TODO remains, so inbound frames will never trigger a timeout. Please implement the check (using the chosen frame timestamp/arrival time) or remove the parameter until supported to avoid a false sense of enforcement.If you want, I can draft the timeout check once you confirm which timestamp should be authoritative.
🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/ietf/subscribe.rs`:
- Around line 83-87: The delivery_timeout presence flag currently decodes with
u8::decode(...) == 1 which silently accepts invalid values; change this to use
bool::decode(r, version) to validate the flag (or decode the u8 and explicitly
match 0/1 and return an Err for other values) and then conditionally decode the
u64 only when the flag is true; update the code around delivery_timeout
(referencing delivery_timeout, u8::decode, bool::decode, r, version) to reject
invalid encodings instead of treating any non-1 as absent.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 235-246: The code redundantly clones `track` into
`track_with_timeout` and reassigns `delivery_timeout`; remove the extra clone
and assignment and insert the original `track` directly into the subscription
state. Specifically, in the block around `let mut state = self.state.lock();`
and the `state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/track.rs (1)
53-65: Documentdelivery_timeoutsemantics (units/scope).
PublicOption<u64>is ambiguous without units or definition (e.g., milliseconds vs ticks, per-frame vs per-group). A short doc comment would prevent misuse across call sites.rs/moq-lite/src/lite/subscribe.rs (1)
13-19: Clarifydelivery_timeoutunits in Subscribe.
A brief comment would help keep producers/consumers aligned on units and semantics.rs/moq-lite/src/ietf/subscriber.rs (1)
400-411: TODO: Delivery timeout check not yet implemented.The
_delivery_timeoutparameter is accepted but unused, with a TODO comment indicating future implementation usingframe.info.sent_timestamp. This is acceptable for WIP.Would you like me to help implement the delivery timeout check logic, or open an issue to track this task?
| let delivery_timeout = if u8::decode(r, version)? == 1 { | ||
| Some(u64::decode(r, version)?) | ||
| } else { | ||
| None | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate the delivery_timeout presence flag.
Using u8::decode + == 1 silently accepts invalid values. Prefer bool::decode (or an explicit match) so invalid encodings are rejected.
🛠️ Proposed fix
- let delivery_timeout = if u8::decode(r, version)? == 1 {
- Some(u64::decode(r, version)?)
- } else {
- None
- };
+ let has_timeout = bool::decode(r, version)?;
+ let delivery_timeout = if has_timeout {
+ Some(u64::decode(r, version)?)
+ } else {
+ None
+ };📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let delivery_timeout = if u8::decode(r, version)? == 1 { | |
| Some(u64::decode(r, version)?) | |
| } else { | |
| None | |
| }; | |
| let has_timeout = bool::decode(r, version)?; | |
| let delivery_timeout = if has_timeout { | |
| Some(u64::decode(r, version)?) | |
| } else { | |
| None | |
| }; |
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscribe.rs` around lines 83 - 87, The delivery_timeout
presence flag currently decodes with u8::decode(...) == 1 which silently accepts
invalid values; change this to use bool::decode(r, version) to validate the flag
(or decode the u8 and explicitly match 0/1 and return an Err for other values)
and then conditionally decode the u64 only when the flag is true; update the
code around delivery_timeout (referencing delivery_timeout, u8::decode,
bool::decode, r, version) to reject invalid encodings instead of treating any
non-1 as absent.
| // Set this track subscription to carry the delivery timeout | ||
| let mut track_with_timeout = track.clone(); | ||
| track_with_timeout.delivery_timeout = delivery_timeout; | ||
|
|
||
| let mut state = self.state.lock(); | ||
| state.subscribes.insert( | ||
| request_id, | ||
| TrackState { | ||
| producer: track.clone(), | ||
| producer: track_with_timeout.clone(), | ||
| alias: None, | ||
| }, | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find TrackProducer definition
rg -n -A 30 'pub struct TrackProducer' --type rust
echo "---"
# Check for Clone implementation
rg -n 'impl Clone for TrackProducer' --type rust -A 20Repository: moq-dev/moq
Length of output: 3727
🏁 Script executed:
#!/bin/bash
# Also check if TrackProducer has #[derive(Clone)]
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20Repository: moq-dev/moq
Length of output: 899
🏁 Script executed:
#!/bin/bash
# Check broadcast.rs at line 214 area to understand initial delivery_timeout setup
rg -n -B 5 -A 5 'delivery_timeout' rs/moq-lite/src/ietf/broadcast.rs | head -40Repository: moq-dev/moq
Length of output: 128
🏁 Script executed:
#!/bin/bash
# Find broadcast.rs
find . -name 'broadcast.rs' -type f
# Also look for where delivery_timeout is being set
rg -n 'delivery_timeout.*=' --type rust | head -30Repository: moq-dev/moq
Length of output: 1598
🏁 Script executed:
#!/bin/bash
# Check the context in subscriber.rs around lines 235-246
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '230,250p'Repository: moq-dev/moq
Length of output: 814
🏁 Script executed:
#!/bin/bash
# Look for the requested_track method in broadcast to see how delivery_timeout is set
rg -n -B 5 -A 10 'fn requested_track\|pub fn requested_track' --type rustRepository: moq-dev/moq
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Show the tokio::select block in subscriber.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,240p'Repository: moq-dev/moq
Length of output: 956
🏁 Script executed:
#!/bin/bash
# Check broadcast.rs around line 214 to see context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'Repository: moq-dev/moq
Length of output: 1315
🏁 Script executed:
#!/bin/bash
# Look for the method that returns/sends the track from broadcast
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50Repository: moq-dev/moq
Length of output: 856
🏁 Script executed:
#!/bin/bash
# Find the requested_track method definition in broadcast
rg -n -B 5 -A 20 'fn requested_track\|pub fn requested_track' rs/moq-lite/src/model/broadcast.rsRepository: moq-dev/moq
Length of output: 37
🏁 Script executed:
#!/bin/bash
# Check if there's any additional context about how track is used after assignment
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '235,260p'Repository: moq-dev/moq
Length of output: 933
Remove redundant delivery_timeout assignment (lines 236-237).
The track received from broadcast.requested_track() is a clone of the producer created at broadcast.rs:209-214, which already has delivery_timeout set. Since TrackProducer derives Clone, the field is preserved. The assignment at line 237 is redundant and can be removed; use track directly instead of creating track_with_timeout.
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 235 - 246, The code
redundantly clones `track` into `track_with_timeout` and reassigns
`delivery_timeout`; remove the extra clone and assignment and insert the
original `track` directly into the subscription state. Specifically, in the
block around `let mut state = self.state.lock();` and the
`state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.
I implemented simplified delivery timeouts for a research work 6 months ago. I asked Claude to help bring those changes into the latest version of the repo. This hasn't been completely tested yet, but my original implementation did work for my purposes. I haven't touched anything on the JavaScript side yet.
Paper (soon to be updated): https://arxiv.org/abs/2507.15673
Relevant issue: #678