Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ futures-core = "0.3"
futures-util = "0.3"
http-body-util = "0.1"
mockall = "0.14"
nix = "0.31"
nonempty-collections = "1"
proc-macro2 = "1"
proptest = "1.9"
Expand Down
14 changes: 14 additions & 0 deletions crates/lib/managed-process/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "waymark-managed-process"
edition = "2024"
version.workspace = true
publish.workspace = true

[dependencies]
nix = { workspace = true, features = ["signal"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["process", "time"] }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt", "time"] }
26 changes: 26 additions & 0 deletions crates/lib/managed-process/src/graceful_termination.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::Child;

/// Error returned when graceful termination cannot be initiated.
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct GracefulTerminationError {
#[cfg(not(unix))]
inner: core::convert::Infallible,

#[cfg(unix)]
inner: crate::SendSignalError,
}

impl Child {
pub(crate) const CAN_GRACEFULLY_TERMINATE: bool = cfg!(unix);

pub(crate) async fn trigger_graceful_termination(
&self,
) -> Result<(), GracefulTerminationError> {
#[cfg(unix)]
self.send_signal(nix::sys::signal::Signal::SIGTERM)
.map_err(|error| GracefulTerminationError { inner: error })?;

Ok(())
}
}
194 changes: 194 additions & 0 deletions crates/lib/managed-process/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//! Managed wrapper around `tokio::process::Child` with graceful shutdown helpers.

#![warn(missing_docs)]

#[cfg(unix)]
mod platform_unix;

mod graceful_termination;

pub use self::graceful_termination::*;

#[cfg(unix)]
pub use self::platform_unix::*;

use std::process::Stdio;

/// A single managed child process handle.
///
/// Will kill the child process on drop.
pub struct Child {
/// The child process.
child: tokio::process::Child,
}

/// Spawns a managed child process.
///
/// The child inherits `stderr` and is configured with `kill_on_drop(true)`.
/// On Unix, the child is also configured with a parent-death signal so it
/// receives `SIGTERM` when the parent process exits.
pub fn spawn(command: impl Into<tokio::process::Command>) -> Result<Child, std::io::Error> {
let mut command = command.into();

command.stderr(Stdio::inherit()).kill_on_drop(true);

#[cfg(unix)]
inject_sigterm_pdeathsig(&mut command);

let child = command.spawn()?;

tracing::debug!(pid = child.id(), "spawned child process");

Ok(Child { child })
}

/// Errors that can occur while shutting down a managed child.
#[derive(Debug, thiserror::Error)]
pub enum ShutdownError {
/// Triggering graceful termination failed.
#[error("graceful termination: {0}")]
GracefulTermination(#[source] GracefulTerminationError),

/// Waiting after graceful termination failed.
#[error("graceful termination wait: {0}")]
GracefulTerminationWait(#[source] std::io::Error),

/// Force-kill fallback failed.
#[error("kill: {0}")]
Kill(#[source] KillAndWaitError),
}

/// Errors that can occur when force-killing a child and waiting for exit.
#[derive(Debug, thiserror::Error)]
pub enum KillAndWaitError {
/// Sending kill failed.
#[error("kill: {0}")]
Kill(#[source] std::io::Error),

/// Waiting for exit failed.
#[error("wait: {0}")]
Wait(#[source] WaitWithTimeoutError),
}

/// Errors returned by [`Child::wait_with_timeout`].
#[derive(Debug, thiserror::Error)]
pub enum WaitWithTimeoutError {
/// Waiting for process exit failed.
#[error("wait: {0}")]
Wait(#[source] std::io::Error),

/// The timeout elapsed before the process exited.
#[error("process didn't exit in time")]
Timeout {
/// Elapsed timeout duration.
elapsed: std::time::Duration,
},
}

impl Child {
/// Waits for the child process to exit.
pub async fn wait(&mut self) -> Result<std::process::ExitStatus, std::io::Error> {
self.child.wait().await
}

/// Waits for the child process to exit up to `timeout`.
///
/// Returns [`WaitWithTimeoutError::Timeout`] when the timeout expires.
pub async fn wait_with_timeout(
&mut self,
timeout: std::time::Duration,
) -> Result<std::process::ExitStatus, WaitWithTimeoutError> {
let timeout_result = tokio::time::timeout(timeout, self.wait()).await;

match timeout_result {
// Process exited.
Ok(Ok(exit_code)) => Ok(exit_code),

// `wait()` errored.
Ok(Err(error)) => Err(WaitWithTimeoutError::Wait(error)),

// Timeout.
Err(tokio::time::error::Elapsed { .. }) => {
Err(WaitWithTimeoutError::Timeout { elapsed: timeout })
}
}
}

/// Attempts graceful shutdown first, then force-kills as fallback.
///
/// If `graceful_termination_timeout` is set and graceful termination is
/// supported on this platform, waits up to that duration before falling
/// back to kill.
pub async fn shutdown(
mut self,
graceful_termination_timeout: impl Into<Option<std::time::Duration>>,
kill_timeout: impl Into<Option<std::time::Duration>>,
) -> Result<std::process::ExitStatus, ShutdownError> {
self.trigger_graceful_termination()
.await
.map_err(ShutdownError::GracefulTermination)?;

if Self::CAN_GRACEFULLY_TERMINATE
&& let Some(graceful_termination_timeout) = graceful_termination_timeout.into()
{
let result = self.wait_with_timeout(graceful_termination_timeout).await;

match result {
// Process exited.
Ok(exit_code) => return Ok(exit_code),

// `wait()` errored.
Err(WaitWithTimeoutError::Wait(error)) => {
return Err(ShutdownError::GracefulTerminationWait(error));
}

// Timeout.
Err(WaitWithTimeoutError::Timeout { .. }) => {
// continue to kill
}
}
}

self.kill_and_wait(kill_timeout)
.await
.map_err(ShutdownError::Kill)
}

/// Sends a kill signal to the child process.
pub fn send_kill(&mut self) -> Result<(), std::io::Error> {
self.child.start_kill()
}

/// Force-kills the child process and waits for it to exit.
///
/// When `timeout` is `Some`, waiting is bounded by that duration.
pub async fn kill_and_wait(
mut self,
timeout: impl Into<Option<std::time::Duration>>,
) -> Result<std::process::ExitStatus, KillAndWaitError> {
self.send_kill().map_err(KillAndWaitError::Kill)?;

let exit_code = if let Some(timeout) = timeout.into() {
self.wait_with_timeout(timeout)
.await
.map_err(KillAndWaitError::Wait)?
} else {
self.wait()
.await
.map_err(|err| KillAndWaitError::Wait(WaitWithTimeoutError::Wait(err)))?
};

Ok(exit_code)
}

/// Returns the underlying unmanaged `tokio` child process handle.
pub async fn unmanage(self) -> tokio::process::Child {
self.child
}
}

impl AsRef<tokio::process::Child> for Child {
fn as_ref(&self) -> &tokio::process::Child {
&self.child
}
}
62 changes: 62 additions & 0 deletions crates/lib/managed-process/src/platform_unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::Child;

use nix::{sys::signal::Signal, unistd::Pid};

/// Errors that can occur when sending a Unix signal to a child process.
#[derive(Debug, thiserror::Error)]
pub enum SendSignalError {
/// The child no longer has a process id.
#[error("child process is gone")]
ChildGone,

/// The process id does not fit into `i32`.
#[error("pid overflows i32: {0}")]
PidOverflow(std::num::TryFromIntError),

/// The process id was zero, which is invalid for this operation.
#[error("pid is zero: {0}")]
PidZero(std::num::TryFromIntError),

/// System call to send the signal failed.
#[error("kill: {0}")]
Kill(nix::errno::Errno),
}

impl Child {
/// Sends `signal` to the child process.
///
/// Passing `None` uses platform-default behavior for `kill(2)`.
pub fn send_signal(&self, signal: impl Into<Option<Signal>>) -> Result<(), SendSignalError> {
let pid = self.child.id().ok_or(SendSignalError::ChildGone)?;
let pid = i32::try_from(pid).map_err(SendSignalError::PidOverflow)?;
let pid = std::num::NonZeroI32::try_from(pid).map_err(SendSignalError::PidZero)?;
let pid = Pid::from_raw(pid.get());
nix::sys::signal::kill(pid, signal).map_err(SendSignalError::Kill)?;
Ok(())
}
}

/// Set the parent-death signal of the child process we're going to spawn.
/// This is the signal that the calling process will get when its parent dies.
///
/// Effectively we request that the child processes we spawn get SIGTERM if we
/// die.
pub(crate) fn inject_sigterm_pdeathsig(command: &mut tokio::process::Command) {
use std::os::unix::process::CommandExt as _;

let our_pid = std::process::id();

unsafe {
command.as_std_mut().pre_exec(move || {
if our_pid != std::os::unix::process::parent_id() {
// If the parent has exited - we abort the launch because it
// won't be terminated when the parent exists.
return Err(std::io::Error::other(
"parent process already exited, won't start child with a set pdeathsig",
));
}

nix::sys::prctl::set_pdeathsig(Signal::SIGTERM).map_err(std::io::Error::other)
});
}
}
Loading
Loading