diff --git a/Cargo.lock b/Cargo.lock index 29320bdc..427d14a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.44" @@ -1623,6 +1629,18 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nix" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +dependencies = [ + "bitflags 2.11.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -3765,6 +3783,16 @@ dependencies = [ "waymark-proto", ] +[[package]] +name = "waymark-managed-process" +version = "0.1.0" +dependencies = [ + "nix", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "waymark-message-conversions" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4f86c8f2..c50d69cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ futures-core = "0.3" futures-util = "0.3" http-body-util = "0.1" mockall = "0.14" +nix = "0.31" nonempty-collections = "1" proc-macro2 = "1" proptest = "1.9" diff --git a/crates/lib/managed-process/Cargo.toml b/crates/lib/managed-process/Cargo.toml new file mode 100644 index 00000000..32769696 --- /dev/null +++ b/crates/lib/managed-process/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "waymark-managed-process" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +nix = { workspace = true, features = ["signal"] } +thiserror = { workspace 
= true } +tokio = { workspace = true, features = ["process", "time"] } +tracing = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt", "time"] } diff --git a/crates/lib/managed-process/src/graceful_termination.rs b/crates/lib/managed-process/src/graceful_termination.rs new file mode 100644 index 00000000..f86de3b8 --- /dev/null +++ b/crates/lib/managed-process/src/graceful_termination.rs @@ -0,0 +1,26 @@ +use crate::Child; + +/// Error returned when graceful termination cannot be initiated. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct GracefulTerminationError { + #[cfg(not(unix))] + inner: core::convert::Infallible, + + #[cfg(unix)] + inner: crate::SendSignalError, +} + +impl Child { + pub(crate) const CAN_GRACEFULLY_TERMINATE: bool = cfg!(unix); + + pub(crate) async fn trigger_graceful_termination( + &self, + ) -> Result<(), GracefulTerminationError> { + #[cfg(unix)] + self.send_signal(nix::sys::signal::Signal::SIGTERM) + .map_err(|error| GracefulTerminationError { inner: error })?; + + Ok(()) + } +} diff --git a/crates/lib/managed-process/src/lib.rs b/crates/lib/managed-process/src/lib.rs new file mode 100644 index 00000000..4e4baade --- /dev/null +++ b/crates/lib/managed-process/src/lib.rs @@ -0,0 +1,194 @@ +//! Managed wrapper around `tokio::process::Child` with graceful shutdown helpers. + +#![warn(missing_docs)] + +#[cfg(unix)] +mod platform_unix; + +mod graceful_termination; + +pub use self::graceful_termination::*; + +#[cfg(unix)] +pub use self::platform_unix::*; + +use std::process::Stdio; + +/// A single managed child process handle. +/// +/// Will kill the child process on drop. +pub struct Child { + /// The child process. + child: tokio::process::Child, +} + +/// Spawns a managed child process. +/// +/// The child inherits `stderr` and is configured with `kill_on_drop(true)`. 
+/// On Unix, the child is also configured with a parent-death signal so it +/// receives `SIGTERM` when the parent process exits. +pub fn spawn(command: impl Into) -> Result { + let mut command = command.into(); + + command.stderr(Stdio::inherit()).kill_on_drop(true); + + #[cfg(unix)] + inject_sigterm_pdeathsig(&mut command); + + let child = command.spawn()?; + + tracing::debug!(pid = child.id(), "spawned child process"); + + Ok(Child { child }) +} + +/// Errors that can occur while shutting down a managed child. +#[derive(Debug, thiserror::Error)] +pub enum ShutdownError { + /// Triggering graceful termination failed. + #[error("graceful termination: {0}")] + GracefulTermination(#[source] GracefulTerminationError), + + /// Waiting after graceful termination failed. + #[error("graceful termination wait: {0}")] + GracefulTerminationWait(#[source] std::io::Error), + + /// Force-kill fallback failed. + #[error("kill: {0}")] + Kill(#[source] KillAndWaitError), +} + +/// Errors that can occur when force-killing a child and waiting for exit. +#[derive(Debug, thiserror::Error)] +pub enum KillAndWaitError { + /// Sending kill failed. + #[error("kill: {0}")] + Kill(#[source] std::io::Error), + + /// Waiting for exit failed. + #[error("wait: {0}")] + Wait(#[source] WaitWithTimeoutError), +} + +/// Errors returned by [`Child::wait_with_timeout`]. +#[derive(Debug, thiserror::Error)] +pub enum WaitWithTimeoutError { + /// Waiting for process exit failed. + #[error("wait: {0}")] + Wait(#[source] std::io::Error), + + /// The timeout elapsed before the process exited. + #[error("process didn't exit in time")] + Timeout { + /// Elapsed timeout duration. + elapsed: std::time::Duration, + }, +} + +impl Child { + /// Waits for the child process to exit. + pub async fn wait(&mut self) -> Result { + self.child.wait().await + } + + /// Waits for the child process to exit up to `timeout`. + /// + /// Returns [`WaitWithTimeoutError::Timeout`] when the timeout expires. 
+ pub async fn wait_with_timeout( + &mut self, + timeout: std::time::Duration, + ) -> Result { + let timeout_result = tokio::time::timeout(timeout, self.wait()).await; + + match timeout_result { + // Process exited. + Ok(Ok(exit_code)) => Ok(exit_code), + + // `wait()` errored. + Ok(Err(error)) => Err(WaitWithTimeoutError::Wait(error)), + + // Timeout. + Err(tokio::time::error::Elapsed { .. }) => { + Err(WaitWithTimeoutError::Timeout { elapsed: timeout }) + } + } + } + + /// Attempts graceful shutdown first, then force-kills as fallback. + /// + /// If `graceful_termination_timeout` is set and graceful termination is + /// supported on this platform, waits up to that duration before falling + /// back to kill. + pub async fn shutdown( + mut self, + graceful_termination_timeout: impl Into>, + kill_timeout: impl Into>, + ) -> Result { + self.trigger_graceful_termination() + .await + .map_err(ShutdownError::GracefulTermination)?; + + if Self::CAN_GRACEFULLY_TERMINATE + && let Some(graceful_termination_timeout) = graceful_termination_timeout.into() + { + let result = self.wait_with_timeout(graceful_termination_timeout).await; + + match result { + // Process exited. + Ok(exit_code) => return Ok(exit_code), + + // `wait()` errored. + Err(WaitWithTimeoutError::Wait(error)) => { + return Err(ShutdownError::GracefulTerminationWait(error)); + } + + // Timeout. + Err(WaitWithTimeoutError::Timeout { .. }) => { + // continue to kill + } + } + } + + self.kill_and_wait(kill_timeout) + .await + .map_err(ShutdownError::Kill) + } + + /// Sends a kill signal to the child process. + pub fn send_kill(&mut self) -> Result<(), std::io::Error> { + self.child.start_kill() + } + + /// Force-kills the child process and waits for it to exit. + /// + /// When `timeout` is `Some`, waiting is bounded by that duration. 
+ pub async fn kill_and_wait( + mut self, + timeout: impl Into>, + ) -> Result { + self.send_kill().map_err(KillAndWaitError::Kill)?; + + let exit_code = if let Some(timeout) = timeout.into() { + self.wait_with_timeout(timeout) + .await + .map_err(KillAndWaitError::Wait)? + } else { + self.wait() + .await + .map_err(|err| KillAndWaitError::Wait(WaitWithTimeoutError::Wait(err)))? + }; + + Ok(exit_code) + } + + /// Returns the underlying unmanaged `tokio` child process handle. + pub async fn unmanage(self) -> tokio::process::Child { + self.child + } +} + +impl AsRef for Child { + fn as_ref(&self) -> &tokio::process::Child { + &self.child + } +} diff --git a/crates/lib/managed-process/src/platform_unix.rs b/crates/lib/managed-process/src/platform_unix.rs new file mode 100644 index 00000000..f8582de3 --- /dev/null +++ b/crates/lib/managed-process/src/platform_unix.rs @@ -0,0 +1,62 @@ +use crate::Child; + +use nix::{sys::signal::Signal, unistd::Pid}; + +/// Errors that can occur when sending a Unix signal to a child process. +#[derive(Debug, thiserror::Error)] +pub enum SendSignalError { + /// The child no longer has a process id. + #[error("child process is gone")] + ChildGone, + + /// The process id does not fit into `i32`. + #[error("pid overflows i32: {0}")] + PidOverflow(std::num::TryFromIntError), + + /// The process id was zero, which is invalid for this operation. + #[error("pid is zero: {0}")] + PidZero(std::num::TryFromIntError), + + /// System call to send the signal failed. + #[error("kill: {0}")] + Kill(nix::errno::Errno), +} + +impl Child { + /// Sends `signal` to the child process. + /// + /// Passing `None` sends no signal; `kill(2)` only performs error checking. 
+ pub fn send_signal(&self, signal: impl Into>) -> Result<(), SendSignalError> { + let pid = self.child.id().ok_or(SendSignalError::ChildGone)?; + let pid = i32::try_from(pid).map_err(SendSignalError::PidOverflow)?; + let pid = std::num::NonZeroI32::try_from(pid).map_err(SendSignalError::PidZero)?; + let pid = Pid::from_raw(pid.get()); + nix::sys::signal::kill(pid, signal).map_err(SendSignalError::Kill)?; + Ok(()) + } +} + +/// Set the parent-death signal of the child process we're going to spawn. +/// This is the signal that the calling process will get when its parent dies. +/// +/// Effectively we request that the child processes we spawn get SIGTERM if we +/// die. +pub(crate) fn inject_sigterm_pdeathsig(command: &mut tokio::process::Command) { + use std::os::unix::process::CommandExt as _; + + let our_pid = std::process::id(); + + unsafe { + command.as_std_mut().pre_exec(move || { + if our_pid != std::os::unix::process::parent_id() { + // If the parent has exited - we abort the launch because it + // won't be terminated when the parent exits. 
+ return Err(std::io::Error::other( + "parent process already exited, won't start child with a set pdeathsig", + )); + } + + nix::sys::prctl::set_pdeathsig(Signal::SIGTERM).map_err(std::io::Error::other) + }); + } +} diff --git a/crates/lib/managed-process/tests/integration.rs b/crates/lib/managed-process/tests/integration.rs new file mode 100644 index 00000000..249a2753 --- /dev/null +++ b/crates/lib/managed-process/tests/integration.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use waymark_managed_process::{self as managed_process, WaitWithTimeoutError}; + +fn spawn_sleeping_shell() -> managed_process::Child { + let mut command = tokio::process::Command::new("sh"); + command.arg("-c").arg("sleep 60"); + + managed_process::spawn(command).expect("spawn sleeping shell") +} + +#[tokio::test] +async fn wait_with_timeout_reports_timeout_for_running_child() { + let mut child = spawn_sleeping_shell(); + + let result = child.wait_with_timeout(Duration::from_millis(25)).await; + assert!(matches!( + result, + Err(WaitWithTimeoutError::Timeout { elapsed }) if elapsed == Duration::from_millis(25) + )); + + let _ = child.kill_and_wait(Some(Duration::from_secs(1))).await; +} + +#[tokio::test] +async fn kill_and_wait_terminates_running_child() { + let child = spawn_sleeping_shell(); + + let exit_status = child + .kill_and_wait(Some(Duration::from_secs(1))) + .await + .expect("kill and wait should terminate child"); + + assert!(!exit_status.success()); +} + +#[tokio::test] +async fn shutdown_falls_back_to_kill_after_graceful_timeout() { + let mut command = tokio::process::Command::new("sh"); + command + .arg("-c") + // Ignore SIGTERM so `shutdown` must use the kill fallback. 
+ .arg("trap '' TERM; sleep 60"); + + let child = managed_process::spawn(command).expect("spawn term-trapping shell"); + + let exit_status = child + .shutdown( + Some(Duration::from_millis(25)), + Some(Duration::from_secs(1)), + ) + .await + .expect("shutdown should terminate child"); + + assert!(!exit_status.success()); +}