Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
safe-write = "0.1.2"
nix = "0.29.0"
sd-notify = "0.4.5"
listenfd = "1.0"
jemallocator = "0.5.4"

# Serialization/Parsing
Expand Down
3 changes: 2 additions & 1 deletion basefiles/dstack-guest-agent.service
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[Unit]
Description=dstack Guest Agent Service
After=network.target tboot.service
Requires=dstack-guest-agent.socket
After=network.target tboot.service dstack-guest-agent.socket
Before=docker.service

[Service]
Expand Down
18 changes: 18 additions & 0 deletions basefiles/dstack-guest-agent.socket
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-FileCopyrightText: 2025 Phala Network <dstack@phala.network>
#
# SPDX-License-Identifier: Apache-2.0

# Socket activation for dstack-guest-agent.
# Provides backward compatibility for containers that mount sockets directly.
# Socket order: dstack.sock (index 0), tappd.sock (index 1)

[Unit]
Description=dstack guest agent sockets

[Socket]
ListenStream=/run/dstack.sock
ListenStream=/run/tappd.sock
SocketMode=0777

[Install]
WantedBy=sockets.target
15 changes: 0 additions & 15 deletions basefiles/dstack-socket.service

This file was deleted.

16 changes: 0 additions & 16 deletions basefiles/dstack-socket.socket

This file was deleted.

15 changes: 0 additions & 15 deletions basefiles/tappd-socket.service

This file was deleted.

16 changes: 0 additions & 16 deletions basefiles/tappd-socket.socket

This file was deleted.

1 change: 1 addition & 0 deletions guest-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ tempfile.workspace = true
rand.workspace = true
or-panic.workspace = true
cc-eventlog.workspace = true
listenfd.workspace = true
87 changes: 58 additions & 29 deletions guest-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::{fs::Permissions, future::pending, os::unix::fs::PermissionsExt};
use std::{future::pending, os::unix::net::UnixListener as StdUnixListener};

