From e73e6a8d8733b92018596ca2cae391a893e69312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Tue, 24 Mar 2026 09:44:15 +0100 Subject: [PATCH] thread: add `pthread`-based threading module Parametrizable threading module. --- Cargo.lock | 23 ++ Cargo.toml | 9 + src/pal/BUILD | 35 +++ src/pal/Cargo.toml | 28 ++ src/pal/affinity.rs | 330 ++++++++++++++++++++++++ src/pal/errno.rs | 26 ++ src/pal/lib.rs | 49 ++++ src/thread/BUILD | 34 +++ src/thread/Cargo.toml | 27 ++ src/thread/lib.rs | 20 ++ src/thread/parameters.rs | 211 +++++++++++++++ src/thread/thread.rs | 541 +++++++++++++++++++++++++++++++++++++++ 12 files changed, 1333 insertions(+) create mode 100644 src/pal/BUILD create mode 100644 src/pal/Cargo.toml create mode 100644 src/pal/affinity.rs create mode 100644 src/pal/errno.rs create mode 100644 src/pal/lib.rs create mode 100644 src/thread/BUILD create mode 100644 src/thread/Cargo.toml create mode 100644 src/thread/lib.rs create mode 100644 src/thread/parameters.rs create mode 100644 src/thread/thread.rs diff --git a/Cargo.lock b/Cargo.lock index 33d4cd57..f7ce2640 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,6 +10,12 @@ version = "0.1.0" name = "elementary" version = "0.0.1" +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + [[package]] name = "log_builtin" version = "0.0.1" @@ -25,6 +31,15 @@ dependencies = [ "score_log", ] +[[package]] +name = "pal" +version = "0.0.1" +dependencies = [ + "containers", + "libc", + "score_log", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -100,6 +115,14 @@ dependencies = [ "elementary", ] +[[package]] +name = "thread" +version = "0.0.1" +dependencies = [ + "pal", + "score_log", +] + [[package]] name = "unicode-ident" version = "1.0.22" diff --git a/Cargo.toml b/Cargo.toml index 8aa01c8d..da56e5b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,8 @@ default-members = [ "src/containers", "src/sync", "src/elementary", + "src/pal", + "src/thread", "src/log/score_log_fmt_macro", "src/log/stdout_logger", "src/testing_macros", @@ -28,6 +30,8 @@ members = [ "src/containers", "src/sync", "src/elementary", + "src/pal", + "src/thread", "src/log/score_log", "src/log/score_log_fmt", "src/log/score_log_fmt_macro", @@ -44,12 +48,17 @@ license-file = "LICENSE.md" authors = ["S-CORE Contributors"] [workspace.dependencies] +libc = "0.2.177" + +containers = { path = "src/containers" } score_log = { path = "src/log/score_log" } score_log_fmt = { path = "src/log/score_log_fmt" } score_log_fmt_macro = { path = "src/log/score_log_fmt_macro" } stdout_logger = { path = "src/log/stdout_logger" } elementary = { path = "src/elementary" } testing_macros = { path = "src/testing_macros" } +pal = { path = "src/pal" } +thread = { path = "src/thread" } [workspace.lints.clippy] std_instead_of_core = "warn" diff --git a/src/pal/BUILD b/src/pal/BUILD new file mode 100644 index 00000000..61e41e75 --- /dev/null +++ b/src/pal/BUILD @@ -0,0 +1,35 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") + +rust_library( + name = "pal", + srcs = glob(["**/*.rs"]), + edition = "2021", + visibility = ["//src:__subpackages__"], + deps = [ + "//src/containers", + "//src/log/score_log", + "@score_crates//:libc", + ], +) + +rust_test( + name = "tests", + crate = "pal", + tags = [ + "unit_tests", + "ut", + ], +) diff --git a/src/pal/Cargo.toml b/src/pal/Cargo.toml new file mode 100644 index 00000000..b093c5bc --- /dev/null +++ b/src/pal/Cargo.toml @@ -0,0 +1,28 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +[package] +name = "pal" +description = "Minimal POSIX adaptation layer." +version.workspace = true +authors.workspace = true +readme.workspace = true +edition.workspace = true + +[lib] +path = "lib.rs" + +[dependencies] +libc.workspace = true +containers.workspace = true +score_log.workspace = true diff --git a/src/pal/affinity.rs b/src/pal/affinity.rs new file mode 100644 index 00000000..3c221090 --- /dev/null +++ b/src/pal/affinity.rs @@ -0,0 +1,330 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Affinity handling differs between Linux and QNX. +//! Module ensures similar behavior between both OSes. + +use crate::errno; +use containers::fixed_capacity::FixedCapacityVec; +use score_log::ScoreDebug; + +#[cfg(target_os = "linux")] +use libc::{cpu_set_t, sched_getaffinity, sched_setaffinity}; + +#[cfg(target_os = "nto")] +use libc::{ThreadCtl, _NTO_TCTL_RUNMASK_GET_AND_SET_INHERIT}; + +const MAX_CPU_NUM: usize = 1024; +const CPU_MASK_SIZE: usize = MAX_CPU_NUM / (u8::BITS as usize); + +/// Common CPU set representation. +/// Limited to 1024 CPUs. +#[derive(Clone, Copy, Debug, ScoreDebug, PartialEq, Eq)] +pub struct CpuSet { + mask: [u8; CPU_MASK_SIZE], +} + +impl CpuSet { + /// Create a new CPU set. + pub fn new(affinity: &[usize]) -> Self { + let mask = Self::create_mask(affinity); + Self { mask } + } + + /// Create mask based on provided list. + fn create_mask(affinity: &[usize]) -> [u8; CPU_MASK_SIZE] { + let mut mask = [0u8; _]; + for cpu_id in affinity.iter().copied() { + const MAX_CPU_ID: usize = MAX_CPU_NUM - 1; + assert!( + cpu_id <= MAX_CPU_ID, + "CPU ID provided to affinity exceeds max supported size, provided: {cpu_id}, max: {MAX_CPU_ID}" + ); + + let index = cpu_id / 8; + let offset = cpu_id % 8; + mask[index] |= 1 << offset; + } + + mask + } + + /// Create list based on provided mask. + fn create_list(mask: &[u8; CPU_MASK_SIZE]) -> FixedCapacityVec { + let mut list = FixedCapacityVec::new(MAX_CPU_NUM); + for cpu_id in 0..MAX_CPU_NUM { + let index = cpu_id / 8; + let offset = cpu_id % 8; + + if (mask[index] & (1 << offset)) != 0 { + // Error should not occur, since capacity is matching the mask size. + list.push(cpu_id).expect("failed to push CPU ID to the list"); + } + } + + list + } + + pub fn set(&mut self, affinity: &[usize]) { + self.mask = Self::create_mask(affinity); + } + + pub fn get(&self) -> FixedCapacityVec { + Self::create_list(&self.mask) + } +} + +#[cfg(target_os = "linux")] +impl From for CpuSet { + fn from(value: cpu_set_t) -> Self { + assert!( + core::mem::size_of::() == CPU_MASK_SIZE, + "unsupported native CPU mask size" + ); + + // SAFETY: CPU mask layout and size is ensured. + let mask: [u8; CPU_MASK_SIZE] = unsafe { core::mem::transmute(value) }; + Self { mask } + } +} + +#[cfg(target_os = "linux")] +impl From for cpu_set_t { + fn from(value: CpuSet) -> Self { + assert!( + core::mem::size_of::() == CPU_MASK_SIZE, + "unsupported native CPU mask size" + ); + + // SAFETY: CPU mask layout and size is ensured. + unsafe { core::mem::transmute(value.mask) } + } +} + +/// QNX representation of a CPU set. +/// +/// Number of CPUs is restricted to 1024 - same as for Linux. +/// QNX docs recommend the following: +/// - read the number of CPUs from `_syspage_ptr->num_cpu` +/// - allocate mask fields dynamically +/// +/// Current approach avoids dynamic alloc and aligns the behavior with Linux. +/// +/// Refer to QNX docs for more information: +/// https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/t/threadctl.html +#[cfg(target_os = "nto")] +#[repr(C)] +#[derive(Clone, Copy)] +struct NtoCpuSet { + // Expected to always be set to `32` - see comment above. + size: i32, + run_mask: [u32; 32], + // Expected to always be zeroed - left unaltered. + inherit_mask: [u32; 32], +} + +#[cfg(target_os = "nto")] +impl NtoCpuSet { + fn new(mask: [u32; 32]) -> Self { + Self { + size: 32, + run_mask: mask, + inherit_mask: [0; 32], + } + } +} + +#[cfg(target_os = "nto")] +impl From for CpuSet { + fn from(value: NtoCpuSet) -> Self { + // SAFETY: CPU mask layout and size is ensured. + let mask = unsafe { core::mem::transmute(value.run_mask) }; + Self { mask } + } +} + +#[cfg(target_os = "nto")] +impl From for NtoCpuSet { + fn from(value: CpuSet) -> Self { + // SAFETY: CPU mask layout and size is ensured. + let run_mask = unsafe { core::mem::transmute(value.mask) }; + Self::new(run_mask) + } +} + +/// Set affinity of a current thread. +pub fn set_affinity(cpu_set: CpuSet) { + #[cfg(target_os = "linux")] + { + let native_cpu_set = cpu_set_t::from(cpu_set); + let cpu_set_size = core::mem::size_of::(); + // SAFETY: + // Native CPU set is properly initialized. + // Pointer is ensured to be valid. + let rc = unsafe { sched_setaffinity(0, cpu_set_size, &native_cpu_set) }; + if rc != 0 { + let errno = errno(); + panic!("sched_setaffinity failed, rc: {rc}, errno: {errno}"); + } + } + + #[cfg(target_os = "nto")] + { + let mut native_cpu_set = NtoCpuSet::from(cpu_set); + // SAFETY: + // Native CPU set is properly initialized. + // `NtoCpuSet` layout must be as expected by `ThreadCtl`. + // Pointer is ensured to be valid. + let rc = unsafe { + ThreadCtl( + _NTO_TCTL_RUNMASK_GET_AND_SET_INHERIT as crate::c_int, + (&mut native_cpu_set as *mut NtoCpuSet).cast(), + ) + }; + if rc != 0 { + let errno = errno(); + panic!("ThreadCtl failed, rc: {rc}, errno: {errno}"); + } + } +} + +/// Get affinity of a current thread. +pub fn get_affinity() -> FixedCapacityVec { + #[cfg(target_os = "linux")] + { + let mut native_cpu_set = core::mem::MaybeUninit::zeroed(); + let cpu_set_size = core::mem::size_of::(); + // SAFETY: + // Provided native CPU set is zeroed, then initialized by a `sched_getaffinity` call. + // Pointer is ensured to be valid. + let rc = unsafe { sched_getaffinity(0, cpu_set_size, native_cpu_set.as_mut_ptr()) }; + if rc != 0 { + let errno = errno(); + panic!("sched_getaffinity failed, rc: {rc}, errno: {errno}"); + } + + // SAFETY: refer to a comment above. + let cpu_set = CpuSet::from(unsafe { native_cpu_set.assume_init() }); + cpu_set.get() + } + + #[cfg(target_os = "nto")] + { + let mut native_cpu_set = NtoCpuSet::new([0; _]); + // SAFETY: + // Native CPU set is properly initialized. + // `NtoCpuSet` layout must be as expected by `ThreadCtl`. + // Pointer is ensured to be valid. + let rc = unsafe { + ThreadCtl( + _NTO_TCTL_RUNMASK_GET_AND_SET_INHERIT as crate::c_int, + (&mut native_cpu_set as *mut NtoCpuSet).cast(), + ) + }; + if rc != 0 { + let errno = errno(); + panic!("ThreadCtl failed, rc: {rc}, errno: {errno}"); + } + + let cpu_set = CpuSet::from(native_cpu_set); + cpu_set.get() + } +} + +#[cfg(test)] +mod tests { + use crate::affinity::{get_affinity, set_affinity, CpuSet, MAX_CPU_NUM}; + use std::sync::mpsc::channel; + + #[test] + fn cpu_set_new_empty_succeeds() { + let cpu_set = CpuSet::new(&[]); + assert!(cpu_set.mask.iter().all(|x| *x == 0)); + } + + #[test] + fn cpu_set_new_some_succeeds() { + let cpu_set = CpuSet::new(&[0, 123, 1023]); + let mut data_vec = cpu_set.mask.to_vec(); + + // Test removes from `Vec`, start from the end. + assert_eq!(data_vec.remove(127), 0x80); + assert_eq!(data_vec.remove(15), 0x08); + assert_eq!(data_vec.remove(0), 0x01); + } + + #[test] + fn cpu_set_new_full_succeeds() { + let all_ids: Vec<_> = (0..MAX_CPU_NUM).collect(); + let cpu_set = CpuSet::new(&all_ids); + assert!(cpu_set.mask.iter().all(|x| *x == 0xFF)); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1024, max: 1023")] + fn cpu_set_new_out_of_range() { + let _ = CpuSet::new(&[0, 123, 1023, 1024]); + } + + #[test] + fn cpu_set_set_succeeds() { + let mut cpu_set = CpuSet::new(&[]); + cpu_set.set(&[0, 123, 1023]); + let mut data_vec = cpu_set.mask.to_vec(); + + // Test removes from `Vec`, start from the end. + assert_eq!(data_vec.remove(127), 0x80); + assert_eq!(data_vec.remove(15), 0x08); + assert_eq!(data_vec.remove(0), 0x01); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1024, max: 1023")] + fn cpu_set_set_out_of_range() { + let mut cpu_set = CpuSet::new(&[]); + cpu_set.set(&[0, 123, 1023, 1024]); + } + + #[test] + fn cpu_set_get_succeeds() { + let exp = vec![0, 123, 1023]; + let cpu_set = CpuSet::new(&exp); + let got = cpu_set.get(); + assert_eq!(exp, got.iter().copied().collect::>()); + } + + #[test] + fn set_affinity_succeeds() { + let exp_affinity = vec![0]; + let cpu_set = CpuSet::new(&exp_affinity); + let (tx, rx) = channel(); + let join_handle = std::thread::spawn(move || { + set_affinity(cpu_set); + tx.send(get_affinity()).unwrap(); + }); + join_handle.join().unwrap(); + + assert_eq!(rx.recv().unwrap().iter().copied().collect::>(), exp_affinity); + } + + #[test] + fn set_affinity_out_of_range() { + // Assuming test is not running on a 1000-core machine, still within valid CPU mask. + let cpu_set = CpuSet::new(&[1000]); + let join_handle = std::thread::spawn(move || { + set_affinity(cpu_set); + }); + let result = join_handle.join(); + assert!(result.is_err()); + } +} diff --git a/src/pal/errno.rs b/src/pal/errno.rs new file mode 100644 index 00000000..2de557b4 --- /dev/null +++ b/src/pal/errno.rs @@ -0,0 +1,26 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Unified `errno` access. + +#[cfg(target_os = "linux")] +use libc::__errno_location as errno_ptr; + +#[cfg(target_os = "nto")] +use libc::__get_errno_ptr as errno_ptr; + +/// Current errno value. +pub fn errno() -> crate::c_int { + // SAFETY: `errno_ptr` returns a pointer to a thread-local variable. + unsafe { *errno_ptr() } +} diff --git a/src/pal/lib.rs b/src/pal/lib.rs new file mode 100644 index 00000000..e2615181 --- /dev/null +++ b/src/pal/lib.rs @@ -0,0 +1,49 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Minimal POSIX adaptation layer. + +mod affinity; +mod errno; + +pub use affinity::{get_affinity, set_affinity, CpuSet}; +pub use errno::errno; + +pub use libc::{ + c_int, c_ulong, c_void, pid_t, pthread_attr_destroy, pthread_attr_getstacksize, pthread_attr_init, + pthread_attr_setstacksize, pthread_attr_t, pthread_create, pthread_getschedparam, pthread_join, pthread_self, + pthread_setschedparam, pthread_t, sched_get_priority_max, sched_get_priority_min, sched_param, SCHED_FIFO, + SCHED_OTHER, SCHED_RR, +}; + +#[cfg(target_os = "linux")] +pub use libc::{ + pthread_attr_getinheritsched, pthread_attr_getschedparam, pthread_attr_getschedpolicy, + pthread_attr_setinheritsched, pthread_attr_setschedparam, pthread_attr_setschedpolicy, PTHREAD_EXPLICIT_SCHED, + PTHREAD_INHERIT_SCHED, +}; + +#[cfg(target_os = "nto")] +pub const PTHREAD_INHERIT_SCHED: c_int = 0; +#[cfg(target_os = "nto")] +pub const PTHREAD_EXPLICIT_SCHED: c_int = 1; + +#[cfg(target_os = "nto")] +extern "C" { + pub fn pthread_attr_getinheritsched(attr: *const pthread_attr_t, inheritsched: *mut c_int) -> c_int; + pub fn pthread_attr_setinheritsched(attr: *mut pthread_attr_t, inheritsched: c_int) -> c_int; + pub fn pthread_attr_getschedparam(attr: *const pthread_attr_t, param: *mut sched_param) -> c_int; + pub fn pthread_attr_setschedparam(attr: *mut pthread_attr_t, param: *const sched_param) -> c_int; + pub fn pthread_attr_getschedpolicy(attr: *const pthread_attr_t, policy: *mut c_int) -> c_int; + pub fn pthread_attr_setschedpolicy(attr: *mut pthread_attr_t, policy: c_int) -> c_int; +} diff --git a/src/thread/BUILD b/src/thread/BUILD new file mode 100644 index 00000000..0b2b958c --- /dev/null +++ b/src/thread/BUILD @@ -0,0 +1,34 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") + +rust_library( + name = "thread", + srcs = glob(["**/*.rs"]), + edition = "2021", + visibility = ["//visibility:public"], + deps = [ + "//src/log/score_log", + "//src/pal", + ], +) + +rust_test( + name = "tests", + crate = "thread", + tags = [ + "unit_tests", + "ut", + ], +) diff --git a/src/thread/Cargo.toml b/src/thread/Cargo.toml new file mode 100644 index 00000000..8013db41 --- /dev/null +++ b/src/thread/Cargo.toml @@ -0,0 +1,27 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +[package] +name = "thread" +description = "`pthread`-based parametrizable threading module." +version.workspace = true +authors.workspace = true +readme.workspace = true +edition.workspace = true + +[lib] +path = "lib.rs" + +[dependencies] +pal.workspace = true +score_log.workspace = true diff --git a/src/thread/lib.rs b/src/thread/lib.rs new file mode 100644 index 00000000..aebd4f2e --- /dev/null +++ b/src/thread/lib.rs @@ -0,0 +1,20 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! `pthread`-based parametrizable threading module. + +mod parameters; +mod thread; + +pub use parameters::{SchedulerParameters, SchedulerPolicy, ThreadParameters}; +pub use thread::{spawn, JoinHandle, Result}; diff --git a/src/thread/parameters.rs b/src/thread/parameters.rs new file mode 100644 index 00000000..ff86f2ca --- /dev/null +++ b/src/thread/parameters.rs @@ -0,0 +1,211 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use pal::CpuSet; +use score_log::ScoreDebug; + +/// Scheduler policy. +#[repr(i32)] +#[derive(Clone, Copy, Debug, ScoreDebug, PartialEq, Eq)] +pub enum SchedulerPolicy { + Other = pal::SCHED_OTHER, + Fifo = pal::SCHED_FIFO, + RoundRobin = pal::SCHED_RR, +} + +impl SchedulerPolicy { + /// Get min thread priority for this policy. + pub fn priority_min(&self) -> i32 { + let policy_native = *self as i32; + // SAFETY: + // Native policy value is ensured. + // Operation is non-fallible. + unsafe { pal::sched_get_priority_min(policy_native) } + } + + /// Get max thread priority for this policy. + pub fn priority_max(&self) -> i32 { + let policy_native = *self as i32; + // SAFETY: + // Native policy value is ensured. + // Operation is non-fallible. + unsafe { pal::sched_get_priority_max(policy_native) } + } +} + +/// Indicates that provided scheduler policy is unknown or unsupported. +#[derive(Clone, Copy, Default, Debug)] +pub struct UnknownSchedulerPolicy; + +impl core::fmt::Display for UnknownSchedulerPolicy { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "unknown or unsupported scheduler policy") + } +} + +impl score_log::fmt::ScoreDebug for UnknownSchedulerPolicy { + fn fmt(&self, f: score_log::fmt::Writer, _spec: &score_log::fmt::FormatSpec) -> score_log::fmt::Result { + score_log::fmt::score_write!(f, "unknown or unsupported scheduler policy") + } +} + +impl core::error::Error for UnknownSchedulerPolicy {} + +impl TryFrom for SchedulerPolicy { + type Error = UnknownSchedulerPolicy; + + fn try_from(value: i32) -> Result { + match value { + pal::SCHED_OTHER => Ok(SchedulerPolicy::Other), + pal::SCHED_FIFO => Ok(SchedulerPolicy::Fifo), + pal::SCHED_RR => Ok(SchedulerPolicy::RoundRobin), + _ => Err(UnknownSchedulerPolicy), + } + } +} + +/// Scheduler parameters. +#[derive(Clone, Copy, Debug, ScoreDebug, PartialEq, Eq)] +pub struct SchedulerParameters { + policy: SchedulerPolicy, + priority: i32, +} + +impl SchedulerParameters { + /// Create a new [`SchedulerParameters`]. + /// + /// # Panics + /// + /// Priority must be in allowed range for the scheduler policy. + pub fn new(policy: SchedulerPolicy, priority: i32) -> Self { + let allowed_priority_range = policy.priority_min()..=policy.priority_max(); + if !allowed_priority_range.contains(&priority) { + panic!("priority is not in allowed range for the scheduler policy") + } + + Self { policy, priority } + } + + /// Scheduler policy. + pub fn policy(&self) -> SchedulerPolicy { + self.policy + } + + /// Thread priority. + pub fn priority(&self) -> i32 { + self.priority + } +} + +/// Thread parameters. +#[derive(Clone, Default, Debug, ScoreDebug, PartialEq, Eq)] +pub struct ThreadParameters { + pub(crate) scheduler_parameters: Option, + pub(crate) affinity: Option, + pub(crate) stack_size: Option, +} + +impl ThreadParameters { + /// Create a new [`ThreadParameters`] containing default values. + pub fn new() -> Self { + Self::default() + } + + /// Scheduler parameters, including scheduler policy and thread priority. + pub fn scheduler_parameters(mut self, scheduler_parameters: SchedulerParameters) -> Self { + self.scheduler_parameters = Some(scheduler_parameters); + self + } + + /// Set thread affinity - array of CPU core IDs that the thread can run on. + pub fn affinity(mut self, affinity: &[usize]) -> Self { + self.affinity = Some(CpuSet::new(affinity)); + self + } + + /// Set stack size. + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } +} + +#[cfg(all(test, not(miri)))] +mod tests { + use crate::parameters::{SchedulerParameters, SchedulerPolicy}; + use crate::ThreadParameters; + use pal::CpuSet; + + #[test] + fn scheduler_policy_min_max_priority() { + let policy = SchedulerPolicy::Fifo; + assert_eq!(policy.priority_min(), 1); + assert_eq!(policy.priority_max(), 99); + } + + #[test] + fn scheduler_policy_from_i32_succeeds() { + let policy_as_int = SchedulerPolicy::Other as i32; + let policy = SchedulerPolicy::try_from(policy_as_int); + assert!(policy.is_ok_and(|p| p == SchedulerPolicy::Other)); + } + + #[test] + fn scheduler_policy_from_i32_unknown() { + let result = SchedulerPolicy::try_from(123); + assert!(result.is_err()) + } + + #[test] + fn scheduler_parameters_new_succeeds() { + let policy = SchedulerPolicy::Fifo; + let priority = 40; + let params = SchedulerParameters::new(policy, priority); + assert_eq!(params.policy(), policy); + assert_eq!(params.priority(), priority); + } + + #[test] + #[should_panic(expected = "priority is not in allowed range for the scheduler policy")] + fn scheduler_parameters_new_priority_out_of_range() { + let policy = SchedulerPolicy::Other; + let priority = 1; + let _ = SchedulerParameters::new(policy, priority); + } + + #[test] + fn thread_parameters_new_succeeds() { + let new_tp = ThreadParameters::new(); + let def_tp = ThreadParameters::default(); + + assert_eq!(new_tp.scheduler_parameters, None); + assert_eq!(new_tp.affinity, None); + assert_eq!(new_tp.stack_size, None); + assert_eq!(new_tp, def_tp); + } + + #[test] + fn thread_parameters_parameters_succeeds() { + let exp_scheduler_parameters = SchedulerParameters::new(SchedulerPolicy::Fifo, 50); + let exp_affinity = vec![1, 2, 3]; + let exp_stack_size = 1024 * 1024; + let thread_parameters = ThreadParameters::new() + .scheduler_parameters(exp_scheduler_parameters) + .affinity(&exp_affinity) + .stack_size(exp_stack_size); + + assert_eq!(thread_parameters.scheduler_parameters, Some(exp_scheduler_parameters)); + assert_eq!(thread_parameters.affinity, Some(CpuSet::new(&exp_affinity))); + assert_eq!(thread_parameters.stack_size, Some(exp_stack_size)); + } +} diff --git a/src/thread/thread.rs b/src/thread/thread.rs new file mode 100644 index 00000000..cc9754be --- /dev/null +++ b/src/thread/thread.rs @@ -0,0 +1,541 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::parameters::{SchedulerPolicy, ThreadParameters}; +use core::cell::OnceCell; +use core::mem::MaybeUninit; +use core::ptr::null_mut; +use core::sync::atomic::{fence, Ordering}; +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::sync::Arc; + +/// `pthread` attributes object. +struct Attributes { + attr_handle: pal::pthread_attr_t, +} + +impl Attributes { + /// Create `pthread` attributes object. + fn new() -> Self { + let mut attr = MaybeUninit::uninit(); + // SAFETY: initializes `attr`, pointer is ensured to be valid. + let rc = unsafe { pal::pthread_attr_init(attr.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_init failed, rc: {rc}"); + + // SAFETY: `attr` is initialized by a `pthread_attr_init` call. + let attr_handle = unsafe { attr.assume_init() }; + Self { attr_handle } + } + + /// Pointer to mutable internal handle. + fn ptr(&mut self) -> *mut pal::pthread_attr_t { + // SAFETY: + // Handle is initialized during object construction. + // Pointer is ensured to be valid. + &mut self.attr_handle + } + + /// Set inherit scheduling attributes. + fn inherit_scheduling_attributes(&mut self, inherit: bool) { + let inherit_native = if inherit { + pal::PTHREAD_INHERIT_SCHED + } else { + pal::PTHREAD_EXPLICIT_SCHED + }; + + // SAFETY: value is ensured to be valid. + let rc = unsafe { pal::pthread_attr_setinheritsched(self.ptr(), inherit_native) }; + assert!(rc == 0, "pthread_attr_setinheritsched failed, rc: {rc}"); + } + + /// Set thread priority. + fn priority(&mut self, priority: i32) { + let mut params = MaybeUninit::uninit(); + // Create `sched_param` struct. + // SAFETY: initializes `params`, pointer is ensured to be valid. + let rc = unsafe { pal::pthread_attr_getschedparam(self.ptr(), params.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_getschedparam failed, rc: {rc}"); + + // Store and modify `sched_param` struct. + // SAFETY: `params` is initialized by a `pthread_attr_getschedparam` call. + let mut params = unsafe { params.assume_init() }; + params.sched_priority = priority; + + // Set modified `sched_param`. + // SAFETY: + // `params` is initialized by a `pthread_attr_getschedparam` call. + // Pointer is ensured to be valid. + let rc = unsafe { pal::pthread_attr_setschedparam(self.ptr(), ¶ms) }; + assert!(rc == 0, "pthread_attr_setschedparam failed, rc: {rc}"); + } + + /// Set scheduler policy. + fn scheduler_policy(&mut self, scheduler_policy: SchedulerPolicy) { + let policy = scheduler_policy as i32; + // SAFETY: `policy` value is ensured to be valid. + let rc = unsafe { pal::pthread_attr_setschedpolicy(self.ptr(), policy) }; + assert!(rc == 0, "pthread_attr_setschedpolicy failed, rc: {rc}"); + } + + /// Set stack size. + fn stack_size(&mut self, stack_size: usize) { + // SAFETY: `stack_size` type is valid, invalid value will cause abort. + let rc = unsafe { pal::pthread_attr_setstacksize(self.ptr(), stack_size) }; + assert!(rc == 0, "pthread_attr_setstacksize failed, rc: {rc}"); + } + + /// Get reference to inner handle. + fn get(&self) -> &pal::pthread_attr_t { + &self.attr_handle + } +} + +impl Drop for Attributes { + fn drop(&mut self) { + // SAFETY: after drop handle is no longer needed and can be destructed. + let rc = unsafe { pal::pthread_attr_destroy(self.ptr()) }; + debug_assert!(rc == 0, "pthread_attr_destroy failed, rc: {rc}"); + } +} + +struct ThreadData { + f: F, +} + +/// `pthread` thread object. +struct Thread { + thread_handle: pal::pthread_t, +} + +impl Thread { + fn new(attributes: Attributes, f: F) -> Self + where + F: FnOnce() + Send + 'static, + { + let mut thread_handle = MaybeUninit::uninit(); + + // SAFETY: + // It is safe to use `extern "C"`. + // Provided callback is a wrapper containing panic handling. + extern "C" fn start_routine(data: *mut pal::c_void) -> *mut pal::c_void { + // SAFETY: + // `data` is ensured to be valid - it is boxed right before `pthread_create`. + // On `pthread_create` failure it is deallocated during error handling. + let data: Box> = unsafe { Box::from_raw(data.cast()) }; + (data.f)(); + null_mut() + } + + let data = Box::into_raw(Box::new(ThreadData { f })); + // SAFETY: + // Initializes `thread_handle`. + // Validity of all pointers is ensured. + let rc = unsafe { + pal::pthread_create( + thread_handle.as_mut_ptr(), + attributes.get(), + start_routine::, + data.cast(), + ) + }; + if rc != 0 { + // Reobtain and drop `ThreadData`. + // SAFETY: + // `data` is ensured to be valid - it is boxed right before `pthread_create`. + // On `pthread_create` success it is deallocated after `start_routine` finishes. + let _ = unsafe { Box::from_raw(data) }; + panic!("pthread_create failed, rc: {rc}"); + } + + Self { + // SAFETY: `thread_handle` is initialized by a `pthread_create` call. + thread_handle: unsafe { thread_handle.assume_init() }, + } + } +} + +/// A specialized [`Result`] type for threads. +/// Indicates the manner in which a thread exited. +pub type Result = core::result::Result>; + +/// Packet containing thread result. +/// +/// No need for a mutex because: +/// - state is set during thread. +/// - the caller will never read this packet until the thread has exited. +struct Packet(OnceCell>); + +impl Packet { + fn new() -> Self { + Self(OnceCell::new()) + } +} + +// SAFETY: +// Due to the usage of `OnceCell` manual implementation of `Sync` is required. +// The caller will never read this packet until the thread has exited. +// This is based on `std::thread` implementation. +unsafe impl Sync for Packet {} + +impl Drop for Packet { + fn drop(&mut self) { + // Make sure that panic was handled. + if let Some(result) = self.0.get() { + if result.is_err() { + panic!("unhandled panic occurred in a thread") + } + } + } +} + +/// Inner representation for [`JoinHandle`]. +pub struct JoinInner { + thread: Thread, + packet: Arc>, +} + +impl JoinInner { + fn new(thread: Thread, packet: Arc>) -> Self { + Self { thread, packet } + } + + /// Wait for the associated thread to finish. + /// + /// This function will return immediately if the associated thread has already finished. + pub fn join(mut self) -> Result { + // Perform native join. + self.join_internal(); + + // Obtain `packet` from `Arc`. + // This can only be done once thread finished. + fence(Ordering::Acquire); + let packet = Arc::get_mut(&mut self.packet).expect("thread not yet finished"); + let result = packet.0.take().expect("thread result uninitialized"); + + // Prevent destructor from running - thread is already joined. + core::mem::forget(self); + + result + } + + fn join_internal(&self) { + // Perform join. + let thread_handle = self.thread.thread_handle; + // SAFETY: `thread_handle` is ensured to be valid. + let rc = unsafe { pal::pthread_join(thread_handle, null_mut()) }; + assert!(rc == 0, "pthread_join failed, rc: {rc}"); + } +} + +impl Drop for JoinInner { + fn drop(&mut self) { + self.join_internal(); + } +} + +/// An owned permission to join on a thread (block on its termination). +/// +/// Thread is joined on [`Self::join`] and [`drop`] (if still joinable). +pub struct JoinHandle(JoinInner); + +impl JoinHandle { + /// Wait for the associated thread to finish. + /// + /// This function will return immediately if the associated thread has already finished. + pub fn join(self) -> Result { + self.0.join() + } +} + +/// Spawn a new thread, returning [`JoinHandle`] for it. +pub fn spawn(f: F, thread_parameters: ThreadParameters) -> JoinHandle +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + // Construct attributes based on provided parameters. + let mut attributes = Attributes::new(); + + if let Some(scheduler_parameters) = thread_parameters.scheduler_parameters { + attributes.inherit_scheduling_attributes(false); + attributes.scheduler_policy(scheduler_parameters.policy()); + attributes.priority(scheduler_parameters.priority()); + } + if let Some(stack_size) = thread_parameters.stack_size { + attributes.stack_size(stack_size); + } + + // Construct a wrapper containing affinity configuration and panic handling. + let packet = Arc::new(Packet::new()); + let packet_clone = packet.clone(); + let thread_wrapper = move || { + let result = catch_unwind(AssertUnwindSafe(|| { + // Set affinity. + if let Some(cpu_set) = thread_parameters.affinity { + pal::set_affinity(cpu_set); + } + + // Execute function. + f() + })); + + // Set thread result. + if packet_clone.0.set(result).is_err() { + panic!("thread result is already set"); + } + }; + + // Create a `Thread` and place it in a `JoinHandle`. + let thread = Thread::new(attributes, thread_wrapper); + JoinHandle(JoinInner::new(thread, packet)) +} + +#[cfg(all(test, not(miri)))] +mod tests { + use crate::parameters::{SchedulerParameters, SchedulerPolicy, ThreadParameters}; + use crate::thread::{spawn, Attributes}; + use core::mem::MaybeUninit; + use pal::get_affinity; + use std::sync::mpsc::channel; + + fn attr_inherit_scheduling_attributes(attrs: &Attributes) -> bool { + let mut native = MaybeUninit::uninit(); + let rc = unsafe { pal::pthread_attr_getinheritsched(attrs.get(), native.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_getinheritsched failed, rc: {rc}"); + + match unsafe { native.assume_init() } { + pal::PTHREAD_INHERIT_SCHED => true, + pal::PTHREAD_EXPLICIT_SCHED => false, + _ => panic!("unknown inherit scheduling attributes value"), + } + } + + fn attr_priority(attrs: &Attributes) -> i32 { + let mut param_native = MaybeUninit::uninit(); + let rc = unsafe { pal::pthread_attr_getschedparam(attrs.get(), param_native.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_getschedparam failed, rc: {rc}"); + + unsafe { param_native.assume_init().sched_priority } + } + + fn attr_policy(attrs: &Attributes) -> SchedulerPolicy { + let mut policy_native = MaybeUninit::uninit(); + let rc = unsafe { pal::pthread_attr_getschedpolicy(attrs.get(), policy_native.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_getschedpolicy failed, rc: {rc}"); + + SchedulerPolicy::try_from(unsafe { policy_native.assume_init() }).unwrap() + } + + fn attr_stack_size(attrs: &Attributes) -> usize { + let mut stack_size = MaybeUninit::uninit(); + let rc = unsafe { pal::pthread_attr_getstacksize(attrs.get(), stack_size.as_mut_ptr()) }; + assert!(rc == 0, "pthread_attr_getstacksize failed, rc: {rc}"); + + unsafe { stack_size.assume_init() } + } + + #[test] + fn attributes_new_succeeds() { + // Also checks `Drop` on exit. + let _ = Attributes::new(); + } + + #[test] + fn attributes_inherit_scheduling_attributes_succeeds() { + let mut attrs = Attributes::new(); + + attrs.inherit_scheduling_attributes(true); + assert!(attr_inherit_scheduling_attributes(&attrs)); + + attrs.inherit_scheduling_attributes(false); + assert!(!attr_inherit_scheduling_attributes(&attrs)); + } + + #[test] + fn attributes_priority_succeeds() { + let mut attrs = Attributes::new(); + + attrs.scheduler_policy(SchedulerPolicy::Fifo); + attrs.priority(50); + assert_eq!(attr_priority(&attrs), 50); + } + + #[test] + #[should_panic(expected = "pthread_attr_setschedparam failed, rc:")] + fn attributes_priority_wrong_scheduler_policy() { + let mut attrs = Attributes::new(); + attrs.priority(50); + } + + #[test] + fn attributes_scheduler_policy_succeeds() { + let mut attrs = Attributes::new(); + + attrs.scheduler_policy(SchedulerPolicy::Fifo); + assert_eq!(attr_policy(&attrs), SchedulerPolicy::Fifo); + } + + #[test] + fn attributes_stack_size_succeeds() { + let mut attrs = Attributes::new(); + + let expected_stack_size = 1024 * 1024; + attrs.stack_size(expected_stack_size); + assert_eq!(attr_stack_size(&attrs), expected_stack_size); + } + + #[test] + #[should_panic(expected = "pthread_attr_setstacksize failed, rc:")] + fn attributes_stack_size_too_small() { + let mut attrs = Attributes::new(); + attrs.stack_size(4 * 1024); + } + + #[test] + fn spawn_joined_succeeds() { + let thread_parameters = ThreadParameters::default(); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + tx.send(654321).unwrap(); + 123 + }, + thread_parameters, + ); + let result = join_handle.join(); + + assert!(result.is_ok_and(|x| x == 123)); + assert_eq!(rx.recv().unwrap(), 654321) + } + + #[test] + fn spawn_not_joined_succeeds() { + let thread_parameters = ThreadParameters::default(); + let (tx, rx) = channel(); + let _ = spawn( + move || { + tx.send(654321).unwrap(); + }, + thread_parameters, + ); + + assert_eq!(rx.recv().unwrap(), 654321) + } + + #[test] + fn spawn_joined_panics() { + let thread_parameters = ThreadParameters::default(); + let join_handle = spawn( + move || { + panic!("internal panic"); + }, + thread_parameters, + ); + let result = join_handle.join(); + assert!(result.is_err()); + } + + #[test] + #[should_panic(expected = "unhandled panic occurred in a thread")] + fn spawn_not_joined_panics() { + let thread_parameters = ThreadParameters::default(); + let _ = spawn( + move || { + panic!("internal panic"); + }, + thread_parameters, + ); + } + + fn current_sched_params() -> (SchedulerPolicy, i32) { + let thread = unsafe { pal::pthread_self() }; + let mut policy = MaybeUninit::uninit(); + let mut param = MaybeUninit::uninit(); + let rc = unsafe { pal::pthread_getschedparam(thread, policy.as_mut_ptr(), param.as_mut_ptr()) }; + assert!(rc == 0, "pthread_getschedparam failed, rc: {rc}"); + + let policy_native = unsafe { policy.assume_init() }; + let scheduler_policy = match policy_native { + pal::SCHED_OTHER => SchedulerPolicy::Other, + pal::SCHED_FIFO => SchedulerPolicy::Fifo, + pal::SCHED_RR => SchedulerPolicy::RoundRobin, + _ => panic!("unknown scheduler type"), + }; + + let priority = unsafe { param.assume_init().sched_priority }; + + (scheduler_policy, priority) + } + + #[test] + #[ignore = "test requires cap_sys_nice=ep"] + fn spawn_scheduler_params_succeeds() { + let exp_scheduler_parameters = SchedulerParameters::new(SchedulerPolicy::Fifo, 10); + let thread_parameters = ThreadParameters::new().scheduler_parameters(exp_scheduler_parameters); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + let sched_params = current_sched_params(); + tx.send(sched_params).unwrap(); + }, + thread_parameters, + ); + let result = join_handle.join(); + assert!(result.is_ok()); + + let (scheduler_policy, priority) = rx.recv().unwrap(); + assert_eq!(scheduler_policy, exp_scheduler_parameters.policy()); + assert_eq!(priority, exp_scheduler_parameters.priority()); + } + + #[test] + fn spawn_affinity_succeeds() { + let exp_affinity = vec![0]; + let thread_parameters = ThreadParameters::new().affinity(&exp_affinity); + let (tx, rx) = channel(); + let join_handle = spawn( + move || { + let affinity = get_affinity(); + tx.send(affinity).unwrap(); + }, + thread_parameters, + ); + let result = join_handle.join(); + assert!(result.is_ok()); + + assert_eq!(rx.recv().unwrap().iter().copied().collect::>(), exp_affinity); + } + + #[test] + #[should_panic(expected = "CPU ID provided to affinity exceeds max supported size, provided: 1234, max: 1023")] + fn spawn_affinity_out_of_range() { + let thread_parameters = ThreadParameters::new().affinity(&[1234]); + let _ = spawn(|| {}, thread_parameters); + } + + #[test] + fn spawn_stack_size_succeeds() { + // Check that nothing fails - cannot check stack size from within. + let stack_size = 1024 * 1024; + let thread_parameters = ThreadParameters::new().stack_size(stack_size); + let join_handle = spawn( + || { + // Do nothing. + }, + thread_parameters, + ); + let result = join_handle.join(); + assert!(result.is_ok()); + } +}