From 493425309f039689a9c443f5bf0e4ea1410e63c9 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Wed, 4 Mar 2026 16:49:28 +0400 Subject: [PATCH] chore: Move flow expiration from PQ to timers Signed-off-by: Sergey Matov Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sergey Matov --- Cargo.lock | 13 +- dataplane/src/packet_processor/mod.rs | 6 +- flow-entry/Cargo.toml | 4 +- flow-entry/src/flow_table/README.md | 49 +- flow-entry/src/flow_table/display.rs | 5 +- flow-entry/src/flow_table/mod.rs | 5 +- flow-entry/src/flow_table/nf_expirations.rs | 142 ------ flow-entry/src/flow_table/nf_lookup.rs | 46 +- flow-entry/src/flow_table/table.rs | 494 +++++++++++-------- flow-entry/src/flow_table/thread_local_pq.rs | 327 ------------ nat/src/portfw/test.rs | 13 +- 11 files changed, 356 insertions(+), 748 deletions(-) delete mode 100644 flow-entry/src/flow_table/nf_expirations.rs delete mode 100644 flow-entry/src/flow_table/thread_local_pq.rs diff --git a/Cargo.lock b/Cargo.lock index 8756e60ba..e82c97037 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,10 +1364,9 @@ dependencies = [ "dataplane-tracectl", "etherparse", "linkme", - "priority-queue", "shuttle", "thiserror 2.0.18", - "thread_local", + "tokio", "tracing", "tracing-test", ] @@ -4360,16 +4359,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "priority-queue" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93980406f12d9f8140ed5abe7155acb10bb1e69ea55c88960b9c2f117445ef96" -dependencies = [ - "equivalent", - "indexmap 2.13.0", -] - [[package]] name = "proc-macro-crate" version = "2.0.0" diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index a99655b07..1164fa3a9 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -12,7 +12,7 @@ use super::packet_processor::ipforward::IpForwarder; use concurrency::sync::Arc; -use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable}; +use flow_entry::flow_table::{FlowLookup, FlowTable}; use flow_filter::{FlowFilter, FlowFilterTableWriter}; use nat::portfw::{PortForwarder, PortFwTableWriter}; @@ -103,7 +103,6 @@ pub(crate) fn start_router( let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle()); let icmp_error_handler = IcmpErrorHandler::new(flow_table.clone()); let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone()); - let flow_expirations_nf = ExpirationsNF::new(flow_table.clone()); let portfw = PortForwarder::new( "port-forwarder", portfw_factory.handle(), @@ -111,7 +110,7 @@ pub(crate) fn start_router( ); // Build the pipeline for a router. The composition of the pipeline (in stages) is currently - // hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last + // hard-coded. Flow expiration is handled by per-flow tokio timers; no ExpirationsNF needed. DynPipeline::new() .set_data(pdata_clone) .add_stage(stage_ingress) @@ -124,7 +123,6 @@ pub(crate) fn start_router( .add_stage(stateful_nat) .add_stage(iprouter2) .add_stage(stage_egress) - .add_stage(flow_expirations_nf) .add_stage(pktdump) .add_stage(stats_stage) }; diff --git a/flow-entry/Cargo.toml b/flow-entry/Cargo.toml index a57b14401..5fe928f56 100644 --- a/flow-entry/Cargo.toml +++ b/flow-entry/Cargo.toml @@ -21,14 +21,14 @@ etherparse = { workspace = true } linkme = { workspace = true } net = { workspace = true } pipeline = { workspace = true } -priority-queue = { workspace = true } thiserror = { workspace = true } -thread_local = { workspace = true } +tokio = { workspace = true, features = ["rt", "time"] } tracectl = { workspace = true } tracing = { workspace = true } [dev-dependencies] bolero = { workspace = true, default-features = false } net = { workspace = true, features = ["bolero"] } +tokio = { workspace = true, features = ["macros", "rt", "time"] } tracing-test = { workspace = true, features = [] } shuttle = { workspace = true } diff --git a/flow-entry/src/flow_table/README.md b/flow-entry/src/flow_table/README.md index 5637b050e..fa3cc4fb1 100644 --- a/flow-entry/src/flow_table/README.md +++ b/flow-entry/src/flow_table/README.md @@ -1,28 +1,41 @@ # Flow Table -The current implementation of flow table uses `dash_map` and per-thread priority queue's (for timeouts) along with `Arc` and `Weak` to get a reasonable flow table with timeouts. -However, it leaves a lot of room for optimizations. +The flow table uses `DashMap` for concurrent key→value storage and per-flow +tokio timers for expiration. -## Flow Table Implementation +## Structure -The main `DashMap` holds `Weak` references to all the flow entries so that the memory gets automatically deallocated when the entry times out. +`FlowTable` wraps `Arc>>>`: -The priority queue's hold `Arc` references to the flow entries to keep them alive when they are not in any packet meta data. -When the entry times-out and is removed from the priority queue and the last packet referencing that flow is dropped, the memory for the entry is freed. +- The outer `RwLock` is write-locked only during resharding; all normal + operations take a read lock. +- The `Arc` lets timer tasks hold a reference to the table without a + back-reference to `FlowTable` itself. -Note that in the current implementation, a flow is not removed from the flow table until the last Arc to the flow_info is dropped or the flow entry is replaced. This can be changed if needed, or even have it be an option on the flow as to whether timeout removes the flow or not. +## Expiration -## Optimizations +When a flow is inserted, a tokio task is spawned holding `Arc`, +`Arc>`, and the `FlowKey`. The task sleeps until the current +deadline and handles extensions by re-sleeping. When it wakes and finds that +the deadline has elapsed (and the flow is not already in a terminal state), it +marks the flow `Expired` and removes its DashMap entry via `remove_if + Arc::ptr_eq` — +leaving any concurrent replacement intact. If, instead, the flow has +already been marked `Cancelled` or `Expired` by some other path while the task +was sleeping, the task exits without changing the status and only attempts the +same conditional removal; such external changes are observed only when the task +next wakes. -In the current implementation, there has to be periodic or on-timeout reaping the Weak reference in the hash table. -This is better done by having a version of `DashMap` that can reap the dead `Weak` reference as it walks the table on lookups, instead of waiting for key collisions. -The hope, for now, is that the entries in the hash table array will contain a small pointer and not take up too much extra memory. -Those dead `Weak` pointers will prevent shrinking of the hash table though, if the implementation supports that. +`lookup()` provides a fallback: it lazily removes stale entries inline, +covering non-tokio contexts and timer scheduling lag. -Second, the `priority_queue` crate uses a `HashMap` inside the queue in order to allow fast removal and re-insertion. -However, this wastes space and requires extra hashes. -The better way to do this is to have a custom priority queue integrated with the custom weak-reaping hash map so that the same hash table can be used for both operations. -This improves cache locality, reduces memory utlization, and avoids multiple hash table lookups in many cases. +## Non-tokio contexts (shuttle / sync tests) -However, in the interest of time to completion for the code, this module currently uses existing data structures instead of full custom implementations of everything. -However, the interface should be able to hide a change from the current to the optimized implementation. +No timer is spawned; a `debug!` is logged instead. Tests call +`flow_info.update_status(FlowStatus::Expired)` directly and rely on lazy +removal in `lookup()` or explicit `drain_stale()`. + +## Aggressive reap + +When the entry count exceeds `reap_threshold` (default 1 000 000), +`drain_stale()` runs automatically via `table.retain()` at most once every +1 000 inserts while the table remains over the threshold. diff --git a/flow-entry/src/flow_table/display.rs b/flow-entry/src/flow_table/display.rs index 25b344899..2762102e6 100644 --- a/flow-entry/src/flow_table/display.rs +++ b/flow-entry/src/flow_table/display.rs @@ -11,10 +11,7 @@ impl Display for FlowTable { Heading(format!("Flow Table ({} entries)", table.len())).fmt(f)?; for entry in table.iter() { let key = entry.key(); - match entry.value().upgrade() { - Some(value) => writeln!(f, "{key}\n{value}")?, - None => writeln!(f, "{key}: NONE")?, - } + writeln!(f, "{key}\n{}", entry.value())?; } } else { write!(f, "Failed to lock flow table")?; diff --git a/flow-entry/src/flow_table/mod.rs b/flow-entry/src/flow_table/mod.rs index 58a23a9f0..e92f447e1 100644 --- a/flow-entry/src/flow_table/mod.rs +++ b/flow-entry/src/flow_table/mod.rs @@ -2,17 +2,14 @@ // Copyright Open Network Fabric Authors mod display; -pub mod nf_expirations; pub mod nf_lookup; pub mod table; -mod thread_local_pq; +pub use nf_lookup::FlowLookup; pub use table::FlowTable; pub use net::flows::atomic_instant::AtomicInstant; pub use net::flows::flow_info::*; -pub use nf_expirations::ExpirationsNF; -pub use nf_lookup::FlowLookup; use tracectl::trace_target; trace_target!("flow-table", LevelFilter::INFO, &["pipeline"]); diff --git a/flow-entry/src/flow_table/nf_expirations.rs b/flow-entry/src/flow_table/nf_expirations.rs deleted file mode 100644 index c0d95103a..000000000 --- a/flow-entry/src/flow_table/nf_expirations.rs +++ /dev/null @@ -1,142 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Open Network Fabric Authors - -//! Network Function specific flow table. - -use concurrency::sync::Arc; -use net::buffer::PacketBufferMut; -use net::packet::Packet; -use pipeline::NetworkFunction; - -use crate::flow_table::FlowTable; - -use tracectl::trace_target; -trace_target!("flow-expiration", LevelFilter::INFO, &["pipeline"]); - -/// Network Function that reap expired entries from the flow table for the current thread. -/// -/// Note: This only reaps expired entries on the priority queue for the current thread. -/// It does not reap expired entries on other threads. -/// -/// This stage should be run after all other pipeline stages to reap any expired entries. -pub struct ExpirationsNF { - flow_table: Arc, -} - -impl ExpirationsNF { - pub fn new(flow_table: Arc) -> Self { - Self { flow_table } - } -} - -impl NetworkFunction for ExpirationsNF { - fn process<'a, Input: Iterator> + 'a>( - &'a mut self, - input: Input, - ) -> impl Iterator> + 'a { - self.flow_table.reap_expired(); - input - } -} - -#[cfg(test)] -mod test { - use net::buffer::TestBuffer; - use net::flows::FlowInfo; - use net::ip::UnicastIpAddr; - use net::packet::Packet; - use net::packet::VpcDiscriminant; - use net::tcp::TcpPort; - use net::vxlan::Vni; - use pipeline::NetworkFunction; - use std::net::IpAddr; - use std::sync::Arc; - use std::time::{Duration, Instant}; - - use crate::flow_table::FlowTable; - use crate::flow_table::nf_expirations::ExpirationsNF; - use net::{FlowKey, IpProtoKey, TcpProtoKey}; - - #[test] - fn test_expirations_nf() { - let flow_table = Arc::new(FlowTable::default()); - let mut expirations_nf = ExpirationsNF::new(flow_table.clone()); - let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); - let src_ip = "1.2.3.4".parse::().unwrap(); - let dst_ip = "5.6.7.8".parse::().unwrap(); - let src_port = TcpPort::new_checked(1025).unwrap(); - let dst_port = TcpPort::new_checked(2048).unwrap(); - - let flow_key = FlowKey::uni( - Some(src_vpcd), - src_ip.into(), - dst_ip, - IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), - ); - - // Insert an already expired flow entry and check that entry is there by looking it up - let flow_info = FlowInfo::new(Instant::now().checked_sub(Duration::from_secs(10)).unwrap()); - flow_table.insert(flow_key, flow_info); - assert!(flow_table.lookup(&flow_key).is_some()); - - // call process() on the NF (no packet is actually needed). NF should expire entry - let _output_iter = expirations_nf.process(std::iter::empty::>()); - assert!(flow_table.lookup(&flow_key).is_none()); - } - - #[test] - fn test_aggressive_expiration() { - const REAP_THRESHOLD_TEST: usize = 10_000; // must be < 64K for testing - - let mut flow_table = FlowTable::default(); - - // set the aggressive reap threshold - flow_table.set_reap_threshold(REAP_THRESHOLD_TEST); - - let flow_table = Arc::from(flow_table); - let mut expirations_nf = ExpirationsNF::new(flow_table.clone()); - let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); - let src_ip = "1.2.3.4".parse::().unwrap(); - let dst_ip = "5.6.7.8".parse::().unwrap(); - - // create REAP_THRESHOLD_TEST + 100 flows - for src_port in 1..=REAP_THRESHOLD_TEST + 100 { - #[allow(clippy::cast_possible_truncation)] - let src_port = TcpPort::new_checked(src_port as u16).unwrap(); - let dst_port = TcpPort::new_checked(100).unwrap(); - let flow_key = FlowKey::uni( - Some(src_vpcd), - src_ip.into(), - dst_ip, - IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), - ); - let flow_info = - FlowInfo::new(Instant::now().checked_add(Duration::from_mins(60)).unwrap()); - flow_table.insert(flow_key, flow_info); - } - // check we inserted more flows than the threshold - assert!(flow_table.len().unwrap() > REAP_THRESHOLD_TEST); - - // expire: no flow should be reaped because all are Active - let _: Vec<_> = expirations_nf - .process(std::iter::empty::>()) - .collect(); - assert!(flow_table.len().unwrap() > REAP_THRESHOLD_TEST); - - // pretend that all flows -but one- get Cancelled - for (num, flow_info) in flow_table.table.read().unwrap().iter().enumerate() { - if num != 13 { - flow_info - .upgrade() - .unwrap() - .update_status(net::flows::FlowStatus::Cancelled); - } - } - - // reap again, only one flow should be there - let _: Vec<_> = expirations_nf - .process(std::iter::empty::>()) - .collect(); - assert_eq!(flow_table.len().unwrap(), 1); - } -} diff --git a/flow-entry/src/flow_table/nf_lookup.rs b/flow-entry/src/flow_table/nf_lookup.rs index 485408e94..14494e7b4 100644 --- a/flow-entry/src/flow_table/nf_lookup.rs +++ b/flow-entry/src/flow_table/nf_lookup.rs @@ -57,6 +57,7 @@ impl NetworkFunction for FlowLookup { #[cfg(test)] mod test { + use concurrency::sync::Arc; use net::FlowKey; use net::buffer::PacketBufferMut; use net::buffer::TestBuffer; @@ -73,11 +74,9 @@ mod test { use pipeline::DynPipeline; use pipeline::NetworkFunction; use std::net::IpAddr; - use std::sync::Arc; use std::time::{Duration, Instant}; use tracing_test::traced_test; - use crate::flow_table::ExpirationsNF; use crate::flow_table::FlowTable; use crate::flow_table::nf_lookup::FlowLookup; @@ -111,7 +110,7 @@ mod test { assert!(output.meta().flow_info.is_some()); } - // A dummy NF that creates a flow entry for each packet, with a lifetime of 2 seconds + // A dummy NF that creates a flow entry for each packet, with a configurable lifetime struct FlowInfoCreator { flow_table: Arc, timeout: Duration, @@ -139,16 +138,14 @@ mod test { } #[traced_test] - #[test] - fn test_lookup_nf_with_expiration_nf() { + #[tokio::test] + async fn test_lookup_nf_with_expiration() { let flow_table = Arc::new(FlowTable::default()); let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone()); let flowinfo_creator = FlowInfoCreator::new(flow_table.clone(), Duration::from_secs(1)); - let expirations_nf = ExpirationsNF::new(flow_table.clone()); let mut pipeline: DynPipeline = DynPipeline::new() .add_stage(lookup_nf) - .add_stage(flowinfo_creator) - .add_stage(expirations_nf); + .add_stage(flowinfo_creator); const NUM_PACKETS: u16 = 1000; @@ -161,29 +158,23 @@ mod test { // process the NUM_PACKETS let packets_out = pipeline.process(packets_in.clone()); assert_eq!(packets_out.count(), NUM_PACKETS as usize); - let num_entries = flow_table.len().unwrap(); + let num_entries = flow_table.active_len().unwrap(); assert_eq!(num_entries, NUM_PACKETS as usize); - // wait twice as much as entry lifetimes. All flow entries should be gone after this. - std::thread::sleep(std::time::Duration::from_secs(2)); - pipeline - .process(std::iter::empty::>()) - .count(); + // wait twice as much as entry lifetimes — all flow timers should have fired by now + tokio::time::sleep(Duration::from_secs(2)).await; - // Entries are all gone - let num_entries = flow_table.len().unwrap(); + // All flows are now expired (marked by per-flow tokio timers) + let num_entries = flow_table.active_len().unwrap(); assert_eq!(num_entries, 0); } //#[traced_test] - #[test] - fn test_lookups_with_related_flows() { + #[tokio::test] + async fn test_lookups_with_related_flows() { let flow_table = Arc::new(FlowTable::default()); let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone()); - let expirations_nf = ExpirationsNF::new(flow_table.clone()); - let mut pipeline: DynPipeline = DynPipeline::new() - .add_stage(lookup_nf) - .add_stage(expirations_nf); + let mut pipeline: DynPipeline = DynPipeline::new().add_stage(lookup_nf); { let mut packet_1 = build_test_udp_ipv4_packet("10.0.0.1", "20.0.0.1", 80, 500); @@ -231,16 +222,13 @@ mod test { assert!(Arc::ptr_eq(&related_1, flow_2_pkt)); assert!(Arc::ptr_eq(&related_2, &flow_1)); assert!(Arc::ptr_eq(&related_2, flow_1_pkt)); - assert_eq!(flow_table.len().unwrap(), 2); + assert_eq!(flow_table.active_len().unwrap(), 2); } - // wait 3 secs. Flow 1 should have been removed - std::thread::sleep(Duration::from_secs(3)); - pipeline - .process(std::iter::empty::>()) - .count(); + // wait 3 secs. Flow 1 should have been expired by its tokio timer + tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!(flow_table.len().unwrap(), 1); + assert_eq!(flow_table.active_len().unwrap(), 1); // build identical packets and process them again let mut packet_1 = build_test_udp_ipv4_packet("10.0.0.1", "20.0.0.1", 80, 500); diff --git a/flow-entry/src/flow_table/table.rs b/flow-entry/src/flow_table/table.rs index 6bdc98eca..c4e5199ce 100644 --- a/flow-entry/src/flow_table/table.rs +++ b/flow-entry/src/flow_table/table.rs @@ -2,17 +2,17 @@ // Copyright Open Network Fabric Authors use ahash::RandomState; +use concurrency::sync::atomic::{AtomicUsize, Ordering}; use dashmap::DashMap; use net::FlowKey; use std::borrow::Borrow; use std::fmt::Debug; use std::hash::Hash; use std::time::Instant; -use tracing::debug; +use tracing::{debug, warn}; -use concurrency::sync::{Arc, RwLock, RwLockReadGuard, Weak}; +use concurrency::sync::{Arc, RwLock, RwLockReadGuard}; -use crate::flow_table::thread_local_pq::{PQAction, ThreadLocalPriorityQueue}; use net::flows::{FlowInfo, FlowStatus}; #[derive(Debug, thiserror::Error)] @@ -21,14 +21,21 @@ pub enum FlowTableError { InvalidShardCount(usize), } -type PriorityQueue = ThreadLocalPriorityQueue>; -type Table = DashMap, RandomState>; +type Table = DashMap, RandomState>; #[derive(Debug)] pub struct FlowTable { // TODO(mvachhar) move this to a cross beam sharded lock - pub(crate) table: RwLock, - pub(crate) priority_queue: PriorityQueue, + // + // We need to push table lock ref down into tokio tasks + // so invoked timer cleans up the table entry instead of just marking + // the flow info expired and leaving cleanup to lazy expiration in lookup(). + pub(crate) table: Arc>, + reap_threshold: usize, + /// Counts inserts that occurred while `table.len() > reap_threshold`. + /// The drain scan runs only every `DRAIN_EVERY_N_INSERTS` such inserts, + /// bounding the O(n) scan cost when the table is legitimately large. + inserts_while_large: AtomicUsize, } impl Default for FlowTable { @@ -44,19 +51,30 @@ fn hasher_state() -> &'static RandomState { } impl FlowTable { + /// When the raw `DashMap` entry count exceeds this threshold, `insert_common` will + /// proactively purge all stale (Expired / Cancelled / deadline-passed) entries to + /// prevent unbounded memory growth. + pub const AGGRESSIVE_REAP_THRESHOLD: usize = 1_000_000; + + /// When the table is over `reap_threshold`, run the O(n) drain scan at most once + /// every this many inserts. Between scans the table may grow by up to this many + /// entries above the threshold, which is an acceptable bounded overshoot. + pub const DRAIN_EVERY_N_INSERTS: usize = 1_000; + #[must_use] pub fn new(num_shards: usize) -> Self { Self { - table: RwLock::new(Table::with_hasher_and_shard_amount( + table: Arc::new(RwLock::new(Table::with_hasher_and_shard_amount( hasher_state().clone(), num_shards, - )), - priority_queue: PriorityQueue::new(None), + ))), + reap_threshold: Self::AGGRESSIVE_REAP_THRESHOLD, + inserts_while_large: AtomicUsize::new(0), } } pub fn set_reap_threshold(&mut self, reap_threshold: usize) { - self.priority_queue.set_reap_threshold(reap_threshold); + self.reap_threshold = reap_threshold; } /// Reshard the flow table into the given number of shards. @@ -171,17 +189,90 @@ impl FlowTable { fn insert_common(&self, flow_key: FlowKey, val: &Arc) -> Option> { let table = self.table.read().unwrap(); - let expires_at = val.expires_at(); - let result = table.insert(flow_key, Arc::downgrade(val)); - self.priority_queue.push(flow_key, val.clone(), expires_at); - let ret = match result { - Some(w) => w.upgrade(), - None => None, - }; - - let Some(ret) = ret else { - return ret; - }; + let result = table.insert(flow_key, val.clone()); + + // Proactively purge stale entries when the table is over the threshold, but + // only every DRAIN_EVERY_N_INSERTS inserts while it remains large. This + // bounds memory growth while avoiding an O(n) scan on every single insert: + // once the table is legitimately large with mostly-Active flows, draining on + // every insert would degrade to O(n²) behaviour under load. + // + // `inserts_while_large` is a monotonically increasing counter that only + // increments while we are over the threshold; it naturally stops when the + // table returns to a healthy size, so no reset is needed. `fetch_add` + // guarantees each concurrent caller gets a unique value, ensuring exactly + // one drain per N inserts even under concurrent inserts. + let raw_len = table.len(); + if raw_len > self.reap_threshold { + let n = self.inserts_while_large.fetch_add(1, Ordering::Relaxed); + if n.is_multiple_of(Self::DRAIN_EVERY_N_INSERTS) { + warn!( + "The number of flows ({raw_len}) exceeds {}. Reaping stale entries \ + (insert #{n} while large)...", + self.reap_threshold + ); + Self::drain_stale_with_read_lock(&table); + } + } + + // Spawn a per-flow expiration timer when running inside a tokio runtime. + // After the deadline elapses, the timer marks the flow Expired and removes its + // own DashMap entry so expired flows are collected + // promptly even when no subsequent lookup for the same key ever arrives. + // Lazy cleanup in lookup() remains as a fallback for non-tokio contexts and + // for entries cancelled or expired via other paths. + // In non-tokio contexts (shuttle tests, sync unit tests) a debug is logged + // and no timer is spawned; lazy time-checking in `lookup` handles expiration. + // + // Only spawn a timer for a genuinely new Arc. If the same Arc is being + // reinserted (e.g. via reinsert()), its existing timer loop already handles + // extended deadlines via the `new_deadline > deadline` re-check, so spawning + // a second task would be redundant and would cause unbounded task growth. + let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val)); + if need_timer { + if tokio::runtime::Handle::try_current().is_err() { + debug!( + "insert: no tokio runtime present, flow expiration timer not spawned; \ + relying on lazy expiration in lookup()" + ); + } else { + let fi = val.clone(); + // Get the ref to the table + let table_arc = self.table.clone(); + let initial_deadline = fi.expires_at(); + tokio::task::spawn(async move { + let mut deadline = initial_deadline; + loop { + tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await; + if fi.status() != FlowStatus::Active { + // Cancelled or externally marked Expired — fall through to removal. + break; + } + let new_deadline = fi.expires_at(); + if new_deadline > deadline { + // Deadline was extended (e.g. by StatefulNat); sleep again. + deadline = new_deadline; + continue; + } + fi.update_status(FlowStatus::Expired); + break; + } + // Reached on every break (normal expiry or Cancelled/Expired early exit). + // `continue` bypasses this, so removal only fires when the loop terminates. + // Use remove_if + ptr_eq so a concurrently inserted replacement is left intact. + let table = table_arc.read().unwrap_or_else(|poisoned| { + debug!( + "flow expiration task: FlowTable RwLock poisoned; \ + proceeding with possibly inconsistent table state" + ); + poisoned.into_inner() + }); + table.remove_if(&flow_key, |_, v| Arc::ptr_eq(v, &fi)); + }); + } + } + + let ret = result?; if ret.status() == FlowStatus::Expired { return None; @@ -192,6 +283,11 @@ impl FlowTable { /// Lookup a flow in the table. /// + /// Performs lazy time-based expiration: if the matched entry is still + /// `Active` but its deadline has passed (e.g. because the tokio timer has + /// not yet fired, or no tokio runtime is present), the entry is marked + /// `Expired` and removed here. + /// /// # Panics /// /// Panics if this thread already holds the read lock on the table or @@ -203,21 +299,31 @@ impl FlowTable { { debug!("lookup: Looking up flow key {:?}", flow_key); let table = self.table.read().unwrap(); - let item = table.get(flow_key)?.upgrade(); - let Some(item) = item else { - debug!( - "lookup: Removing flow key {:?}, found empty weak reference", - flow_key - ); - Self::remove_with_read_lock(&table, flow_key); - return None; - }; + let item = table.get(flow_key)?.value().clone(); + // NOTE: the DashMap shard guard from `.get()` is dropped here. Between this + // point and any removal below, another thread may have replaced the entry under + // the same key with a fresh flow. We therefore use `remove_if` with an + // `Arc::ptr_eq` guard so that we only delete the specific Arc we examined — + // a concurrent replacement will cause `ptr_eq` to be false and the new entry + // will be left intact. let status = item.status(); match status { - FlowStatus::Active => Some(item), + FlowStatus::Active => { + // Lazy expiration: cover non-tokio contexts and timer scheduling lag. + if item.expires_at() <= Instant::now() { + debug!( + "lookup: Flow key {:?} has passed its deadline, expiring", + flow_key + ); + item.update_status(FlowStatus::Expired); + table.remove_if(flow_key, |_, v| Arc::ptr_eq(v, &item)); + return None; + } + Some(item) + } FlowStatus::Expired | FlowStatus::Cancelled => { debug!("lookup: Flow key {:?} is '{status}', removing", flow_key); - Self::remove_with_read_lock(&table, flow_key); + table.remove_if(flow_key, |_, v| Arc::ptr_eq(v, &item)); None } } @@ -240,115 +346,82 @@ impl FlowTable { } fn remove_with_read_lock( - table: &RwLockReadGuard, RandomState>>, + table: &RwLockReadGuard, RandomState>>, flow_key: &Q, ) -> Option<(FlowKey, Arc)> where FlowKey: Borrow, Q: Hash + Eq + ?Sized + Debug, { - let result = table.remove(flow_key); - let (k, w) = result?; - let old_val = w.upgrade()?; - if old_val.status() == FlowStatus::Expired { + let (k, v) = table.remove(flow_key)?; + if v.status() == FlowStatus::Expired { return None; } - Some((k, old_val)) + Some((k, v)) } - fn decide_expiry(now: &Instant, k: &FlowKey, v: &Arc) -> PQAction { - // Note(mvachhar) - // - //I'm not sure if marking the entry as expired is worthwhile here - // nor am I sure of the performance cost of doing this. - // It isn't strictly needed, though it means other holders of the Arc may - // be able to read stale data and wouldn't know the entry is expired. - // - // If the common case is that the entry has no other references here, - // then this operation should be cheap, though not free due to the - // dereference of the value and the lock acquisition. - let expires_at = v.expires_at(); - if now >= &expires_at { - debug!("decide_expiry: Reap for flow key {k:?} with expires_at {expires_at:?}"); - PQAction::Reap - } else if v.status() == FlowStatus::Cancelled { - debug!("decide_expiry: Cancel for flow key {k:?}, which was cancelled"); - PQAction::Cancel - } else { - debug!("decide_expiry: Update for flow key {k:?} with time {expires_at:?}"); - PQAction::Update(expires_at) - } - } - - // Pass by value here since the PQ doesn't know the value is an Arc - // and we get ownership of the value here - #[allow(clippy::needless_pass_by_value)] - fn do_reap(k: &FlowKey, v: Arc) { - v.update_status(FlowStatus::Expired); - debug!("do_reap: Updated flow status for {k:?} to expired"); - } - - /// Remove all of the flow entries for the provided `FlowKey`s, returning the number of - /// entries removed - /// - /// # Panics + /// Remove all stale entries from the table (entries that are `Expired`, `Cancelled`, or + /// whose deadline has already passed). /// - /// Panics if this thread already holds the read lock on the table or - /// if the table lock is poisoned. - fn remove_flow_entries(&self, reaped_keys: &Vec) -> usize { - let num_keys = reaped_keys.len(); - let mut removed = 0; - let table = self.table.read().unwrap(); - for flow_key in reaped_keys { - if let Some((_key, _flow_info)) = table.remove(flow_key) { - removed += 1; - } - } - debug!("Removed {removed} flow-entries out of {num_keys} keys"); - num_keys - } - - /// Reap expired entries from the priority queue for the current thread. - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the flow table. + /// Returns the number of entries removed. /// /// # Panics /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_expired(&self) -> usize { - let reaped_keys = self - .priority_queue - .reap_expired(Self::decide_expiry, Self::do_reap); - self.remove_flow_entries(&reaped_keys) - } - - pub fn reap_all_expired(&self) -> usize { - let reaped_keys = self - .priority_queue - .reap_all_expired(Self::decide_expiry, Self::do_reap); - self.remove_flow_entries(&reaped_keys) + /// Panics if this thread already holds the read lock on the table or if the lock is poisoned. + pub fn drain_stale(&self) -> usize { + let table = self.table.read().unwrap(); + Self::drain_stale_with_read_lock(&table) } - #[cfg(all(test, feature = "shuttle"))] - pub fn reap_all_expired_with_time(&self, time: &Instant) -> usize { - let reaped_keys = self.priority_queue.reap_all_expired_with_time( - time, - Self::decide_expiry, - Self::do_reap, - ); - self.remove_flow_entries(&reaped_keys) + fn drain_stale_with_read_lock( + table: &RwLockReadGuard, RandomState>>, + ) -> usize { + let now = Instant::now(); + let mut removed = 0usize; + // `retain` holds the write lock on each DashMap shard while evaluating the + // predicate, making the staleness check and the removal atomic per shard. + // This closes the race that the previous collect-then-remove-by-key pattern + // had: a concurrent insert could no longer slip a fresh entry under a key + // between the time we marked it for removal and the time we called remove(). + // As a bonus, retain is a single O(n) pass with no temporary Vec allocation. + table.retain(|_key, val| { + let stale = match val.status() { + FlowStatus::Expired | FlowStatus::Cancelled => true, + FlowStatus::Active if val.expires_at() <= now => { + // Deadline passed but the tokio timer has not fired yet; mark and remove. + val.update_status(FlowStatus::Expired); + true + } + FlowStatus::Active => false, + }; + if stale { + removed += 1; + } + !stale + }); + debug!("drain_stale: Removed {removed} stale flows"); + removed } #[allow(clippy::len_without_is_empty)] - /// Tell how many flows are in the table if it can be locked - /// This is mostly for testing + /// Returns the total number of entries physically stored in the table, regardless of + /// their expiration status. This is mostly for testing. pub fn len(&self) -> Option { let table = self.table.try_read().ok()?; Some(table.len()) } + + /// Returns the number of *active* (non-expired, non-cancelled) flows in the table. + /// This is mostly for testing. + pub fn active_len(&self) -> Option { + let table = self.table.try_read().ok()?; + Some( + table + .iter() + .filter(|e| e.value().status() == FlowStatus::Active) + .count(), + ) + } } #[cfg(test)] @@ -413,20 +486,16 @@ mod tests { let flow_info = FlowInfo::new(now + two_seconds); flow_table.insert(flow_key, flow_info); - // Wait 1 second, should still be present + // Wait 1 second — flow not yet expired, lazy lookup should return Some. thread::sleep(one_second); - // Reap expired entries after 1 second (should not reap our entry) - flow_table.reap_expired(); assert!( flow_table.lookup(&flow_key).is_some(), "Flow key should still be present after 1 second" ); - // Wait another 2 seconds (total 3s), should be expired + // Wait another 2 seconds (total 3s) — flow expired. + // Lazy expiration in lookup cleans it up. thread::sleep(two_seconds); - // Reap expired entries - flow_table.reap_expired(); - assert!( flow_table.lookup(&flow_key).is_none(), "Flow key should have expired and been removed" @@ -434,7 +503,7 @@ mod tests { } #[test] - fn test_flow_table_weak_ref_replaced_on_insert() { + fn test_flow_table_entry_replaced_on_insert() { let now = Instant::now(); let first_expiry_time = now + Duration::from_secs(5); let second_expiry_time = now + Duration::from_secs(10); @@ -450,51 +519,32 @@ mod tests { }), )); - // Insert first entry - let first_flow_info = FlowInfo::new(first_expiry_time); - let first_flow_info_arc = Arc::new(first_flow_info); - let weak_flow_info_reference = Arc::downgrade(&first_flow_info_arc); - flow_table.insert_from_arc(flow_key, &first_flow_info_arc); - drop(first_flow_info_arc); + // Insert first entry. + let first_arc = Arc::new(FlowInfo::new(first_expiry_time)); + flow_table.insert_from_arc(flow_key, &first_arc); - // The weak reference stored in the table should still resolve + // The entry stored in the table should be the first arc. { let table = flow_table.table.read().unwrap(); let entry = table .get(&flow_key) .expect("entry should exist after first insert"); - let resolved = entry - .upgrade() - .expect("weak ref should resolve after first insert"); - assert_eq!(resolved.as_ref().expires_at(), first_expiry_time); - } // drops `entry` (shard read lock) and `table` (outer RwLock read guard) - - // The weak reference we kept outside of the table should still resolve, too. Upgrade - // it: we now have two strong references, one from the priority queue and one from the - // upgrade. - let upgrade = weak_flow_info_reference.upgrade(); - assert_eq!(Arc::strong_count(&upgrade.unwrap()), 2); - - // Insert a second entry under the same key but with a different value. - let second_flow_info = FlowInfo::new(second_expiry_time); - flow_table.insert_from_arc(flow_key, &Arc::new(second_flow_info)); - - // The weak reference should now resolve to second_arc, not first_arc. + assert_eq!(entry.value().expires_at(), first_expiry_time); + } + + // Insert a second entry under the same key. + let second_arc = Arc::new(FlowInfo::new(second_expiry_time)); + flow_table.insert_from_arc(flow_key, &second_arc); + + // The table should now point to the second entry. { let table = flow_table.table.read().unwrap(); let entry = table .get(&flow_key) .expect("entry should exist after second insert"); - let resolved = entry - .upgrade() - .expect("weak ref should resolve after second insert"); - assert_ne!(resolved.as_ref().expires_at(), first_expiry_time); - assert_eq!(resolved.as_ref().expires_at(), second_expiry_time); + assert_ne!(entry.value().expires_at(), first_expiry_time); + assert_eq!(entry.value().expires_at(), second_expiry_time); } - - // The strong reference from the priority queue for the first entry has been dropped. - assert_eq!(Weak::strong_count(&weak_flow_info_reference), 0); - assert!(weak_flow_info_reference.upgrade().is_none()); } #[test] @@ -503,21 +553,22 @@ mod tests { bolero::check!() .with_type::() .for_each(|flow_key| { - flow_table.insert(*flow_key, FlowInfo::new(Instant::now())); - let flow_info_str = format!("{:?}", flow_table.lookup(flow_key).unwrap()); - - // We purposely keep the flow alive here to make sure lookup reaps it - let _flow_info = flow_table.lookup(flow_key).unwrap(); + // Insert with a future expiry so early lookups see the flow. + flow_table.insert( + *flow_key, + FlowInfo::new(Instant::now() + Duration::from_secs(60)), + ); + let flow_info = flow_table.lookup(flow_key).unwrap(); assert!(flow_table.lookup(&flow_key.reverse(None)).is_none()); - thread::sleep(Duration::from_millis(100)); - flow_table.reap_all_expired(); + // Simulate expiration (what the tokio timer would do). + flow_info.update_status(FlowStatus::Expired); + // Lazy cleanup on next lookup. let result = flow_table.lookup(flow_key); assert!( result.is_none(), - "flow_key lookup is not none {result:#?}, inserted {flow_info_str}, now: {:?}", - Instant::now() + "expired flow should be removed by lookup, inserted {flow_info:?}" ); }); } @@ -528,7 +579,11 @@ mod tests { bolero::check!() .with_type::() .for_each(|flow_key| { - flow_table.insert(*flow_key, FlowInfo::new(Instant::now())); + // Use a future expiry so the flow stays active long enough for remove(). + flow_table.insert( + *flow_key, + FlowInfo::new(Instant::now() + Duration::from_secs(60)), + ); let flow_info = flow_table.lookup(flow_key).unwrap(); assert!(flow_table.lookup(&flow_key.reverse(None)).is_none()); @@ -540,6 +595,56 @@ mod tests { assert!(flow_table.lookup(flow_key).is_none()); }); } + + #[test] + fn test_aggressive_reap_threshold() { + // Must be small enough to stay within u16 port range (< 65_535). + const REAP_THRESHOLD_TEST: usize = 10_000; + + let mut flow_table = FlowTable::default(); + flow_table.set_reap_threshold(REAP_THRESHOLD_TEST); + + let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); + let src_ip: IpAddr = "1.2.3.4".parse().unwrap(); + let dst_ip: IpAddr = "5.6.7.8".parse().unwrap(); + + // Insert REAP_THRESHOLD_TEST + 100 flows, all Active with a far-future expiry. + for src_port in 1..=REAP_THRESHOLD_TEST + 100 { + #[allow(clippy::cast_possible_truncation)] + let src_port = TcpPort::new_checked(src_port as u16).unwrap(); + let dst_port = TcpPort::new_checked(100).unwrap(); + let flow_key = FlowKey::Unidirectional(FlowKeyData::new( + Some(src_vpcd), + src_ip, + dst_ip, + IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), + )); + let flow_info = FlowInfo::new(Instant::now() + Duration::from_secs(3600)); + flow_table.insert(flow_key, flow_info); + } + + // We inserted more flows than the threshold. + assert!(flow_table.active_len().unwrap() > REAP_THRESHOLD_TEST); + + // drain_stale: nothing should be reaped because all are Active with far-future expiry. + let reaped = flow_table.drain_stale(); + assert_eq!(reaped, 0); + assert!(flow_table.active_len().unwrap() > REAP_THRESHOLD_TEST); + + // Mark all flows except the first one as Cancelled. + let mut kept = 0usize; + for entry in flow_table.table.read().unwrap().iter() { + if kept == 0 { + kept += 1; + continue; + } + entry.value().update_status(FlowStatus::Cancelled); + } + + // drain_stale: all Cancelled flows should be purged, leaving exactly 1. + flow_table.drain_stale(); + assert_eq!(flow_table.active_len().unwrap(), 1); + } } #[concurrency_mode(shuttle)] @@ -554,7 +659,6 @@ mod tests { move || { let now = Instant::now(); let two_seconds = Duration::from_secs(2); - let one_second = Duration::from_secs(1); let flow_table = FlowTable::default(); let flow_key = FlowKey::Unidirectional(FlowKeyData::new( @@ -570,21 +674,21 @@ mod tests { let flow_info = FlowInfo::new(now + two_seconds); flow_table.insert(flow_key, flow_info); - // Reap expired entries after 1 second (should not reap our entry) - // Shuttle does not model time, hence this hack - flow_table.reap_all_expired_with_time(&(now + one_second)); + // Flow is active; lookup should return Some. assert!( flow_table.lookup(&flow_key).is_some(), - "Flow key should still be present after 1 second" + "Flow key should be present" ); - // Reap expired entries - // Shuttle does not model time, hence this hack - flow_table.reap_all_expired_with_time(&(now + two_seconds)); + // Simulate timer expiration by marking the flow directly. + if let Some(fi) = flow_table.lookup(&flow_key) { + fi.update_status(FlowStatus::Expired); + } + // Lazy cleanup on next lookup. assert!( flow_table.lookup(&flow_key).is_none(), - "Flow key should have expired and been removed" + "Flow key should be gone after expiration" ); }, 100, @@ -594,7 +698,7 @@ mod tests { #[allow(clippy::too_many_lines)] #[test] #[tracing_test::traced_test] - fn test_flow_table_concurrent_insert_remove_lookup_timeout() { + fn test_flow_table_concurrent_insert_remove_lookup_expire() { const N: usize = 3; let two_seconds = Duration::from_secs(2); @@ -630,15 +734,20 @@ mod tests { let mut flow_info_holder = Some(flow_info); let mut handles = vec![]; + + // "expirer" thread — simulates what the tokio timer would do. handles.push( thread::Builder::new() - .name("timeout_reaper".to_string()) + .name("expirer".to_string()) .spawn({ let flow_table = flow_table.clone(); + let flow_key = flow_keys[0]; move || { for _ in 0..N { thread::yield_now(); - flow_table.reap_expired(); + if let Some(fi) = flow_table.lookup(&flow_key) { + fi.update_status(FlowStatus::Expired); + } } } }) @@ -703,22 +812,11 @@ mod tests { handle.join().unwrap(); } - // Shuttle does not model time so we need this hack - let reap_time = now + two_seconds; - flow_table.reap_all_expired_with_time(&reap_time); - - // After all threads, all keys should be either gone or expired - for key in &flow_keys { - let result = flow_table.lookup(key); - assert!( - result.is_none(), - "Flow key {:#?} should have expired at {:?} and been removed, now at create: {:?}, reap time: {:?}", - *key, - result.unwrap().expires_at(), - now, - reap_time - ); - } + // After all threads, flow[0] should be expired/gone (expirer thread ran). + assert!( + flow_table.lookup(&flow_keys[0]).is_none(), + "Flow key[0] should have been expired" + ); }, 100, ); diff --git a/flow-entry/src/flow_table/thread_local_pq.rs b/flow-entry/src/flow_table/thread_local_pq.rs deleted file mode 100644 index 60664e67d..000000000 --- a/flow-entry/src/flow_table/thread_local_pq.rs +++ /dev/null @@ -1,327 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Open Network Fabric Authors - -use std::cmp::Ordering; -use std::hash::{Hash, Hasher}; -use std::time::Instant; - -use ahash::RandomState; -use concurrency::sync::RwLock; -// Should we just move this to std::collections::BinaryHeap? -// We aren't using the hash table feature right now, though we may want it later. -use priority_queue::PriorityQueue; -use thread_local::ThreadLocal; -use tracing::{debug, warn}; - -use tracectl::trace_target; -trace_target!( - "flow-table-pq", - LevelFilter::INFO, - &["flow-expiration", "pipeline"] -); - -#[repr(transparent)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct Priority(Instant); - -impl PartialOrd for Priority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for Priority { - fn cmp(&self, other: &Self) -> Ordering { - match self.0.cmp(&other.0) { - Ordering::Equal => Ordering::Equal, - Ordering::Less => Ordering::Greater, - Ordering::Greater => Ordering::Less, - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - key: K, - value: V, -} - -impl Hash for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - fn hash(&self, state: &mut H) { - self.key.hash(state); - } -} - -impl PartialEq for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - fn eq(&self, other: &Self) -> bool { - self.key == other.key - } -} - -impl Eq for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ -} - -#[derive(Debug)] -pub(crate) struct ThreadLocalPriorityQueue -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - #[allow(clippy::type_complexity)] - pqs: ThreadLocal, Priority, RandomState>>>, - reap_threshold: usize, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PQAction { - Reap, - Cancel, - Update(Instant), -} - -impl ThreadLocalPriorityQueue -where - K: Send + Sync + Hash + PartialEq + Eq, - V: Send + Sync, -{ - pub(crate) const AGRESSIVE_REAP_THRESHOLD: usize = 1_000_000; - pub fn new(reap_threshold: Option) -> Self { - Self { - pqs: ThreadLocal::new(), - reap_threshold: reap_threshold.unwrap_or(Self::AGRESSIVE_REAP_THRESHOLD), - } - } - - pub fn set_reap_threshold(&mut self, reap_threahold: usize) { - self.reap_threshold = reap_threahold; - } - - fn get_pq_lock(&self) -> &RwLock, Priority, RandomState>> { - self.pqs - .get_or(|| RwLock::new(PriorityQueue::with_default_hasher())) - } - - /// Insert an entry into the priority queue. If the queue already contains an entry with the - /// same key, the old entry is first removed from the queue, and then the new entry is inserted. - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn push(&self, key: K, value: V, expires_at: Instant) { - let new_entry = Entry { key, value }; - let pq = self.get_pq_lock(); - let mut pq_lock = pq.write().unwrap(); - - // Calling the PriorityQueue .push() does not ensure that the entry will be in the queue. - // Its documentation mentions: - // - // "If an element equal to item is already in the queue, its priority is updated and the - // old priority is returned in Some; otherwise, item is inserted with priority and None is - // returned." - // - // But "equal to item" actually means "hashing to the same value", and in the case of a - // struct Entry, the hash function only hashes the key, not the value, so if we try to push - // a new entry with the same key but a different value, the old entry will be updated with - // the new priority but the value will not be updated. To avoid this, always remove any - // entry with a similar key (whatever the value) before trying to push the new one. - let _ = pq_lock.remove(&new_entry); - pq_lock.push(new_entry, Priority(expires_at)); - } - - /// Reap expired entries from the priority queue. - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_expired( - &self, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let pql = self.get_pq_lock(); - let mut pq = pql.write().unwrap(); - Self::reap_expired_locked_with_time( - &mut pq, - &Instant::now(), - on_expired, - on_reaped, - self.reap_threshold, - ) - } - - /// Reap expired entries from all priority queues (regardless of current thread) - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_all_expired( - &self, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - self.reap_all_expired_with_time_internal(&Instant::now(), &on_expired, &on_reaped) - } - - #[allow(unused)] // This is unused for now if shuttle is not enabled - #[cfg(test)] - pub fn reap_all_expired_with_time( - &self, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - self.reap_all_expired_with_time_internal(now, &on_expired, &on_reaped) - } - - fn reap_all_expired_with_time_internal( - &self, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let pqs = self.pqs.iter(); - let mut all_reaped = vec![]; - for pq in pqs { - let mut pq = pq.write().unwrap(); - let mut reaped = Self::reap_expired_locked_with_time( - &mut pq, - now, - &on_expired, - &on_reaped, - self.reap_threshold, - ); - all_reaped.append(&mut reaped); - } - all_reaped - } - - fn aggressive_reap( - pq: &mut concurrency::sync::RwLockWriteGuard< - PriorityQueue, Priority, RandomState>, - >, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let reaped: Vec<_> = pq - .extract_if(|entry, _prio| { - let action = on_expired(now, &entry.key, &entry.value); - action == PQAction::Reap || action == PQAction::Cancel - }) - .map(|(entry, _prio)| { - let key = entry.key; - on_reaped(&key, entry.value); - key - }) - .collect(); - debug!("Reaped {} flows", reaped.len()); - reaped - } - - fn reap_expired_locked_with_time( - pq: &mut concurrency::sync::RwLockWriteGuard< - PriorityQueue, Priority, RandomState>, - >, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - reap_threshold: usize, - ) -> Vec { - let len = pq.len(); - if len > reap_threshold { - warn!("The number of flows ({len}) exceeds {reap_threshold}. Reaping..."); - return Self::aggressive_reap(pq, now, on_expired, on_reaped); - } - - let mut expired = Vec::new(); - debug!( - "Reaping expired flows at {:?}, queue size {}", - now, - pq.len() - ); - - while let Some((_, expires_at)) = pq.peek() { - if *now >= expires_at.0 { - let ret = pq.pop(); - let Some(entry) = ret else { - break; - }; - // This is going to copy the entry and key, even if it is to be reinserted, - // which sucks. Find a better way to do this and placate the rust - // borrow checker. Without this copy, the borrow checker will - // complain that you cannot pop the entry because of the borrow in the peek. - // - // This is probably fine for now though as we use K that is a FlowKey, - // copying it isn't ideal but probably cheap enough, and the value is an Arc. - expired.push(entry); - } else { - break; - } - } - - debug!( - "Found {} expired flows at {:?}, queue size {}", - expired.len(), - now, - pq.len() - ); - - let mut reaped = vec![]; - for (entry, _) in expired { - match on_expired(now, &entry.key, &entry.value) { - PQAction::Reap | PQAction::Cancel => { - // entry.value is consumed here, but the key is kept so that - // the corresponding flow-entry, pointing to a dropped flow-info - // can be removed from the flow-table. - on_reaped(&entry.key, entry.value); - reaped.push(entry.key); - } - PQAction::Update(new_expires_at) => { - pq.push(entry, Priority(new_expires_at)); - } - } - } - reaped - } -} - -impl Default for ThreadLocalPriorityQueue -where - K: Send + Sync + Hash + PartialEq + Eq, - V: Send + Sync, -{ - fn default() -> Self { - Self::new(None) - } -} diff --git a/nat/src/portfw/test.rs b/nat/src/portfw/test.rs index ec90764b2..41ae6435e 100644 --- a/nat/src/portfw/test.rs +++ b/nat/src/portfw/test.rs @@ -6,7 +6,7 @@ mod nf_test { use crate::portfw::protocol::PortFwFlowStatus; use crate::portfw::{PortForwarder, PortFwEntry, PortFwKey, PortFwState, PortFwTableWriter}; - use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable}; + use flow_entry::flow_table::{FlowLookup, FlowTable}; use net::buffer::TestBuffer; use net::flows::FlowStatus; use net::flows::flow_info_item::ExtractRef; @@ -16,10 +16,10 @@ mod nf_test { use net::packet::{DoneReason, Packet, VpcDiscriminant}; use std::time::Duration; + use concurrency::sync::Arc; use lpm::prefix::Prefix; use pipeline::{DynPipeline, NetworkFunction}; use std::str::FromStr; - use std::sync::Arc; use tracing_test::traced_test; fn get_flow_status(packet: &Packet) -> Option { @@ -186,14 +186,11 @@ mod nf_test { ) -> (Arc, DynPipeline, PortFwTableWriter) { // build a pipeline with flow lookup + port forwarder let mut writer = PortFwTableWriter::new(); - let flow_table = Arc::new(FlowTable::new(1024)); + let flow_table = Arc::new(FlowTable::default()); let flow_lookup_nf = FlowLookup::new("flow-lookup", flow_table.clone()); let nf = PortForwarder::new("port-forwarder", writer.reader(), flow_table.clone()); - let flow_expirations = ExpirationsNF::new(flow_table.clone()); - let pipeline: DynPipeline = DynPipeline::new() - .add_stage(flow_lookup_nf) - .add_stage(nf) - .add_stage(flow_expirations); + let pipeline: DynPipeline = + DynPipeline::new().add_stage(flow_lookup_nf).add_stage(nf); // set port-forwarding rules writer.update_table(ruleset).unwrap();