chore: Move flow expiration from PQ to timers#1327

Open
sergeymatov wants to merge 1 commit into main from pr/smatov/new-flow-timers

Conversation

@sergeymatov
Contributor

The previous expiration mechanism used a thread-local PriorityQueue plus an ExpirationsNF pipeline stage that had to run on every packet batch to process expired entries.

The new approach spawns a per-flow timer that sleeps until the flow's deadline; once the deadline is confirmed, the flow is marked FlowStatus::Expired. DashMap cleanup is intentionally deferred, and the table now stores Arc<FlowInfo> instead of weak references.

@sergeymatov sergeymatov requested a review from a team as a code owner March 10, 2026 08:37
@sergeymatov sergeymatov requested review from Copilot and daniel-noland and removed request for a team March 10, 2026 08:37
Contributor

Copilot AI left a comment


Pull request overview

This PR replaces the flow-table expiration mechanism (thread-local PQ + ExpirationsNF pipeline stage) with per-flow tokio timers that mark flows Expired, relying on lazy cleanup in lookup() / drain_stale().

Changes:

  • Remove ExpirationsNF and the thread-local priority-queue expiration machinery.
  • Add per-flow tokio timer spawning on insert, plus lazy time-based expiration in lookup() and proactive stale draining on size threshold.
  • Update dataplane pipeline composition, tests, docs, and dependencies to match the new expiration model.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.

File Description
nat/src/portfw/test.rs Removes ExpirationsNF from the test pipeline and uses FlowTable::default().
flow-entry/src/flow_table/thread_local_pq.rs Deletes the thread-local PQ implementation used for expirations.
flow-entry/src/flow_table/table.rs Stores Arc<FlowInfo> directly; spawns per-flow tokio timers; adds lazy expiration in lookup() and drain_stale(); updates tests.
flow-entry/src/flow_table/nf_lookup.rs Updates tests to use tokio timing and removes the expirations stage dependency.
flow-entry/src/flow_table/nf_expirations.rs Deletes ExpirationsNF.
flow-entry/src/flow_table/mod.rs Removes module exports for expirations/PQ; keeps FlowLookup/FlowTable exports.
flow-entry/src/flow_table/display.rs Simplifies display logic now that table values are strong Arcs.
flow-entry/src/flow_table/README.md Updates documentation to describe tokio timers + lazy cleanup design.
flow-entry/Cargo.toml Drops priority-queue/thread_local; adds tokio (and dev features for async tests).
dataplane/src/packet_processor/mod.rs Removes ExpirationsNF from the router pipeline.
Cargo.lock Removes priority-queue from flow-entry’s dependency set and adds tokio.

