Skip to content

Conversation

@ac-freeman
Copy link

I implemented simplified delivery timeouts for a research work 6 months ago. I asked Claude to help bring those changes into the latest version of the repo. This hasn't been completely tested yet, but my original implementation did work for my purposes. I haven't touched anything on the JavaScript side yet.

Paper (soon to be updated): https://arxiv.org/abs/2507.15673
Relevant issue: #678

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 21, 2026

Walkthrough

Delivery timeout support has been added throughout the system. A new DeliveryTimeout error variant was introduced to the Error enum. The Subscribe message structure was extended to include an optional delivery_timeout field for both IETF and lite protocols. The subscribe_track method now accepts an additional delivery_timeout parameter. The broadcast request channel now transmits TrackProducer paired with delivery_timeout instead of TrackProducer alone. FrameConsumer tracks arrival time, and TrackProducer includes a delivery_timeout field. Delivery timeout propagation flows through publisher and subscriber paths with associated error handling and timeout enforcement logic at the frame transmission level.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.58% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'WIP: Delivery timeout' is a vague, generic label that doesn't clearly convey the specific changes made in this changeset. Provide a more descriptive title that summarizes the main feature being added, e.g., 'Add delivery timeout support for broadcast subscriptions' or 'Implement delivery timeout mechanism across publisher and subscriber paths'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description check ✅ Passed The description provides context about the implementation being a reintroduction of prior work for research purposes, including relevant issue and paper references, which relates to the changeset's delivery timeout feature additions.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-lite/src/lite/subscribe.rs (1)

27-52: Reject invalid delivery_timeout flag values.
Right now any non-1 flag becomes None, silently accepting malformed data. This should error on invalid values to preserve protocol correctness.

🛠️ Proposed fix (validate flag with bool::decode / encode)
-		let delivery_timeout = if u8::decode(r, version)? == 1 {
-			Some(u64::decode(r, version)?)
-		} else {
-			None
-		};
+		let has_timeout = bool::decode(r, version)?;
+		let delivery_timeout = if has_timeout {
+			Some(u64::decode(r, version)?)
+		} else {
+			None
+		};

@@
-		if let Some(timeout) = self.delivery_timeout {
-			1u8.encode(w, version);
-			timeout.encode(w, version);
-		} else {
-			0u8.encode(w, version);
-		}
+		let has_timeout = self.delivery_timeout.is_some();
+		has_timeout.encode(w, version);
+		if let Some(timeout) = self.delivery_timeout {
+			timeout.encode(w, version);
+		}
rs/moq-lite/src/lite/subscriber.rs (1)

312-323: Delivery timeout isn’t enforced in run_frame yet.

_delivery_timeout is unused and the TODO remains, so inbound frames will never trigger a timeout. Please implement the check (using the chosen frame timestamp/arrival time) or remove the parameter until supported to avoid a false sense of enforcement.

If you want, I can draft the timeout check once you confirm which timestamp should be authoritative.

🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/ietf/subscribe.rs`:
- Around line 83-87: The delivery_timeout presence flag currently decodes with
u8::decode(...) == 1 which silently accepts invalid values; change this to use
bool::decode(r, version) to validate the flag (or decode the u8 and explicitly
match 0/1 and return an Err for other values) and then conditionally decode the
u64 only when the flag is true; update the code around delivery_timeout
(referencing delivery_timeout, u8::decode, bool::decode, r, version) to reject
invalid encodings instead of treating any non-1 as absent.

In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 235-246: The code redundantly clones `track` into
`track_with_timeout` and reassigns `delivery_timeout`; remove the extra clone
and assignment and insert the original `track` directly into the subscription
state. Specifically, in the block around `let mut state = self.state.lock();`
and the `state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/track.rs (1)

53-65: Document delivery_timeout semantics (units/scope).
Public Option<u64> is ambiguous without units or definition (e.g., milliseconds vs ticks, per-frame vs per-group). A short doc comment would prevent misuse across call sites.

rs/moq-lite/src/lite/subscribe.rs (1)

13-19: Clarify delivery_timeout units in Subscribe.
A brief comment would help keep producers/consumers aligned on units and semantics.

rs/moq-lite/src/ietf/subscriber.rs (1)

400-411: TODO: Delivery timeout check not yet implemented.

The _delivery_timeout parameter is accepted but unused, with a TODO comment indicating future implementation using frame.info.sent_timestamp. This is acceptable for WIP.

