Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions abi/snapshot.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"abi_version": 15,
"abi_version": 16,
"channel_buffers": {
"data_offset": 72,
"data_size": 65536,
Expand Down Expand Up @@ -113,7 +113,7 @@
},
"host_adapter": {
"manifest": {
"abi_version": 15,
"abi_version": 16,
"channel_data_offset": 72,
"channel_data_size": 65536,
"channel_header_size": 72,
Expand Down Expand Up @@ -1309,6 +1309,11 @@
"name": "kernel_set_cwd",
"signature": "(i32,i32,i32) -> (i32)"
},
{
"kind": "func",
"name": "kernel_set_fd_pipe",
"signature": "(i32,i32) -> (i32)"
},
{
"kind": "func",
"name": "kernel_set_fork_exec",
Expand Down Expand Up @@ -1349,6 +1354,11 @@
"name": "kernel_set_stdin_pipe",
"signature": "(i32) -> (i32)"
},
{
"kind": "func",
"name": "kernel_set_stdio_pipe",
"signature": "(i32,i32) -> (i32)"
},
{
"kind": "func",
"name": "kernel_set_tid_address",
Expand Down
24 changes: 24 additions & 0 deletions crates/fork-instrument/src/call_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,14 @@ const MAX_INDIRECT_DEPTH: u8 = 2;
pub fn reaching_closure(module: &Module, seed: FunctionId) -> HashSet<FunctionId> {
let profiles = profile_functions(module);
let table_targets = table_targets(module, &profiles);
let has_dynamic_linker_imports = module.imports.iter().any(|import| {
import.module == "env"
&& matches!(import.kind, ImportKind::Function(_))
&& matches!(
import.name.as_str(),
"__wasm_dlopen" | "__wasm_dlsym" | "__wasm_dlclose" | "__wasm_dlerror"
)
});

// Reverse direct-call graph: `callee -> set of callers`.
let mut reverse_direct: HashMap<FunctionId, HashSet<FunctionId>> = HashMap::new();
Expand Down Expand Up @@ -411,6 +419,22 @@ pub fn reaching_closure(module: &Module, seed: FunctionId) -> HashSet<FunctionId
}
}

if has_dynamic_linker_imports {
let dynamic_indirect_roots: Vec<FunctionId> = profiles
.iter()
.filter_map(|(caller, profile)| (!profile.indirect.is_empty()).then_some(*caller))
.collect();
for caller in dynamic_indirect_roots {
enqueue(
caller,
1,
&mut best_indirect_depth,
&mut result,
&mut worklist,
);
}
}

while let Some((g, indirect_depth)) = worklist.pop_front() {
// (2) Direct-reverse: who calls g directly?
if let Some(callers) = reverse_direct.get(&g) {
Expand Down
37 changes: 37 additions & 0 deletions crates/fork-instrument/tests/call_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,43 @@ fn passive_element_with_table_init_is_followed() {
);
}

#[test]
fn dynamic_linker_indirect_call_is_conservative_fork_boundary() {
    // When the main module imports dlopen/dlsym-style hooks, the host may
    // install side-module functions into the indirect function table after
    // static analysis has run. Such a function can call fork() even though it
    // never appears in a static element segment, so the main-module frame
    // sitting above the call_indirect has to be serializable regardless.
    let wat = r#"
(module
(import "kernel" "kernel_fork" (func $fork (result i32)))
(import "env" "__wasm_dlsym" (func $dlsym (param i32 i32 i32) (result i32)))
(type $side_fn_ty (func (result i32)))
(table $t 1 funcref)
(func $dispatch_side_callback (export "dispatch_side_callback") (result i32)
i32.const 0
call_indirect $t (type $side_fn_ty))
(func $parent_frame (export "parent_frame") (result i32)
call $dispatch_side_callback)
(func $ordinary (export "ordinary") (result i32)
i32.const 7))
"#;
    let found = discover(wat);

    // True when the named function was reported by the discovery pass.
    let was_discovered = |name: &str| found.iter().any(|n| n == name);

    // The function containing the call_indirect itself.
    assert!(
        was_discovered("dispatch_side_callback"),
        "dynamic-linking call_indirect sites must be instrumented as potential \
         side-module fork boundaries; got {found:?}"
    );
    // Its direct caller, one frame further up.
    assert!(
        was_discovered("parent_frame"),
        "direct callers above a dynamic side-module dispatch must also be saved; got {found:?}"
    );
    // A function with no call_indirect and no path to the dispatch site.
    assert!(
        !was_discovered("ordinary"),
        "unrelated dynamic-linking functions without call_indirect should stay out; got {found:?}"
    );
}

#[test]
fn indirect_closure_allows_two_hops_but_does_not_cascade_forever() {
// Models trampoline-shaped runtimes without allowing unbounded
Expand Down
172 changes: 172 additions & 0 deletions crates/kernel/src/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const MAX_ENV_VARS: u32 = 65536;
const MAX_ARGV: u32 = 65536;
const MAX_PATH_LEN: usize = 1048576; // 1 MiB
const MAX_STRING_LEN: usize = 1048576; // 1 MiB
/// Buffer size (64 KiB) that `serialize_exec_state_with_growing_buffer` tries first.
const INITIAL_EXEC_STATE_BUFFER_LEN: usize = 64 * 1024;
/// Largest buffer size (4 MiB) that `serialize_exec_state_with_growing_buffer`
/// will grow to before giving up.
const MAX_EXEC_STATE_BUFFER_LEN: usize = 4 * 1024 * 1024;

// ── Writer helper ───────────────────────────────────────────────────────────

Expand Down Expand Up @@ -1176,6 +1178,7 @@ pub fn deserialize_fork_state(buf: &[u8], child_pid: u32) -> Result<Process, Err
is_session_leader: false,
state: ProcessState::Running,
exit_status: 0,
exit_signal: 0,
fd_table,
ofd_table,
lock_table: LockTable::new(),
Expand Down Expand Up @@ -1231,6 +1234,129 @@ pub fn deserialize_fork_state(buf: &[u8], child_pid: u32) -> Result<Process, Err
})
}

/// Decodes the socket section of a fork-state buffer into a `SocketTable`.
///
/// If fewer than 8 bytes remain in `r`, an empty table is returned without
/// consuming anything. Otherwise the section starts with two `u32`s: a slot
/// total (read but unused here) and the number of socket records that follow.
/// Each record is decoded field by field in wire order and inserted into the
/// table at the index stored in the record.
///
/// Optional `u32` fields use `0xFFFFFFFF` as the "absent" sentinel.
///
/// # Errors
///
/// Returns `Errno::EINVAL` when a domain, socket-type, or state discriminant
/// is not one of the known values. Any error reported by the reader is
/// propagated unchanged.
#[inline(never)]
fn read_fork_socket_table(r: &mut Reader<'_>) -> Result<SocketTable, Errno> {
    use crate::socket::{SocketDomain, SocketInfo, SocketState, SocketType};

    /// Wire value that marks an optional `u32` field as absent.
    const ABSENT: u32 = 0xFFFFFFFF;

    /// Maps the sentinel to `None` and every other value to `Some(raw)`.
    fn present(raw: u32) -> Option<u32> {
        (raw != ABSENT).then_some(raw)
    }

    /// Reads exactly four raw bytes (used for the bind/peer address fields).
    fn read_quad(r: &mut Reader<'_>) -> Result<[u8; 4], Errno> {
        let mut quad = [0u8; 4];
        quad.copy_from_slice(r.read_bytes(4)?);
        Ok(quad)
    }

    let mut table = SocketTable::new();
    if r.remaining() < 8 {
        // No room for the section header: treat as "no sockets".
        return Ok(table);
    }

    let _slot_total = r.read_u32()? as usize;
    let record_count = r.read_u32()? as usize;

    for _ in 0..record_count {
        let slot = r.read_u32()? as usize;

        let domain = match r.read_u32()? {
            0 => SocketDomain::Unix,
            1 => SocketDomain::Inet,
            2 => SocketDomain::Inet6,
            3 => SocketDomain::Netlink,
            _ => return Err(Errno::EINVAL),
        };
        let kind = match r.read_u32()? {
            0 => SocketType::Stream,
            1 => SocketType::Dgram,
            _ => return Err(Errno::EINVAL),
        };
        let protocol = r.read_u32()?;
        let state = match r.read_u32()? {
            0 => SocketState::Unbound,
            1 => SocketState::Bound,
            2 => SocketState::Listening,
            3 => SocketState::Connected,
            4 => SocketState::Closed,
            _ => return Err(Errno::EINVAL),
        };

        // Optional indices, each encoded as a u32 with the ABSENT sentinel.
        let peer_idx = present(r.read_u32()?).map(|v| v as usize);
        let recv_buf_idx = present(r.read_u32()?).map(|v| v as usize);
        let send_buf_idx = present(r.read_u32()?).map(|v| v as usize);

        let shut_rd = r.read_u32()? != 0;
        let shut_wr = r.read_u32()? != 0;

        let host_net_handle = present(r.read_u32()?).map(|v| v as i32);

        // Socket options: a count followed by (level, optname, value) triples.
        let option_count = r.read_u32()? as usize;
        let mut options = Vec::new();
        for _ in 0..option_count {
            let level = r.read_u32()?;
            let optname = r.read_u32()?;
            let value = r.read_u32()?;
            options.push((level, optname, value));
        }

        // Ports travel as u32 on the wire and are narrowed to u16 here.
        let bind_addr = read_quad(r)?;
        let bind_port = r.read_u32()? as u16;
        let peer_addr = read_quad(r)?;
        let peer_port = r.read_u32()? as u16;

        // Backlog entries are consumed to stay aligned with the wire format
        // but are not restored into the socket.
        let backlog_len = r.read_u32()? as usize;
        for _ in 0..backlog_len {
            r.read_u32()?;
        }

        let mut sock = SocketInfo::new(domain, kind, protocol);
        sock.state = state;
        sock.peer_idx = peer_idx;
        sock.recv_buf_idx = recv_buf_idx;
        sock.send_buf_idx = send_buf_idx;
        sock.shut_rd = shut_rd;
        sock.shut_wr = shut_wr;
        sock.host_net_handle = host_net_handle;
        sock.options = options;
        sock.bind_addr = bind_addr;
        sock.bind_port = bind_port;
        sock.peer_addr = peer_addr;
        sock.peer_port = peer_port;
        sock.global_pipes = r.read_u32()? != 0;
        sock.shared_backlog_idx = present(r.read_u32()?).map(|v| v as usize);

        // Trailing fields are only read when at least one more u32 is
        // available.
        // NOTE(review): presumably this tolerates buffers written before these
        // fields existed; the probe can only detect their absence when nothing
        // else follows the record — confirm against the serializer.
        if r.remaining() >= 4 {
            if let Some(path_len) = present(r.read_u32()?) {
                sock.bind_path = Some(r.read_bytes(path_len as usize)?.to_vec());
            }
        }
        if r.remaining() >= 4 {
            sock.accept_wake_idx = present(r.read_u32()?);
        }

        table.insert_at(slot, sock);
    }

    Ok(table)
}

// ── Exec Serialize ──────────────────────────────────────────────────────────

/// Serialize the process state into a binary buffer for exec.
Expand Down Expand Up @@ -1378,6 +1504,26 @@ pub fn serialize_exec_state(proc: &Process, buf: &mut [u8]) -> Result<usize, Err
Ok(w.pos)
}

/// Serializes `proc` for exec into a freshly allocated, exactly-sized `Vec<u8>`.
///
/// Starts with a zero-filled buffer of `INITIAL_EXEC_STATE_BUFFER_LEN` bytes
/// and calls `serialize_exec_state`. Whenever that call fails with
/// `Errno::ENOMEM` (treated here as "buffer too small") the attempt is
/// repeated with a new buffer of twice the size, capped at
/// `MAX_EXEC_STATE_BUFFER_LEN`.
///
/// # Errors
///
/// Returns `Errno::ENOMEM` if serialization still fails at the maximum buffer
/// size. Any other error from `serialize_exec_state` is returned immediately.
pub fn serialize_exec_state_with_growing_buffer(proc: &Process) -> Result<Vec<u8>, Errno> {
    let mut capacity = INITIAL_EXEC_STATE_BUFFER_LEN;

    loop {
        // A fresh zeroed buffer for every attempt; nothing is carried over.
        let mut scratch: Vec<u8> = Vec::new();
        scratch.resize(capacity, 0u8);

        match serialize_exec_state(proc, &mut scratch) {
            // Too small and still below the cap: double (clamped) and retry.
            Err(Errno::ENOMEM) if capacity < MAX_EXEC_STATE_BUFFER_LEN => {
                capacity = core::cmp::min(capacity.saturating_mul(2), MAX_EXEC_STATE_BUFFER_LEN);
            }
            Err(other) => return Err(other),
            Ok(used) => {
                // Drop the unused tail so the result holds only real data.
                scratch.truncate(used);
                return Ok(scratch);
            }
        }
    }
}