Comment on lines +196 to +200
if tokio::runtime::Handle::try_current().is_ok() {
let fi = val.clone();
tokio::task::spawn(async move {
loop {
let deadline = fi.expires_at();

Copilot AI Mar 10, 2026


confidence: 8
tags: [logic, other]

insert_common spawns a new tokio task on every call whenever a runtime is present. This means repeated reinsert() calls (or inserting the same Arc<FlowInfo> again under the same key) will create multiple concurrent timers for the same flow, which can lead to unbounded task growth and extra wakeups.

Consider deduplicating timer creation (e.g., skip spawning when the replaced value is the same Arc via Arc::ptr_eq, or track a “timer started” flag per FlowInfo).

Comment on lines +198 to +210
tokio::task::spawn(async move {
loop {
let deadline = fi.expires_at();
tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await;
let new_deadline = fi.expires_at();
if new_deadline > deadline {
// Deadline was extended (e.g. by StatefulNat); sleep again.
continue;
}
fi.update_status(FlowStatus::Expired);
break;
}
});

Copilot AI Mar 10, 2026


confidence: 7
tags: [logic, other]

The per-flow timer task holds an Arc<FlowInfo> until the deadline elapses, even if the entry is removed from the DashMap earlier (e.g., via lookup() removing Cancelled/Expired entries or drain_stale). With many flows and long timeouts this can keep a large number of FlowInfo allocations (and their timer tasks) alive longer than necessary.

If early cleanup matters, consider wiring a cancellation mechanism (e.g., store/abort a JoinHandle, use a cancellation token/notify in FlowInfo, or have a single shared reaper task rather than one task per flow).

Comment on lines 167 to 168
// Entries are all gone (removed by per-flow tokio timers)
let num_entries = flow_table.len().unwrap();

Copilot AI Mar 10, 2026


confidence: 9
tags: [docs, style]

This test comment says entries are “removed by per-flow tokio timers”, but the current implementation only marks flows as Expired in the timer and relies on lazy removal (e.g., lookup() / drain_stale) to actually delete DashMap entries. Consider rewording this to reflect the real behavior, or add an explicit cleanup step if the test intends to assert removal rather than just expiration.

Comment on lines 335 to +345
#[allow(clippy::len_without_is_empty)]
/// Tell how many flows are in the table if it can be locked
/// This is mostly for testing
/// Returns the number of *active* (non-expired, non-cancelled) flows in the table.
/// This is mostly for testing.
pub fn len(&self) -> Option<usize> {
let table = self.table.try_read().ok()?;
Some(table.len())
Some(
table
.iter()
.filter(|e| e.value().status() == FlowStatus::Active)
.count(),
)

Copilot AI Mar 10, 2026


confidence: 8
tags: [api_design]

FlowTable::len() now does an O(n) scan and returns only the count of Active entries (not the raw map size). Since the method name strongly implies a cheap “number of entries” operation, this is easy to misuse in non-test code (e.g., stats/telemetry paths) and can produce surprising results.

Consider either renaming this to something like active_len() (and/or adding a separate raw_len()), or making the behavior more explicit in the API so callers don’t accidentally rely on len() for map size/perf-sensitive code.

@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch 4 times, most recently from 27dedc9 to a08d109 Compare March 11, 2026 13:32
@sergeymatov sergeymatov requested a review from Copilot March 11, 2026 16:00
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

Comment on lines +179 to +189
// Proactively purge stale entries when the raw table size exceeds the threshold.
// This bounds memory growth when flows expire faster than they are looked up,
// since expired entries otherwise accumulate in the `DashMap` until a lookup hits them.
let raw_len = table.len();
if raw_len > self.reap_threshold {
warn!(
"The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
self.reap_threshold
);
Self::drain_stale_with_read_lock(&table);
}

Copilot AI Mar 11, 2026


confidence: 9
tags: [logic]

insert_common triggers a full-table drain_stale_with_read_lock() scan on every insert once table.len() exceeds reap_threshold. If the table is legitimately large but mostly Active (so drain_stale removes little or nothing), this becomes O(n) work per insert and can degrade to O(n²) behavior under load.

Consider adding hysteresis/cooldown so the expensive scan only happens when the size crosses the threshold (from below), or at most once per time window / per X inserts, or only if a previous scan actually removed entries (and otherwise back off).

Contributor Author

@sergeymatov sergeymatov Mar 12, 2026


The inserts_while_large: AtomicUsize counter + DRAIN_EVERY_N_INSERTS = 1000 constant were added specifically for this. The drain only fires on inserts 0, 1000, 2000, etc. while the table stays over the threshold, turning O(n) per insert into O(n/1000) amortized.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.



Comment on lines +342 to +360
let to_remove: Vec<FlowKey> = table
.iter()
.filter_map(|entry| {
let val = entry.value();
match val.status() {
FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
FlowStatus::Active if val.expires_at() <= now => {
// Deadline passed but the tokio timer has not fired yet; mark and remove.
val.update_status(FlowStatus::Expired);
Some(*entry.key())
}
FlowStatus::Active => None,
}
})
.collect();
let removed = to_remove.len();
for key in &to_remove {
table.remove(key);
}

Copilot AI Mar 12, 2026


confidence: 8
tags: [logic]

drain_stale_with_read_lock() first collects FlowKeys to remove and then removes by key. Since the outer lock is a shared read lock, inserts/replacements for the same key can happen concurrently; this creates an ABA race where a freshly-inserted active flow under a reused key can be removed because the key was collected when an older entry was stale.

Consider removing conditionally (re-check status/deadline at removal time) or using a DashMap API that can filter/retain under the shard lock to avoid removing a different value than the one you inspected.

Suggested change
let to_remove: Vec<FlowKey> = table
.iter()
.filter_map(|entry| {
let val = entry.value();
match val.status() {
FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()),
FlowStatus::Active if val.expires_at() <= now => {
// Deadline passed but the tokio timer has not fired yet; mark and remove.
val.update_status(FlowStatus::Expired);
Some(*entry.key())
}
FlowStatus::Active => None,
}
})
.collect();
let removed = to_remove.len();
for key in &to_remove {
table.remove(key);
}
let mut removed = 0usize;
// Use DashMap::retain so that the decision to remove an entry is made
// while holding the shard lock for that entry, avoiding an ABA race on
// keys that might be concurrently reused.
table.retain(|_key, val| {
let is_stale = match val.status() {
FlowStatus::Expired | FlowStatus::Cancelled => true,
FlowStatus::Active if val.expires_at() <= now => {
// Deadline passed but the tokio timer has not fired yet; mark and remove.
val.update_status(FlowStatus::Expired);
true
}
FlowStatus::Active => false,
};
if is_stale {
removed += 1;
false
} else {
true
}
});

Comment on lines 269 to 271
let table = self.table.read().unwrap();
let item = table.get(flow_key)?.upgrade();
let Some(item) = item else {
debug!(
"lookup: Removing flow key {:?}, found empty weak reference",
flow_key
);
Self::remove_with_read_lock(&table, flow_key);
return None;
};
let item = table.get(flow_key)?.value().clone();
let status = item.status();

Copilot AI Mar 12, 2026


confidence: 8
tags: [logic]

lookup() clones the Arc<FlowInfo> out of the DashMap and later (on the expired/cancelled paths) calls remove_with_read_lock() by key. Because the DashMap entry guard is dropped before the removal, another thread can replace the entry for the same key in between, and this removal can delete the new flow instead of the stale one.

Consider doing an atomic/conditional removal (e.g., remove only if the current value is still the one you examined, or re-check staleness under the shard lock before removing).

// Upgrade to check status and read the deadline. If the Arc has
// already been dropped (no DashMap entry, no in-flight holders),
// there is nothing left to expire.
let Some(fi) = fi_weak.upgrade() else { break };
Contributor


This check could be made before (outside) spawning a task, right?

break;
}
let deadline = fi.expires_at();
// Drop the strong ref before sleeping so this task does not
Contributor


Why do we need this? I mean changing from strong to weak references?
A flow entry will live as long as it is in the table and it has a timer (task).
If the flow is removed from the table, it will be invisible. It may remain in memory, but as soon as the task finishes, it will be gone, right?

Contributor Author


Well, yep, I'll rework this to strong refs.

"The number of flows ({raw_len}) exceeds {}. Reaping stale entries...",
self.reap_threshold
);
Self::drain_stale_with_read_lock(&table);
Contributor


If I understand correctly, this is the only mechanism now to delete flow entries?

Contributor

@Fredi-raspall Fredi-raspall left a comment


I have similar concerns as Copilot.
I would actually keep a join handle in the flow info to be able to cancel flows that are no longer wanted. The existing removal mechanism relies on time, but we have use cases where we want to explicitly remove/invalidate/cancel a flow. Right now, we do that by changing the status to Cancelled (see the invalidate() method). It may be a good idea to extend invalidate() so that the task is also cancelled via the join handle?
Lastly, there is the scaling concern. Maybe we should add some tests for that?

@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch 5 times, most recently from cf06878 to 30a152e Compare March 23, 2026 14:30
@sergeymatov sergeymatov requested a review from Copilot March 23, 2026 14:38
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val));
if need_timer {
if tokio::runtime::Handle::try_current().is_err() {
warn!(

Copilot AI Mar 23, 2026


confidence: 9
tags: [logic]

In non-tokio contexts this warn! will fire on every insert, which can spam logs (and slow tests) since many unit tests insert flows outside a runtime. Consider downgrading this to debug!, or logging it only once (e.g., with a OnceLock/AtomicBool) and/or including a counter/rate-limiter so repeated inserts don’t produce unbounded warning output.

Suggested change
warn!(
debug!(

Contributor


This is a legit concern, but moving this to debug! unconditionally also seems bad. What is a good compromise so we don't spam logs in unit tests but get the warning in production?

Contributor Author


Maybe we could just panic here instead? I was considering that as a second option.

Contributor

@mvachhar mvachhar left a comment


Was AI used in this PR? If so, you should update the commit message to include the AI. See 6644ee9
for an example.

@mvachhar
Contributor

mvachhar commented Mar 23, 2026

Apart from the AI co-authorship, this looks like it is correct to me. We'll merge after 26.01 releases so it gets some extended runtime before shipping. Also, please address the copilot comments that make sense to address such as the one about the warn! in unit tests.

@mvachhar
Contributor

@sergeymatov, @Fredi-raspall Can you resolve any conversations that have been addressed for this PR?

@mvachhar
Contributor

Ignore my previous comment about this being fine, I think we have a reap issue here where expired flows will never be garbage collected in practice. See my inline comment.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.

Comment on lines +224 to +225
// In non-tokio contexts (shuttle tests, sync unit tests) a warning is logged
// and no timer is spawned; lazy time-checking in `lookup` handles expiration.

Copilot AI Mar 24, 2026


confidence: 7
tags: [docs]

The comment says “In non-tokio contexts … a warning is logged”, but the code logs at debug! when no runtime is present. This can mislead readers when diagnosing expiration behavior.

Update the comment to reflect the actual log level/behavior (or change the log level to match the comment).

Suggested change
// In non-tokio contexts (shuttle tests, sync unit tests) a warning is logged
// and no timer is spawned; lazy time-checking in `lookup` handles expiration.
// In non-tokio contexts (shuttle tests, sync unit tests) a debug log message
// is emitted and no timer is spawned; lazy time-checking in `lookup` handles
// expiration.

Comment on lines +247 to +261
if fi.status() != FlowStatus::Active {
break;
}
let new_deadline = fi.expires_at();
if new_deadline > deadline {
// Deadline was extended (e.g. by StatefulNat); sleep again.
deadline = new_deadline;
continue;
}
fi.update_status(FlowStatus::Expired);
// Remove the entry from the DashMap. If the entry was replaced in the meantime,
// the `ptr_eq` will make sure we don't remove the new entry that replaced it.
if let Ok(table) = table_arc.read() {
table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
}

Copilot AI Mar 24, 2026


confidence: 9
tags: [logic]

In the per-flow timer task, when fi.status() != FlowStatus::Active the loop breaks without removing the flow’s DashMap entry. That means flows that are Cancelled (or externally marked Expired) can stay in the table indefinitely if they’re never looked up again, defeating the goal of timer-driven cleanup.

Consider removing the table entry (with the existing remove_if + Arc::ptr_eq guard) before breaking in the non-Active branch, so non-Active flows are eventually collected even without subsequent lookups.

use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};

Copilot AI Mar 24, 2026


confidence: 8
tags: [style]

This file imports AtomicUsize/Ordering from std::sync::atomic, but the codebase generally uses concurrency::sync::atomic::* so tests under shuttle/loom can model atomics correctly (e.g. nat/src/stateful/apalloc/port_alloc.rs). Using std atomics here may reduce determinism/coverage under those concurrency-test modes.

Prefer concurrency::sync::atomic::{AtomicUsize, Ordering} for inserts_while_large.

Suggested change
use std::sync::atomic::{AtomicUsize, Ordering};
use concurrency::sync::atomic::{AtomicUsize, Ordering};

Comment on lines +13 to +19
When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
present) that sleeps until the flow's deadline and then calls
`update_status(FlowStatus::Expired)`. The DashMap entry is not removed by the
timer; instead, `lookup()` performs lazy cleanup: if a looked-up entry is
`Expired` or `Cancelled` it is removed from the map and `None` is returned.
The same lazy path covers the case where a deadline passes without a timer
firing (e.g. in non-tokio test contexts).

Copilot AI Mar 24, 2026


confidence: 9
tags: [docs]

The README says the tokio timer “does not remove” the DashMap entry and that cleanup is only done lazily in lookup(), but the current implementation in FlowTable::insert_common does call remove_if from the timer task.

Please update this section to match the actual behavior (timer removes its own entry, with lazy cleanup as a fallback), or adjust the code to match the documented behavior.

@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch 2 times, most recently from 309cf01 to 0cd4412 Compare March 24, 2026 14:05
@sergeymatov sergeymatov requested a review from Copilot March 24, 2026 14:14
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

Comment on lines +263 to +265
if let Ok(table) = table_arc.read() {
table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
}

Copilot AI Mar 24, 2026


confidence: 8
tags: [logic]

The expiration task silently ignores a poisoned RwLock (if let Ok(table) = table_arc.read() { ... }). If the lock is poisoned, the task will skip removing the entry, leaving an Expired/Cancelled flow resident indefinitely and making this failure mode hard to detect.

Consider handling the poisoned case explicitly (e.g., log a warn!/error! and still proceed with into_inner(), or expect/unwrap to match the rest of FlowTable’s “panics on poisoned lock” contract).

Suggested change
if let Ok(table) = table_arc.read() {
table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));
}
let table = table_arc.read().unwrap_or_else(|poisoned| {
warn!(
"flow expiration task: FlowTable RwLock poisoned; \
proceeding with possibly inconsistent table state"
);
poisoned.into_inner()
});
table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi));