Would you like me to help implement the delivery timeout check logic, or open an issue to track this task?

Comment on lines +83 to +87
let delivery_timeout = if u8::decode(r, version)? == 1 {
Some(u64::decode(r, version)?)
} else {
None
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Validate the delivery_timeout presence flag.

Using u8::decode + == 1 silently accepts invalid values. Prefer bool::decode (or an explicit match) so invalid encodings are rejected.

🛠️ Proposed fix
-		let delivery_timeout = if u8::decode(r, version)? == 1 {
-			Some(u64::decode(r, version)?)
-		} else {
-			None
-		};
+		let has_timeout = bool::decode(r, version)?;
+		let delivery_timeout = if has_timeout {
+			Some(u64::decode(r, version)?)
+		} else {
+			None
+		};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let delivery_timeout = if u8::decode(r, version)? == 1 {
Some(u64::decode(r, version)?)
} else {
None
};
let has_timeout = bool::decode(r, version)?;
let delivery_timeout = if has_timeout {
Some(u64::decode(r, version)?)
} else {
None
};
🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscribe.rs` around lines 83 - 87, The delivery_timeout
presence flag currently decodes with u8::decode(...) == 1 which silently accepts
invalid values; change this to use bool::decode(r, version) to validate the flag
(or decode the u8 and explicitly match 0/1 and return an Err for other values)
and then conditionally decode the u64 only when the flag is true; update the
code around delivery_timeout (referencing delivery_timeout, u8::decode,
bool::decode, r, version) to reject invalid encodings instead of treating any
non-1 as absent.

Comment on lines +235 to 246
// Set this track subscription to carry the delivery timeout
let mut track_with_timeout = track.clone();
track_with_timeout.delivery_timeout = delivery_timeout;

let mut state = self.state.lock();
state.subscribes.insert(
request_id,
TrackState {
producer: track.clone(),
producer: track_with_timeout.clone(),
alias: None,
},
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find TrackProducer definition
rg -n -A 30 'pub struct TrackProducer' --type rust

echo "---"

# Check for Clone implementation
rg -n 'impl Clone for TrackProducer' --type rust -A 20

Repository: moq-dev/moq

Length of output: 3727


🏁 Script executed:

#!/bin/bash
# Also check if TrackProducer has #[derive(Clone)]
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20

Repository: moq-dev/moq

Length of output: 899


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs at line 214 area to understand initial delivery_timeout setup
rg -n -B 5 -A 5 'delivery_timeout' rs/moq-lite/src/ietf/broadcast.rs | head -40

Repository: moq-dev/moq

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Find broadcast.rs
find . -name 'broadcast.rs' -type f

# Also look for where delivery_timeout is being set
rg -n 'delivery_timeout.*=' --type rust | head -30

Repository: moq-dev/moq

Length of output: 1598


🏁 Script executed:

#!/bin/bash
# Check the context in subscriber.rs around lines 235-246
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '230,250p'

Repository: moq-dev/moq

Length of output: 814


🏁 Script executed:

#!/bin/bash
# Look for the requested_track method in broadcast to see how delivery_timeout is set
rg -n -B 5 -A 10 'fn requested_track\|pub fn requested_track' --type rust

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Show the tokio::select block in subscriber.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,240p'

Repository: moq-dev/moq

Length of output: 956


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs around line 214 to see context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'

Repository: moq-dev/moq

Length of output: 1315


🏁 Script executed:

#!/bin/bash
# Look for the method that returns/sends the track from broadcast
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50

Repository: moq-dev/moq

Length of output: 856


🏁 Script executed:

#!/bin/bash
# Find the requested_track method definition in broadcast
rg -n -B 5 -A 20 'fn requested_track\|pub fn requested_track' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check if there's any additional context about how track is used after assignment
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '235,260p'

Repository: moq-dev/moq

Length of output: 933


Remove redundant delivery_timeout assignment (lines 236-237).

The track received from broadcast.requested_track() is a clone of the producer created at broadcast.rs:209-214, which already has delivery_timeout set. Since TrackProducer derives Clone, the field is preserved. The assignment at line 237 is redundant and can be removed; use track directly instead of creating track_with_timeout.

🤖 Prompt for AI Agents
In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 235 - 246, The code
redundantly clones `track` into `track_with_timeout` and reassigns
`delivery_timeout`; remove the extra clone and assignment and insert the
original `track` directly into the subscription state. Specifically, in the
block around `let mut state = self.state.lock();` and the
`state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant