From 7a34acc00939f36d9bb10848d95819652fbc7b5c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 23 Feb 2023 14:38:40 -0800 Subject: [PATCH 1/3] Remove globals from parking spot tests Use `std::thread::scope` to keep everything local to just the tests. --- crates/runtime/src/parking_spot.rs | 136 +++++++++++++++-------------- 1 file changed, 70 insertions(+), 66 deletions(-) diff --git a/crates/runtime/src/parking_spot.rs b/crates/runtime/src/parking_spot.rs index dee1019b22bc..4fb6e9c42b31 100644 --- a/crates/runtime/src/parking_spot.rs +++ b/crates/runtime/src/parking_spot.rs @@ -176,55 +176,53 @@ impl ParkingSpot { #[cfg(test)] mod tests { use super::ParkingSpot; - use once_cell::sync::Lazy; use std::ptr::addr_of; use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; - - static PARKING_SPOT: Lazy = Lazy::new(ParkingSpot::default); - - static ATOMIC: AtomicU64 = AtomicU64::new(0); + use std::time::{Duration, Instant}; #[test] fn atomic_wait_notify() { - let thread1 = thread::spawn(|| { - let atomic_key = addr_of!(ATOMIC) as u64; - ATOMIC.store(1, Ordering::SeqCst); - PARKING_SPOT.unpark(atomic_key, u32::MAX); - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 1, None); - }); + let parking_spot = &ParkingSpot::default(); + let atomic = &AtomicU64::new(0); + + thread::scope(|s| { + let atomic_key = addr_of!(atomic) as u64; + let thread1 = s.spawn(move || { + atomic.store(1, Ordering::SeqCst); + parking_spot.unpark(atomic_key, u32::MAX); + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 1, None); + }); + + let thread2 = s.spawn(move || { + while atomic.load(Ordering::SeqCst) != 1 { + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 1, None); + } + atomic.store(2, Ordering::SeqCst); + parking_spot.unpark(atomic_key, u32::MAX); + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 2, None); + }); + + let thread3 = s.spawn(move || { + while atomic.load(Ordering::SeqCst) != 2 { + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 2, None); + } + atomic.store(3, Ordering::SeqCst); + parking_spot.unpark(atomic_key, u32::MAX); - let thread2 = thread::spawn(|| { - let atomic_key = addr_of!(ATOMIC) as u64; - while ATOMIC.load(Ordering::SeqCst) != 1 { - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 1, None); - } - ATOMIC.store(2, Ordering::SeqCst); - PARKING_SPOT.unpark(atomic_key, u32::MAX); - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 2, None); - }); + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 3, None); + }); - let thread3 = thread::spawn(|| { - let atomic_key = addr_of!(ATOMIC) as u64; - while ATOMIC.load(Ordering::SeqCst) != 2 { - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 2, None); + while atomic.load(Ordering::SeqCst) != 3 { + parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 3, None); } - ATOMIC.store(3, Ordering::SeqCst); - PARKING_SPOT.unpark(atomic_key, u32::MAX); + atomic.store(4, Ordering::SeqCst); + parking_spot.unpark(atomic_key, u32::MAX); - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) == 3, None); + thread1.join().unwrap(); + thread2.join().unwrap(); + thread3.join().unwrap(); }); - - let atomic_key = addr_of!(ATOMIC) as u64; - while ATOMIC.load(Ordering::SeqCst) != 3 { - PARKING_SPOT.park(atomic_key, || ATOMIC.load(Ordering::SeqCst) != 3, None); - } - ATOMIC.store(4, Ordering::SeqCst); - PARKING_SPOT.unpark(atomic_key, u32::MAX); - - thread1.join().unwrap(); - thread2.join().unwrap(); - thread3.join().unwrap(); } mod parking_lot { @@ -302,47 +300,53 @@ mod tests { num_threads: u32, num_single_unparks: u32, ) { - let mut tests = Vec::with_capacity(num_latches); - - for _ in 0..num_latches { - let test = Arc::new(SingleLatchTest::new(num_threads)); - let mut threads = Vec::with_capacity(num_threads as _); - for _ in 0..num_threads { - let test = test.clone(); - threads.push(thread::spawn(move || test.run())); + let spot = ParkingSpot::default(); + + thread::scope(|s| { + let mut tests = Vec::with_capacity(num_latches); + + for _ in 0..num_latches { + let test = Arc::new(SingleLatchTest::new(num_threads, &spot)); + let mut threads = Vec::with_capacity(num_threads as _); + for _ in 0..num_threads { + let test = test.clone(); + threads.push(s.spawn(move || test.run())); + } + tests.push((test, threads)); } - tests.push((test, threads)); - } - for unpark_index in 0..num_single_unparks { - thread::sleep(delay); - for (test, _) in &tests { - test.unpark_one(unpark_index); + for unpark_index in 0..num_single_unparks { + thread::sleep(delay); + for (test, _) in &tests { + test.unpark_one(unpark_index); + } } - } - for (test, threads) in tests { - test.finish(num_single_unparks); - for thread in threads { - thread.join().expect("Test thread panic"); + for (test, threads) in tests { + test.finish(num_single_unparks); + for thread in threads { + thread.join().expect("Test thread panic"); + } } - } + }); } - struct SingleLatchTest { + struct SingleLatchTest<'a> { semaphore: AtomicIsize, num_awake: AtomicU32, /// Total number of threads participating in this test. num_threads: u32, + spot: &'a ParkingSpot, } - impl SingleLatchTest { - pub fn new(num_threads: u32) -> Self { + impl<'a> SingleLatchTest<'a> { + pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self { Self { // This implements a fair (FIFO) semaphore, and it starts out unavailable. semaphore: AtomicIsize::new(0), num_awake: AtomicU32::new(0), num_threads, + spot, } } @@ -373,14 +377,14 @@ mod tests { // still be threads that has not yet parked. while num_threads_left > 0 { let mut num_waiting_on_address = 0; - PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| { + self.spot.with_lot(self.semaphore_addr(), |thread_data| { num_waiting_on_address = thread_data.num_parked; }); assert!(num_waiting_on_address <= num_threads_left); let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst); - let num_unparked = PARKING_SPOT.unpark(self.semaphore_addr(), u32::MAX); + let num_unparked = self.spot.unpark(self.semaphore_addr(), u32::MAX); assert!(num_unparked >= num_waiting_on_address); assert!(num_unparked <= num_threads_left); @@ -398,7 +402,7 @@ mod tests { // Make sure no thread is parked on our semaphore address let mut num_waiting_on_address = 0; - PARKING_SPOT.with_lot(self.semaphore_addr(), |thread_data| { + self.spot.with_lot(self.semaphore_addr(), |thread_data| { num_waiting_on_address = thread_data.num_parked; }); assert_eq!(num_waiting_on_address, 0); @@ -414,7 +418,7 @@ mod tests { // We need to wait. let validate = || true; - PARKING_SPOT.park(self.semaphore_addr(), validate, None); + self.spot.park(self.semaphore_addr(), validate, None); } pub fn up(&self) { @@ -426,7 +430,7 @@ mod tests { // the thread we want to pass ownership to has decremented the semaphore counter, // but not yet parked. loop { - match PARKING_SPOT.unpark(self.semaphore_addr(), 1) { + match self.spot.unpark(self.semaphore_addr(), 1) { 1 => break, 0 => (), i => panic!("Should not wake up {i} threads"), From 866105254c9b718a77a6cd24815b7803665a4de8 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 23 Feb 2023 14:38:59 -0800 Subject: [PATCH 2/3] Fix a panic due to a race in `unpark` and `park` This commit fixes a panic in the `ParkingSpot` implementation where an `unpark` signal may not get acknowledged when a waiter times out, causing the waiter to remove itself from the internal map but panic thinking that it missed an unpark signal. The fix in this commit is to consume unpark signals when a timeout happens. This can lead to another possible race I've detailed in the comments which I believe is allowed by the specification of park/unpark in wasm. --- crates/runtime/src/parking_spot.rs | 86 +++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/crates/runtime/src/parking_spot.rs b/crates/runtime/src/parking_spot.rs index 4fb6e9c42b31..d771e6a4af1f 100644 --- a/crates/runtime/src/parking_spot.rs +++ b/crates/runtime/src/parking_spot.rs @@ -104,17 +104,53 @@ impl ParkingSpot { let spot = inner.get_mut(&key).expect("failed to get spot"); if timed_out { - if let Some(timeout) = timeout { - if Instant::now() < timeout { - // Did not sleep long enough, try again. - continue; - } + // If waiting on the cvar timed out then due to how system cvars + // are implemented we may need to continue to sleep longer. If + // the deadline has not been reached then turn the crank again + // and go back to sleep. + if Instant::now() < timeout.unwrap() { + continue; + } + + // Opportunistically consume `to_unpark` signals even on + // timeout. From the perspective of `unpark` this "agent" raced + // between its own timeout and receiving the unpark signal, but + // from unpark's perspective it's definitely going to wake up N + // agents as returned from the `unpark` return value. + // + // Note that this may actually prevent other threads from + // getting unaprked. For example: + // + // * Thread A parks with a timeout + // * Thread B parks with no timeout + // * Thread C decides to unpark 1 thread + // * Thread A's cvar wakes up due to a timeout, blocks on the + // lock + // * Thread C finishes unpark and signals the cvar once + // * Thread B wakes up + // * Thread A and B contend for the lock and A wins + // * A consumes the "to_unpark" value + // * B goes back to sleep since `to_unpark == 0`, thinking that + // a spurious wakeup happened. + // + // It's believed that this is ok, however, since from C's + // perspective one agent was still woken up and is allowed to + // continue, notably A in this case. C doesn't know that A raced + // with B and "stole" its wakeup signal. + if spot.to_unpark > 0 { + spot.to_unpark -= 1; } } else { if spot.to_unpark == 0 { + // If no timeout happen but nothing has unparked this spot (as + // signaled through `to_unpark`) then this is indicative of a + // spurious wakeup. In this situation turn the crank again and + // go back to sleep as this interface doesn't allow for spurious + // wakeups. continue; } - + // No timeout happened, and some other thread registered to + // unpark this thread, so consume one unpark notification. spot.to_unpark -= 1; } @@ -444,4 +480,42 @@ mod tests { } } } + + #[test] + fn wait_with_timeout() { + let parking_spot = &ParkingSpot::default(); + let atomic = &AtomicU64::new(0); + + thread::scope(|s| { + let atomic_key = addr_of!(atomic) as u64; + + const N: u64 = 5; + const M: u64 = 1000; + + let thread = s.spawn(move || { + while atomic.load(Ordering::SeqCst) != N * M { + let timeout = Instant::now() + Duration::from_millis(1); + parking_spot.park( + atomic_key, + || atomic.load(Ordering::SeqCst) != N * M, + Some(timeout), + ); + } + }); + + let mut threads = vec![thread]; + for _ in 0..N { + threads.push(s.spawn(move || { + for _ in 0..M { + atomic.fetch_add(1, Ordering::SeqCst); + parking_spot.unpark(atomic_key, 1); + } + })); + } + + for thread in threads { + thread.join().unwrap(); + } + }); + } } From 64161ca2513112984465bf1b1b8165caa1462160 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 23 Feb 2023 16:52:52 -0600 Subject: [PATCH 3/3] Update crates/runtime/src/parking_spot.rs Co-authored-by: Andrew Brown --- crates/runtime/src/parking_spot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/runtime/src/parking_spot.rs b/crates/runtime/src/parking_spot.rs index d771e6a4af1f..33f9bce0ff80 100644 --- a/crates/runtime/src/parking_spot.rs +++ b/crates/runtime/src/parking_spot.rs @@ -119,7 +119,7 @@ impl ParkingSpot { // agents as returned from the `unpark` return value. // // Note that this may actually prevent other threads from - // getting unaprked. For example: + // getting unparked. For example: // // * Thread A parks with a timeout // * Thread B parks with no timeout