Comment on lines +18 to +21
`Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the deadline,
handles extensions by re-sleeping, and on exit (deadline elapsed, Cancelled, or
externally Expired) marks the flow `Expired` and removes its DashMap entry via
`remove_if + Arc::ptr_eq` — leaving any concurrent replacement intact.

Copilot AI Mar 24, 2026


confidence: 9
tags: [docs]

README claims the timer task “on exit (deadline elapsed, Cancelled, or externally Expired) marks the flow Expired”. In the implementation, the task only calls update_status(Expired) on the deadline-elapsed path; if the flow is already Cancelled/Expired, it just breaks and attempts removal, preserving the existing status (and it won’t observe that status change until it wakes).

Please adjust the README wording to reflect the actual behavior (which status transitions happen in each path, and when cancellation/external expiry is observed by the task).

Suggested change
`Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the deadline,
handles extensions by re-sleeping, and on exit (deadline elapsed, Cancelled, or
externally Expired) marks the flow `Expired` and removes its DashMap entry via
`remove_if + Arc::ptr_eq` — leaving any concurrent replacement intact.
`Arc<RwLock<Table>>`, and the `FlowKey`. The task sleeps until the current
deadline and handles extensions by re-sleeping. When it wakes and finds that
the deadline has elapsed (and the flow is not already in a terminal state), it
marks the flow `Expired` and removes its DashMap entry via `remove_if + Arc::ptr_eq` — leaving any concurrent replacement intact. If, instead, the flow has
already been marked `Cancelled` or `Expired` by some other path while the task
was sleeping, the task exits without changing the status and only attempts the
same conditional removal; such external changes are observed only when the task
next wakes.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 2 comments.

