Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions litebox/src/sync/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ use thiserror::Error;

/// A manager of all available futexes.
///
/// Note: currently, this only supports "private" futexes, since it assumes only a single process.
/// In the future, this may be expanded to support multi-process futexes.
/// Supports both private and shared futexes. Callers provide an
/// `address_space_id` discriminator to distinguish futexes at the same virtual
/// address in different address spaces. Entries are only matched (for wake) when
/// both the address and address-space ID agree.
pub struct FutexManager<Platform: RawSyncPrimitivesProvider> {
/// Chaining hash table to map from futex address to waiter lists.
table: alloc::boxed::Box<[LoanList<Platform, FutexEntry<Platform>>; HASH_TABLE_ENTRIES]>,
Expand All @@ -41,6 +43,9 @@ const HASH_TABLE_ENTRIES: usize = 256;

struct FutexEntry<Platform: RawSyncPrimitivesProvider> {
addr: usize,
/// Opaque discriminator distinguishing address spaces. Entries with
/// different discriminators never match, even at the same virtual address.
address_space_id: u64,
waker: Waker<Platform>,
bitset: u32,
done: AtomicBool,
Expand All @@ -62,9 +67,16 @@ impl<Platform: RawSyncPrimitivesProvider + RawPointerProvider + TimeProvider>
}
}

/// Returns the hash table bucket for the given futex address.
fn bucket(&self, addr: usize) -> &LoanList<Platform, FutexEntry<Platform>> {
let hash: usize = self.hash_builder.hash_one(addr).truncate();
/// Returns the hash table bucket for the given futex key.
fn bucket(
&self,
addr: usize,
address_space_id: u64,
) -> &LoanList<Platform, FutexEntry<Platform>> {
let hash: usize = self
.hash_builder
.hash_one((addr, address_space_id))
.truncate();
&self.table[hash % HASH_TABLE_ENTRIES]
}

Expand All @@ -80,22 +92,27 @@ impl<Platform: RawSyncPrimitivesProvider + RawPointerProvider + TimeProvider>
/// If `bitset` is `Some`, then the waiter is only woken if the wake call's
/// `bitset` has a non-zero intersection with the waiter's mask. Specifying
/// `None` is equivalent to setting all bits in the mask.
///
/// `address_space_id` is an opaque discriminator that distinguishes futexes
/// at the same virtual address in different address spaces.
pub fn wait(
&self,
cx: &WaitContext<'_, Platform>,
futex_addr: Platform::RawMutPointer<u32>,
expected_value: u32,
bitset: Option<NonZeroU32>,
address_space_id: u64,
) -> Result<(), FutexError> {
let bitset = bitset.unwrap_or(ALL_BITS).get();
let addr = futex_addr.as_usize();
if !addr.is_multiple_of(align_of::<u32>()) {
return Err(FutexError::NotAligned);
}

let bucket = self.bucket(addr);
let bucket = self.bucket(addr, address_space_id);
let mut entry = pin!(LoanListEntry::new(FutexEntry {
addr,
address_space_id,
waker: cx.waker().clone(),
bitset,
done: AtomicBool::new(false),
Expand Down Expand Up @@ -131,23 +148,30 @@ impl<Platform: RawSyncPrimitivesProvider + RawPointerProvider + TimeProvider>
/// (subject to the `num_to_wake` limit). If `bitset` is `None`, then all
/// waiters are eligible to be woken.
///
/// `address_space_id` must match the value passed to the corresponding
/// [`wait`](Self::wait) call.
///
/// Returns the number of waiters that were woken up.
pub fn wake(
&self,
futex_addr: Platform::RawMutPointer<u32>,
num_to_wake_up: NonZeroU32,
bitset: Option<NonZeroU32>,
address_space_id: u64,
) -> Result<u32, FutexError> {
let addr = futex_addr.as_usize();
if !addr.is_multiple_of(align_of::<u32>()) {
return Err(FutexError::NotAligned);
}
let bitset = bitset.unwrap_or(ALL_BITS).get();
let mut woken = 0;
let bucket = self.bucket(addr);
let bucket = self.bucket(addr, address_space_id);
// Extract matching entries from the bucket until we've woken enough.
let entries = bucket.extract_if(|entry| {
if entry.addr != addr || entry.bitset & bitset == 0 {
if entry.addr != addr
|| entry.address_space_id != address_space_id
|| entry.bitset & bitset == 0
{
return core::ops::ControlFlow::Continue(false);
}
woken += 1;
Expand Down Expand Up @@ -218,7 +242,7 @@ mod tests {
barrier_clone.wait(); // Sync with main thread

// Wait for value 0
futex_manager_clone.wait(&WaitState::new(platform).context(), futex_addr, 0, None)
futex_manager_clone.wait(&WaitState::new(platform).context(), futex_addr, 0, None, 0)
});

barrier.wait(); // Wait for waiter to be ready
Expand All @@ -231,7 +255,7 @@ mod tests {
futex_word.as_ptr() as usize,
);
let woken = futex_manager
.wake(futex_addr, NonZeroU32::new(1).unwrap(), None)
.wake(futex_addr, NonZeroU32::new(1).unwrap(), None, 0)
.unwrap();

// Wait for waiter thread to complete
Expand Down Expand Up @@ -270,6 +294,7 @@ mod tests {
futex_addr,
0,
None,
0,
)
});

Expand All @@ -283,7 +308,7 @@ mod tests {
futex_word.as_ptr() as usize,
);
let woken = futex_manager
.wake(futex_addr, NonZeroU32::new(1).unwrap(), None)
.wake(futex_addr, NonZeroU32::new(1).unwrap(), None, 0)
.unwrap();

// Wait for waiter thread to complete
Expand Down Expand Up @@ -324,6 +349,7 @@ mod tests {
futex_addr,
0,
None,
0,
)
});
waiters.push(waiter);
Expand All @@ -339,7 +365,7 @@ mod tests {
futex_word.as_ptr() as usize,
);
let woken = futex_manager
.wake(futex_addr, NonZeroU32::new(u32::MAX).unwrap(), None)
.wake(futex_addr, NonZeroU32::new(u32::MAX).unwrap(), None, 0)
.unwrap();

// Wait for all waiter threads to complete
Expand Down
4 changes: 3 additions & 1 deletion litebox_shim_linux/src/syscalls/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ impl<FS: ShimFS> Task<FS> {
let Some(count) = core::num::NonZeroU32::new(count) else {
return Ok(0);
};
self.global.futex_manager.wake(addr, count, None)? as usize
self.global.futex_manager.wake(addr, count, None, 0)? as usize
}
FutexArgs::Wait {
addr,
Expand All @@ -1306,6 +1306,7 @@ impl<FS: ShimFS> Task<FS> {
addr,
val,
None,
0,
)?;
0
}
Expand Down Expand Up @@ -1333,6 +1334,7 @@ impl<FS: ShimFS> Task<FS> {
addr,
val,
core::num::NonZeroU32::new(bitmask),
0,
)?;
0
}
Expand Down
Loading