use anyhow::{anyhow, Context, Result};
use clap::Parser;
Expand All @@ -16,6 +16,7 @@ use rocket::{
use rocket_vsock_listener::VsockListener;
use rpc_service::{AppState, ExternalRpcHandler, InternalRpcHandler, InternalRpcHandlerV0};
use sd_notify::{notify as sd_notify, NotifyState};
use socket_activation::{ActivatedSockets, ActivatedUnixListener};
use std::time::Duration;
use tokio::sync::oneshot;
use tracing::{error, info};
Expand All @@ -25,6 +26,7 @@ mod guest_api_service;
mod http_routes;
mod models;
mod rpc_service;
mod socket_activation;

const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const GIT_REV: &str = git_version::git_version!(
Expand Down Expand Up @@ -52,6 +54,7 @@ struct Args {
async fn run_internal_v0(
state: AppState,
figment: Figment,
activated_socket: Option<StdUnixListener>,
sock_ready_tx: oneshot::Sender<()>,
) -> Result<()> {
let rocket = rocket::custom(figment)
Expand All @@ -64,26 +67,36 @@ async fn run_internal_v0(
.ignite()
.await
.map_err(|err| anyhow!("Failed to ignite rocket: {err}"))?;
let endpoint = DefaultListener::bind_endpoint(&ignite)
.map_err(|err| anyhow!("Failed to get endpoint: {err}"))?;
let listener = DefaultListener::bind(&ignite)
.await
.map_err(|err| anyhow!("Failed to bind on {endpoint}: {err}"))?;
if let Some(path) = endpoint.unix() {
// Allow any user to connect to the socket
fs_err::set_permissions(path, Permissions::from_mode(0o777))?;

if let Some(std_listener) = activated_socket {
// Use systemd-activated socket
info!("Using systemd-activated socket for tappd.sock");
let listener = ActivatedUnixListener::new(std_listener)?;
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err: rocket::Error| anyhow!(err.to_string()))?;
} else {
// Fall back to binding our own socket
let endpoint = DefaultListener::bind_endpoint(&ignite)
.map_err(|err| anyhow!("Failed to get endpoint: {err}"))?;
let listener = DefaultListener::bind(&ignite)
.await
.map_err(|err| anyhow!("Failed to bind on {endpoint}: {err}"))?;
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err| anyhow!(err.to_string()))?;
}
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err| anyhow!(err.to_string()))?;
Ok(())
}

async fn run_internal(
state: AppState,
figment: Figment,
activated_socket: Option<StdUnixListener>,
sock_ready_tx: oneshot::Sender<()>,
) -> Result<()> {
let rocket = rocket::custom(figment)
Expand All @@ -93,20 +106,29 @@ async fn run_internal(
.ignite()
.await
.map_err(|err| anyhow!("Failed to ignite rocket: {err}"))?;
let endpoint = DefaultListener::bind_endpoint(&ignite)
.map_err(|err| anyhow!("Failed to get endpoint: {err}"))?;
let listener = DefaultListener::bind(&ignite)
.await
.map_err(|err| anyhow!("Failed to bind on {endpoint}: {err}"))?;
if let Some(path) = endpoint.unix() {
// Allow any user to connect to the socket
fs_err::set_permissions(path, Permissions::from_mode(0o777))?;

if let Some(std_listener) = activated_socket {
// Use systemd-activated socket
info!("Using systemd-activated socket for dstack.sock");
let listener = ActivatedUnixListener::new(std_listener)?;
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err: rocket::Error| anyhow!(err.to_string()))?;
} else {
// Fall back to binding our own socket
let endpoint = DefaultListener::bind_endpoint(&ignite)
.map_err(|err| anyhow!("Failed to get endpoint: {err}"))?;
let listener = DefaultListener::bind(&ignite)
.await
.map_err(|err| anyhow!("Failed to bind on {endpoint}: {err}"))?;
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err| anyhow!(err.to_string()))?;
}
sock_ready_tx.send(()).ok();
ignite
.launch_on(listener)
.await
.map_err(|err| anyhow!(err.to_string()))?;
Ok(())
}

Expand Down Expand Up @@ -219,11 +241,18 @@ async fn main() -> Result<()> {
.extract()
.context("Failed to extract bind address")?;
let guest_api_figment = figment.select("guest-api");

// Get systemd-activated sockets if available
let activated = ActivatedSockets::from_env();
if activated.any_activated() {
info!("Systemd socket activation detected");
}

let (tappd_ready_tx, tappd_ready_rx) = oneshot::channel();
let (sock_ready_tx, sock_ready_rx) = oneshot::channel();
tokio::select!(
res = run_internal_v0(state.clone(), internal_v0_figment, tappd_ready_tx) => res?,
res = run_internal(state.clone(), internal_figment, sock_ready_tx) => res?,
res = run_internal_v0(state.clone(), internal_v0_figment, activated.tappd, tappd_ready_tx) => res?,
res = run_internal(state.clone(), internal_figment, activated.dstack, sock_ready_tx) => res?,
res = run_external(state.clone(), external_figment) => res?,
res = run_guest_api(state.clone(), guest_api_figment) => res?,
_ = async {
Expand Down
80 changes: 80 additions & 0 deletions guest-agent/src/socket_activation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// SPDX-FileCopyrightText: © 2025 Phala Network <dstack@phala.network>
//
// SPDX-License-Identifier: Apache-2.0

//! Systemd socket activation support for dstack-guest-agent.
//!
//! This module provides utilities for receiving pre-created sockets from systemd
//! via the LISTEN_FDS mechanism, allowing the service to use sockets that survive
//! service restarts.

use std::{io, os::unix::net::UnixListener as StdUnixListener};

use listenfd::ListenFd;
use rocket::listener::{unix::UnixStream, Endpoint, Listener};

/// Socket indices for systemd socket activation.
/// Order matches ListenStream declarations in dstack-guest-agent.socket.
mod socket_index {
pub const DSTACK: usize = 0;
pub const TAPPD: usize = 1;
}

/// Systemd-activated sockets passed via LISTEN_FDS.
pub struct ActivatedSockets {
pub dstack: Option<StdUnixListener>,
pub tappd: Option<StdUnixListener>,
}

impl ActivatedSockets {
/// Retrieve activated sockets from systemd environment variables.
pub fn from_env() -> Self {
let mut listenfd = ListenFd::from_env();
Self {
dstack: listenfd
.take_unix_listener(socket_index::DSTACK)
.ok()
.flatten(),
tappd: listenfd
.take_unix_listener(socket_index::TAPPD)
.ok()
.flatten(),
}
}

/// Check if any sockets were activated.
pub fn any_activated(&self) -> bool {
self.dstack.is_some() || self.tappd.is_some()
}
}

/// Wrapper for systemd-activated Unix socket that implements rocket's Listener trait.
pub struct ActivatedUnixListener {
listener: tokio::net::UnixListener,
}

impl ActivatedUnixListener {
/// Create a new listener from a standard library UnixListener.
pub fn new(std_listener: StdUnixListener) -> io::Result<Self> {
std_listener.set_nonblocking(true)?;
let listener = tokio::net::UnixListener::from_std(std_listener)?;
Ok(Self { listener })
}
}

impl Listener for ActivatedUnixListener {
type Accept = UnixStream;
type Connection = UnixStream;

async fn accept(&self) -> io::Result<Self::Accept> {
Ok(self.listener.accept().await?.0)
}

async fn connect(&self, accept: Self::Accept) -> io::Result<Self::Connection> {
Ok(accept)
}

fn endpoint(&self) -> io::Result<Endpoint> {
self.listener.local_addr()?.try_into()
}
}