Comment on lines +142 to 145
async fn test_lookup_nf_with_expiration() {
let flow_table = Arc::new(FlowTable::default());
let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone());
let flowinfo_creator = FlowInfoCreator::new(flow_table.clone(), Duration::from_secs(1));

Copilot AI Mar 24, 2026


confidence: 9
tags: [logic, style]

These tests construct Arc<FlowTable> using std::sync::Arc, but FlowLookup::new expects concurrency::sync::Arc. This can break compilation when building/running tests with the shuttle feature (where concurrency::sync::Arc is not std::sync::Arc). Prefer importing/using concurrency::sync::Arc in the test module for consistency with the crate’s concurrency abstraction.

Comment on lines 186 to 190
) -> (Arc<FlowTable>, DynPipeline<TestBuffer>, PortFwTableWriter) {
// build a pipeline with flow lookup + port forwarder
let mut writer = PortFwTableWriter::new();
let flow_table = Arc::new(FlowTable::new(1024));
let flow_table = Arc::new(FlowTable::default());
let flow_lookup_nf = FlowLookup::new("flow-lookup", flow_table.clone());

Copilot AI Mar 24, 2026


confidence: 8
tags: [style]

This test module uses std::sync::Arc, but other NAT modules/tests use concurrency::sync::Arc to remain compatible with the workspace’s shuttle concurrency mode. To avoid type mismatches when running cargo test --features shuttle, consider switching this file to concurrency::sync::Arc as well (and using it consistently for Arc<FlowTable>/Arc<PortFwEntry>).

retain a flow (e.g. pipeline stages that tag packets) clone the `Arc` out of
`lookup()`.

When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
Contributor


I just realized that this might be an issue as an expired flow may never be looked up again. We probably need to have a mechanism to actively reap this on expiry. Can we still go back to the WeakPtr idea with the tokio timer callback holding the Arc? I'm open to other mechanisms.

) -> usize {
let now = Instant::now();
let mut removed = 0usize;
// `retain` holds the write lock on each DashMap shard while evaluating the
Contributor


I'm not sure this is right, retain will acquire the shard write lock but it is not guaranteed to hold it across the whole retain call, it could repeatedly lock it.

The bigger issue is that no one can have a reference to any element in the DashMap when running retain, is that the case?

// guarantees each concurrent caller gets a unique value, ensuring exactly
// one drain per N inserts even under concurrent inserts.
let raw_len = table.len();
if raw_len > self.reap_threshold {
Contributor


The main issue here is that if we have a lot of flows, we will try to reap on every insert. Reaping may not put us below the threshold. We need a better approach to only reap periodically, not every time just because we have a lot of flows.

@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch from 2184f31 to 1fd725e Compare March 24, 2026 16:15
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Sergey Matov <sergey.matov@githedgehog.com>
@sergeymatov sergeymatov force-pushed the pr/smatov/new-flow-timers branch from 1fd725e to 4934253 Compare March 24, 2026 16:16