chore: Move flow expiration from PQ to timers #1327
Conversation
Pull request overview
This PR replaces the flow-table expiration mechanism (thread-local PQ + ExpirationsNF pipeline stage) with per-flow tokio timers that mark flows Expired, relying on lazy cleanup in lookup() / drain_stale().
Changes:
- Remove `ExpirationsNF` and the thread-local priority-queue expiration machinery.
- Add per-flow tokio timer spawning on insert, plus lazy time-based expiration in `lookup()` and proactive stale draining on size threshold.
- Update dataplane pipeline composition, tests, docs, and dependencies to match the new expiration model.
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| nat/src/portfw/test.rs | Removes ExpirationsNF from the test pipeline and uses FlowTable::default(). |
| flow-entry/src/flow_table/thread_local_pq.rs | Deletes the thread-local PQ implementation used for expirations. |
| flow-entry/src/flow_table/table.rs | Stores Arc<FlowInfo> directly; spawns per-flow tokio timers; adds lazy expiration in lookup() and drain_stale(); updates tests. |
| flow-entry/src/flow_table/nf_lookup.rs | Updates tests to use tokio timing and removes the expirations stage dependency. |
| flow-entry/src/flow_table/nf_expirations.rs | Deletes ExpirationsNF. |
| flow-entry/src/flow_table/mod.rs | Removes module exports for expirations/PQ; keeps FlowLookup/FlowTable exports. |
| flow-entry/src/flow_table/display.rs | Simplifies display logic now that table values are strong Arcs. |
| flow-entry/src/flow_table/README.md | Updates documentation to describe tokio timers + lazy cleanup design. |
| flow-entry/Cargo.toml | Drops priority-queue/thread_local; adds tokio (and dev features for async tests). |
| dataplane/src/packet_processor/mod.rs | Removes ExpirationsNF from the router pipeline. |
| Cargo.lock | Removes priority-queue from flow-entry’s dependency set and adds tokio. |
flow-entry/src/flow_table/table.rs
Outdated
```rust
if tokio::runtime::Handle::try_current().is_ok() {
    let fi = val.clone();
    tokio::task::spawn(async move {
        loop {
            let deadline = fi.expires_at();
```

confidence: 8
tags: [logic, other]

`insert_common` spawns a new tokio task on every call whenever a runtime is present. This means repeated `reinsert()` calls (or inserting the same `Arc<FlowInfo>` again under the same key) will create multiple concurrent timers for the same flow, which can lead to unbounded task growth and extra wakeups.
Consider deduplicating timer creation (e.g., skip spawning when the replaced value is the same Arc via Arc::ptr_eq, or track a “timer started” flag per FlowInfo).
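The deduplication check can be sketched like this (`need_timer` is a hypothetical helper for illustration, not code from this PR):

```rust
use std::sync::Arc;

// Hypothetical helper: decide whether insert_common should spawn a timer.
// `replaced` is the value the insert displaced (None if the key was vacant).
// A timer is only needed when the slot now holds a *different* allocation;
// re-inserting the same Arc leaves the existing timer authoritative.
fn need_timer<T>(replaced: Option<&Arc<T>>, inserted: &Arc<T>) -> bool {
    replaced.map_or(true, |old| !Arc::ptr_eq(old, inserted))
}
```

`Arc::ptr_eq` compares allocations, not contents, so two flows with identical fields still count as distinct and each gets its own timer.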
flow-entry/src/flow_table/table.rs
Outdated
```rust
tokio::task::spawn(async move {
    loop {
        let deadline = fi.expires_at();
        tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
        let new_deadline = fi.expires_at();
        if new_deadline > deadline {
            // Deadline was extended (e.g. by StatefulNat); sleep again.
            continue;
        }
        fi.update_status(FlowStatus::Expired);
        break;
    }
});
```

confidence: 7
tags: [logic, other]

The per-flow timer task holds an `Arc<FlowInfo>` until the deadline elapses, even if the entry is removed from the DashMap earlier (e.g., via `lookup()` removing Cancelled/Expired entries or `drain_stale`). With many flows and long timeouts this can keep a large number of `FlowInfo` allocations (and their timer tasks) alive longer than necessary.
If early cleanup matters, consider wiring a cancellation mechanism (e.g., store/abort a JoinHandle, use a cancellation token/notify in FlowInfo, or have a single shared reaper task rather than one task per flow).
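A minimal std-only sketch of the cancellation-token option (this `FlowInfo` fragment is hypothetical; a real tokio version would pair the flag with a `Notify` or `CancellationToken` so `invalidate()` wakes the sleeping task immediately instead of at the next deadline):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical FlowInfo fragment carrying a cancellation flag.
struct FlowInfo {
    cancelled: AtomicBool,
}

impl FlowInfo {
    fn new() -> Self {
        Self { cancelled: AtomicBool::new(false) }
    }

    // invalidate() flips the token; the timer task checks it on wake
    // and exits early, dropping its strong Arc reference.
    fn invalidate(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}
```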
```rust
// Entries are all gone (removed by per-flow tokio timers)
let num_entries = flow_table.len().unwrap();
```

confidence: 9
tags: [docs, style]

This test comment says entries are "removed by per-flow tokio timers", but the current implementation only marks flows as `Expired` in the timer and relies on lazy removal (e.g., `lookup()` / `drain_stale`) to actually delete DashMap entries. Consider rewording this to reflect the real behavior, or add an explicit cleanup step if the test intends to assert removal rather than just expiration.
```diff
 #[allow(clippy::len_without_is_empty)]
-/// Tell how many flows are in the table if it can be locked
-/// This is mostly for testing
+/// Returns the number of *active* (non-expired, non-cancelled) flows in the table.
+/// This is mostly for testing.
 pub fn len(&self) -> Option<usize> {
     let table = self.table.try_read().ok()?;
-    Some(table.len())
+    Some(
+        table
+            .iter()
+            .filter(|e| e.value().status() == FlowStatus::Active)
+            .count(),
+    )
```

confidence: 8
tags: [api_design]

`FlowTable::len()` now does an O(n) scan and returns only the count of Active entries (not the raw map size). Since the method name strongly implies a cheap "number of entries" operation, this is easy to misuse in non-test code (e.g., stats/telemetry paths) and can produce surprising results.
Consider either renaming this to something like active_len() (and/or adding a separate raw_len()), or making the behavior more explicit in the API so callers don’t accidentally rely on len() for map size/perf-sensitive code.
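The suggested split could look like this (a sketch over a plain `HashMap` stand-in; the names `raw_len`/`active_len` are illustrative, not from the PR):

```rust
use std::collections::HashMap;

#[derive(PartialEq)]
enum FlowStatus { Active, Expired, Cancelled }

// Cheap O(1) count of all entries, including stale ones awaiting cleanup.
fn raw_len(table: &HashMap<u64, FlowStatus>) -> usize {
    table.len()
}

// Explicit O(n) count of live flows, so callers can't confuse the two.
fn active_len(table: &HashMap<u64, FlowStatus>) -> usize {
    table.values().filter(|s| **s == FlowStatus::Active).count()
}
```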
Force-pushed from 27dedc9 to a08d109.
flow-entry/src/flow_table/table.rs
Outdated
```rust
// Proactively purge stale entries when the raw table size exceeds the threshold.
// This bounds memory growth when flows expire faster than they are looked up,
// since expired entries otherwise accumulate in the `DashMap` until a lookup hits them.
let raw_len = table.len();
if raw_len > self.reap_threshold {
    warn!(
        "The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
        self.reap_threshold
    );
    Self::drain_stale_with_read_lock(&table);
}
```

confidence: 9
tags: [logic]

`insert_common` triggers a full-table `drain_stale_with_read_lock()` scan on every insert once `table.len()` exceeds `reap_threshold`. If the table is legitimately large but mostly Active (so `drain_stale` removes little or nothing), this becomes O(n) work per insert and can degrade to O(n²) behavior under load.
Consider adding hysteresis/cooldown so the expensive scan only happens when the size crosses the threshold (from below), or at most once per time window / per X inserts, or only if a previous scan actually removed entries (and otherwise back off).
Reply: The `inserts_while_large: AtomicUsize` counter and the `DRAIN_EVERY_N_INSERTS = 1000` constant were added specifically for this. The drain only fires on inserts 0, 1000, 2000, etc. while the table stays over threshold, turning O(n) work per insert into amortized O(n/1000).
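The described counter can be sketched as a standalone gate (a hedged sketch; the function shape and reset behavior are illustrative, not the PR's exact code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const DRAIN_EVERY_N_INSERTS: usize = 1000;

// While the table stays over threshold, only every N-th insert pays for a
// drain. fetch_add guarantees each concurrent caller gets a unique counter
// value, so exactly one caller in every window of N sees `% N == 0`.
fn should_drain(inserts_while_large: &AtomicUsize, raw_len: usize, threshold: usize) -> bool {
    if raw_len <= threshold {
        // Below threshold: reset so the next overflow drains immediately.
        inserts_while_large.store(0, Ordering::Relaxed);
        return false;
    }
    inserts_while_large.fetch_add(1, Ordering::Relaxed) % DRAIN_EVERY_N_INSERTS == 0
}
```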
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.
flow-entry/src/flow_table/table.rs
Outdated
```rust
let to_remove: Vec<FlowKey> = table
    .iter()
    .filter_map(|entry| {
        let val = entry.value();
        match val.status() {
            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
            FlowStatus::Active if val.expires_at() <= now => {
                // Deadline passed but the tokio timer has not fired yet; mark and remove.
                val.update_status(FlowStatus::Expired);
                Some(*entry.key())
            }
            FlowStatus::Active => None,
        }
    })
    .collect();
let removed = to_remove.len();
for key in &to_remove {
    table.remove(key);
}
```

confidence: 8
tags: [logic]

`drain_stale_with_read_lock()` first collects `FlowKey`s to remove and then removes by key. Since the outer lock is a shared read lock, inserts/replacements for the same key can happen concurrently; this creates an ABA race where a freshly-inserted active flow under a reused key can be removed because the key was collected when an older entry was stale.
Consider removing conditionally (re-check status/deadline at removal time) or using a DashMap API that can filter/retain under the shard lock to avoid removing a different value than the one you inspected.
Suggested change:

```diff
-let to_remove: Vec<FlowKey> = table
-    .iter()
-    .filter_map(|entry| {
-        let val = entry.value();
-        match val.status() {
-            FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
-            FlowStatus::Active if val.expires_at() <= now => {
-                // Deadline passed but the tokio timer has not fired yet; mark and remove.
-                val.update_status(FlowStatus::Expired);
-                Some(*entry.key())
-            }
-            FlowStatus::Active => None,
-        }
-    })
-    .collect();
-let removed = to_remove.len();
-for key in &to_remove {
-    table.remove(key);
-}
+let mut removed = 0usize;
+// Use DashMap::retain so that the decision to remove an entry is made
+// while holding the shard lock for that entry, avoiding an ABA race on
+// keys that might be concurrently reused.
+table.retain(|_key, val| {
+    let is_stale = match val.status() {
+        FlowStatus::Expired | FlowStatus::Cancelled => true,
+        FlowStatus::Active if val.expires_at() <= now => {
+            // Deadline passed but the tokio timer has not fired yet; mark and remove.
+            val.update_status(FlowStatus::Expired);
+            true
+        }
+        FlowStatus::Active => false,
+    };
+    if is_stale {
+        removed += 1;
+        false
+    } else {
+        true
+    }
+});
```
```diff
 let table = self.table.read().unwrap();
-let item = table.get(flow_key)?.upgrade();
-let Some(item) = item else {
-    debug!(
-        "lookup: Removing flow key {:?}, found empty weak reference",
-        flow_key
-    );
-    Self::remove_with_read_lock(&table, flow_key);
-    return None;
-};
+let item = table.get(flow_key)?.value().clone();
 let status = item.status();
```

confidence: 8
tags: [logic]

`lookup()` clones the `Arc<FlowInfo>` out of the DashMap and later (on the expired/cancelled paths) calls `remove_with_read_lock()` by key. Because the DashMap entry guard is dropped before the removal, another thread can replace the entry for the same key in between, and this removal can delete the new flow instead of the stale one.
Consider doing an atomic/conditional removal (e.g., remove only if the current value is still the one you examined, or re-check staleness under the shard lock before removing).
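The conditional-removal idea can be sketched std-only (using a plain `HashMap` as a stand-in for DashMap's `remove_if`; `remove_if_same` is a hypothetical helper):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Remove the entry only if it still holds the exact Arc we examined earlier,
// so a concurrent replacement under the same key is left intact. Returns
// whether a removal actually happened.
fn remove_if_same<V>(table: &mut HashMap<u64, Arc<V>>, key: u64, seen: &Arc<V>) -> bool {
    match table.get(&key) {
        Some(current) if Arc::ptr_eq(current, seen) => {
            table.remove(&key);
            true
        }
        _ => false, // entry gone or replaced: not ours to delete
    }
}
```

With DashMap, `remove_if(&key, |_, v| Arc::ptr_eq(v, &seen))` performs the same check under the shard lock, which is what makes it safe against concurrent replacement.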
flow-entry/src/flow_table/table.rs
Outdated
```rust
// Upgrade to check status and read the deadline. If the Arc has
// already been dropped (no DashMap entry, no in-flight holders),
// there is nothing left to expire.
let Some(fi) = fi_weak.upgrade() else { break };
```
This check could be made before (outside) spawning a task, right?
flow-entry/src/flow_table/table.rs
Outdated
```rust
    break;
}
let deadline = fi.expires_at();
// Drop the strong ref before sleeping so this task does not
```
Why do we need this? I mean changing from strong to weak references?
A flow entry will live as long as it is in the table and it has a timer (task).
If the flow is removed from the table, it will be invisible. It may remain in memory, but as soon as the task finishes, it will be gone, right?
Reply: Well, yep, I'll rework this to use strong refs.
flow-entry/src/flow_table/table.rs
Outdated
```rust
    "The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
    self.reap_threshold
);
Self::drain_stale_with_read_lock(&table);
```
If I understand correctly, this is the only mechanism now to delete flow entries?
Fredi-raspall left a comment:
I have similar concerns to Copilot's.
I would actually keep a join handle in the flow info to be able to cancel flows that are no longer wanted. The existing removal mechanism relies on time, but we have use cases where we want to explicitly remove/invalidate/cancel a flow. Right now, we do that by changing the status to `Cancelled` (see method `invalidate()`). It may be a good idea to extend `invalidate()` so that the task is also cancelled via the join handle?
Lastly, there is the scaling concern. Maybe we should add some tests for that?
Force-pushed from cf06878 to 30a152e.
flow-entry/src/flow_table/table.rs
Outdated
```rust
let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val));
if need_timer {
    if tokio::runtime::Handle::try_current().is_err() {
        warn!(
```

confidence: 9
tags: [logic]

In non-tokio contexts this `warn!` will fire on every insert, which can spam logs (and slow tests) since many unit tests insert flows outside a runtime. Consider downgrading this to `debug!`, or logging it only once (e.g., with a `OnceLock`/`AtomicBool`) and/or including a counter/rate-limiter so repeated inserts don't produce unbounded warning output.
Suggested change:

```diff
-warn!(
+debug!(
```
Reply: This is a legit concern, but moving this to `debug!` unconditionally also seems bad. What is a good compromise so we don't spam logs in unit tests but still get the warning in production?

Reply: Maybe we could just panic here instead? I considered that as a second option.
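One possible compromise (a sketch; `should_warn_no_runtime` is a hypothetical helper, not code from this PR): keep `warn!` severity but emit it at most once per process, so production still surfaces the misconfiguration while tests that insert thousands of flows outside a runtime stay quiet.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// One-shot latch: only the very first caller gets `true`.
static NO_RUNTIME_WARNED: AtomicBool = AtomicBool::new(false);

fn should_warn_no_runtime() -> bool {
    // swap returns the previous value, so exactly one caller observes `false`
    // even under concurrent inserts.
    !NO_RUNTIME_WARNED.swap(true, Ordering::Relaxed)
}
```

Call sites would then gate the log: `if should_warn_no_runtime() { warn!(...); }`.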
Apart from the AI co-authorship, this looks correct to me. We'll merge after the 26.01 release so it gets some extended runtime before shipping. Also, please address the Copilot comments that make sense to address, such as the one about the `warn!` in unit tests.

@sergeymatov, @Fredi-raspall Can you resolve any conversations that have been addressed for this PR?

Ignore my previous comment about this being fine; I think we have a reap issue here where expired flows will never be garbage collected in practice. See my inline comment.
Force-pushed from 30a152e to 3484191.
flow-entry/src/flow_table/table.rs
Outdated
```rust
// In non-tokio contexts (shuttle tests, sync unit tests) a warning is logged
// and no timer is spawned; lazy time-checking in `lookup` handles expiration.
```

confidence: 7
tags: [docs]

The comment says "In non-tokio contexts … a warning is logged", but the code logs at `debug!` when no runtime is present. This can mislead readers when diagnosing expiration behavior.
Update the comment to reflect the actual log level/behavior (or change the log level to match the comment).
Suggested change:

```diff
-// In non-tokio contexts (shuttle tests, sync unit tests) a warning is logged
-// and no timer is spawned; lazy time-checking in `lookup` handles expiration.
+// In non-tokio contexts (shuttle tests, sync unit tests) a debug log message
+// is emitted and no timer is spawned; lazy time-checking in `lookup` handles
+// expiration.
```
flow-entry/src/flow_table/table.rs
Outdated
```rust
if fi.status() != FlowStatus::Active {
    break;
}
let new_deadline = fi.expires_at();
if new_deadline > deadline {
    // Deadline was extended (e.g. by StatefulNat); sleep again.
    deadline = new_deadline;
    continue;
}
fi.update_status(FlowStatus::Expired);
// Remove the entry from the DashMap. If the entry was replaced in the meantime,
// the `ptr_eq` will make sure we don't remove the new entry that replaced it.
if let Ok(table) = table_arc.read() {
    table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
}
```

confidence: 9
tags: [logic]

In the per-flow timer task, when `fi.status() != FlowStatus::Active` the loop breaks without removing the flow's DashMap entry. That means flows that are Cancelled (or externally marked Expired) can stay in the table indefinitely if they're never looked up again, defeating the goal of timer-driven cleanup.
Consider removing the table entry (with the existing remove_if + Arc::ptr_eq guard) before breaking in the non-Active branch, so non-Active flows are eventually collected even without subsequent lookups.
flow-entry/src/flow_table/table.rs
Outdated
```rust
use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
```

confidence: 8
tags: [style]

This file imports `AtomicUsize`/`Ordering` from `std::sync::atomic`, but the codebase generally uses `concurrency::sync::atomic::*` so tests under shuttle/loom can model atomics correctly (e.g. `nat/src/stateful/apalloc/port_alloc.rs`). Using std atomics here may reduce determinism/coverage under those concurrency-test modes.
Prefer concurrency::sync::atomic::{AtomicUsize, Ordering} for inserts_while_large.
Suggested change:

```diff
-use std::sync::atomic::{AtomicUsize, Ordering};
+use concurrency::sync::atomic::{AtomicUsize, Ordering};
```
flow-entry/src/flow_table/README.md
Outdated
> When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
> present) that sleeps until the flow's deadline and then calls
> `update_status(FlowStatus::Expired)`. The DashMap entry is not removed by the
> timer; instead, `lookup()` performs lazy cleanup: if a looked-up entry is
> `Expired` or `Cancelled` it is removed from the map and `None` is returned.
> The same lazy path covers the case where a deadline passes without a timer
> firing (e.g. in non-tokio test contexts).

confidence: 9
tags: [docs]

The README says the tokio timer "does not remove" the DashMap entry and that cleanup is only done lazily in `lookup()`, but the current implementation in `FlowTable::insert_common` does call `remove_if` from the timer task.
Please update this section to match the actual behavior (timer removes its own entry, with lazy cleanup as a fallback), or adjust the code to match the documented behavior.
Force-pushed from 309cf01 to 0cd4412.
flow-entry/src/flow_table/table.rs
Outdated
```rust
if let Ok(table) = table_arc.read() {
    table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
}
```

confidence: 8
tags: [logic]

The expiration task silently ignores a poisoned RwLock (`if let Ok(table) = table_arc.read() { ... }`). If the lock is poisoned, the task will skip removing the entry, leaving an Expired/Cancelled flow resident indefinitely and making this failure mode hard to detect.
Consider handling the poisoned case explicitly (e.g., log a warn!/error! and still proceed with into_inner(), or expect/unwrap to match the rest of FlowTable’s “panics on poisoned lock” contract).
Suggested change:

```diff
-if let Ok(table) = table_arc.read() {
-    table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
-}
+let table = table_arc.read().unwrap_or_else(|poisoned| {
+    warn!(
+        "flow expiration task: FlowTable RwLock poisoned; \
+         proceeding with possibly inconsistent table state"
+    );
+    poisoned.into_inner()
+});
+table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
```
flow-entry/src/flow_table/README.md
Outdated
> `Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the deadline,
> handles extensions by re-sleeping, and on exit (deadline elapsed, Cancelled, or
> externally Expired) marks the flow `Expired` and removes its DashMap entry via
> `remove_if + Arc::ptr_eq` — leaving any concurrent replacement intact.

confidence: 9
tags: [docs]

README claims the timer task "on exit (deadline elapsed, Cancelled, or externally Expired) marks the flow Expired". In the implementation, the task only calls `update_status(Expired)` on the deadline-elapsed path; if the flow is already Cancelled/Expired, it just breaks and attempts removal, preserving the existing status (and it won't observe that status change until it wakes).
Please adjust the README wording to reflect the actual behavior (which status transitions happen in each path, and when cancellation/external expiry is observed by the task).
Suggested change:

```diff
-`Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the deadline,
-handles extensions by re-sleeping, and on exit (deadline elapsed, Cancelled, or
-externally Expired) marks the flow `Expired` and removes its DashMap entry via
-`remove_if + Arc::ptr_eq` — leaving any concurrent replacement intact.
+`Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the current
+deadline and handles extensions by re-sleeping. When it wakes and finds that
+the deadline has elapsed (and the flow is not already in a terminal state), it
+marks the flow `Expired` and removes its DashMap entry via `remove_if +
+Arc::ptr_eq` — leaving any concurrent replacement intact. If, instead, the flow
+has already been marked `Cancelled` or `Expired` by some other path while the
+task was sleeping, the task exits without changing the status and only attempts
+the same conditional removal; such external changes are observed only when the
+task next wakes.
```
Force-pushed from 0cd4412 to 2184f31.
```rust
async fn test_lookup_nf_with_expiration() {
    let flow_table = Arc::new(FlowTable::default());
    let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone());
    let flowinfo_creator = FlowInfoCreator::new(flow_table.clone(), Duration::from_secs(1));
```

confidence: 9
tags: [logic, style]

These tests construct `Arc<FlowTable>` using `std::sync::Arc`, but `FlowLookup::new` expects `concurrency::sync::Arc`. This can break compilation when building/running tests with the shuttle feature (where `concurrency::sync::Arc` is not `std::sync::Arc`). Prefer importing/using `concurrency::sync::Arc` in the test module for consistency with the crate's concurrency abstraction.
```diff
 ) -> (Arc<FlowTable>, DynPipeline<TestBuffer>, PortFwTableWriter) {
     // build a pipeline with flow lookup + port forwarder
     let mut writer = PortFwTableWriter::new();
-    let flow_table = Arc::new(FlowTable::new(1024));
+    let flow_table = Arc::new(FlowTable::default());
     let flow_lookup_nf = FlowLookup::new("flow-lookup", flow_table.clone());
```

confidence: 8
tags: [style]

This test module uses `std::sync::Arc`, but other NAT modules/tests use `concurrency::sync::Arc` to remain compatible with the workspace's shuttle concurrency mode. To avoid type mismatches when running `cargo test --features shuttle`, consider switching this file to `concurrency::sync::Arc` as well (and using it consistently for `Arc<FlowTable>`/`Arc<PortFwEntry>`).
flow-entry/src/flow_table/README.md
Outdated
> retain a flow (e.g. pipeline stages that tag packets) clone the `Arc` out of
> `lookup()`.
>
> When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
I just realized that this might be an issue as an expired flow may never be looked up again. We probably need to have a mechanism to actively reap this on expiry. Can we still go back to the WeakPtr idea with the tokio timer callback holding the Arc? I'm open to other mechanisms.
```rust
) -> usize {
    let now = Instant::now();
    let mut removed = 0usize;
    // `retain` holds the write lock on each DashMap shard while evaluating the
```
I'm not sure this is right: `retain` will acquire the shard write lock, but it is not guaranteed to hold it across the whole `retain` call; it could repeatedly lock and unlock it.
The bigger issue is that no one can have a reference to any element in the DashMap when running retain, is that the case?
```rust
// guarantees each concurrent caller gets a unique value, ensuring exactly
// one drain per N inserts even under concurrent inserts.
let raw_len = table.len();
if raw_len > self.reap_threshold {
```
The main issue here is that if we have a lot of flows, we will try to reap on every insert. Reaping may not put us below the threshold. We need a better approach to only reap periodically, not every time just because we have a lot of flows.
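One way to reap periodically rather than on every oversized insert (a hedged sketch; `ReapGate` is hypothetical and not part of this PR): gate the scan on a wall-clock cooldown, so even a table that stays over threshold pays for at most one scan per window.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Cooldown gate: allow at most one reap per `cooldown` window, regardless of
// how many inserts observe the table over threshold in the meantime.
struct ReapGate {
    last_reap: Mutex<Option<Instant>>,
    cooldown: Duration,
}

impl ReapGate {
    fn new(cooldown: Duration) -> Self {
        Self { last_reap: Mutex::new(None), cooldown }
    }

    // Returns true if the caller won the right to reap now.
    fn try_reap(&self) -> bool {
        let mut last = self.last_reap.lock().unwrap();
        let now = Instant::now();
        match *last {
            Some(t) if now.duration_since(t) < self.cooldown => false,
            _ => {
                *last = Some(now);
                true
            }
        }
    }
}
```

This composes with the every-N-inserts counter: either condition alone bounds the scan frequency, and the time-based one also covers workloads with bursty insert rates.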
Force-pushed from 2184f31 to 1fd725e.
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Force-pushed from 1fd725e to 4934253.
The previous expiration mechanism used a thread-local `PriorityQueue` plus an `ExpirationsNF` pipeline stage that had to run on every packet batch to process expired entries.

The new approach spawns a per-flow timer task. The task sleeps until the flow's deadline and marks the flow `FlowStatus::Expired` once the deadline is confirmed. `DashMap` cleanup is intentionally deferred, and the table now stores `Arc<FlowInfo>` instead of weak references.