// ── Exec Deserialize ────────────────────────────────────────────────────────

/// Deserialize process state from an exec buffer.
Expand Down Expand Up @@ -1583,6 +1729,7 @@ pub fn deserialize_exec_state(buf: &[u8], pid: u32) -> Result<Process, Errno> {
is_session_leader,
state: ProcessState::Running,
exit_status: 0,
exit_signal: 0,
fd_table,
ofd_table,
lock_table: LockTable::new(),
Expand Down Expand Up @@ -1806,6 +1953,31 @@ mod tests {
assert_eq!(restored.signals.pending, 0);
}

#[test]
fn test_exec_state_grows_for_large_environment() {
    // 1200 variables of roughly 95-98 bytes each (13-byte prefix, 1-4 digits,
    // '=', 80 bytes of padding) add up to well over 64 KiB of environment.
    let mut proc = Process::new(1);
    proc.environ.clear();
    for n in 0..1200 {
        let mut entry = b"KDE_LONG_ENV_".to_vec();
        entry.extend_from_slice(n.to_string().as_bytes());
        entry.push(b'=');
        entry.extend_from_slice(&[b'x'; 80]);
        proc.environ.push(entry);
    }

    // A fixed 64 KiB buffer is too small for this process.
    let mut fixed_buf = vec![0u8; 64 * 1024];
    let fixed_result = serialize_exec_state(&proc, &mut fixed_buf);
    assert_eq!(fixed_result, Err(Errno::ENOMEM));

    // The growing variant succeeds and really needed more than 64 KiB.
    let grown = serialize_exec_state_with_growing_buffer(&proc).unwrap();
    assert!(grown.len() > 64 * 1024);

    // The environment survives the round trip.
    let round_tripped = deserialize_exec_state(&grown, 1).unwrap();
    assert_eq!(round_tripped.environ, proc.environ);
}

#[test]
fn test_exec_state_filters_cloexec_fds() {
use wasm_posix_shared::fd_flags::FD_CLOEXEC;
Expand Down
23 changes: 23 additions & 0 deletions crates/kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ pub fn current_time_secs() -> i64 {
}
}

// ---------------------------------------------------------------------------
// Kernel mode flag
// ---------------------------------------------------------------------------

use core::sync::atomic::{AtomicU32, Ordering};

/// Global kernel operating mode, kept as a raw `u32`.
///
/// - `0` (the initial value): traditional per-process kernel. Blocking
///   syscalls spin or delegate to the host.
/// - `1`: centralized kernel. Blocking syscalls return EAGAIN immediately so
///   the host JS event loop can handle waiting asynchronously.
///
/// When the flag is read back, only zero versus non-zero is distinguished.
static KERNEL_MODE: AtomicU32 = AtomicU32::new(0);

/// Reports whether the kernel is currently in centralized mode.
///
/// Any non-zero stored mode counts as centralized.
#[inline]
pub fn is_centralized_mode() -> bool {
    let mode = KERNEL_MODE.load(Ordering::Relaxed);
    mode != 0
}

/// Records `mode` as the current kernel operating mode.
///
/// The value is stored as given; no validation is performed.
pub fn set_kernel_mode(mode: u32) {
    KERNEL_MODE.store(mode, Ordering::Relaxed);
}

#[cfg(any(target_arch = "wasm32", target_arch = "wasm64"))]
mod wasm {
use core::alloc::{GlobalAlloc, Layout};
Expand Down
Loading
Loading