From be015595ee354daeb526dfce3581ef6041176351 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 26 Apr 2026 03:10:03 +0200 Subject: [PATCH 1/2] migration: phase 1 restore local core ingest --- Cargo.lock | 5 + syva-core/Cargo.toml | 7 + syva-core/src/container_id.rs | 29 ++ syva-core/src/main.rs | 301 +++++++++--- syva-core/src/rpc/mod.rs | 449 ++++++++++++++++++ syva-core/src/zone.rs | 2 +- syva-core/tests/common/mod.rs | 67 +++ .../tests/local_mode_register_then_list.rs | 40 ++ syva-core/tests/local_mode_starts_server.rs | 26 + 9 files changed, 863 insertions(+), 63 deletions(-) create mode 100644 syva-core/src/container_id.rs create mode 100644 syva-core/src/rpc/mod.rs create mode 100644 syva-core/tests/common/mod.rs create mode 100644 syva-core/tests/local_mode_register_then_list.rs create mode 100644 syva-core/tests/local_mode_starts_server.rs diff --git a/Cargo.lock b/Cargo.lock index 7e4efb3..8ec8626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3081,14 +3081,19 @@ dependencies = [ "axum", "aya", "clap", + "hyper-util", "libc", "serde", "serde_json", "syva-cp-client", "syva-ebpf-common", "syva-proto", + "tempfile", "tokio", + "tokio-stream", "tokio-util", + "tonic", + "tower 0.4.13", "tracing", "tracing-subscriber", "uuid", diff --git a/syva-core/Cargo.toml b/syva-core/Cargo.toml index 0301c27..52f3210 100644 --- a/syva-core/Cargo.toml +++ b/syva-core/Cargo.toml @@ -13,7 +13,9 @@ syva-cp-client = { path = "../syva-cp-client" } syva-ebpf-common = { path = "../syva-ebpf-common", features = ["userspace"] } aya = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true, features = ["net"] } tokio-util = { workspace = true } +tonic = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } anyhow = { workspace = true } @@ -23,3 +25,8 @@ serde_json = { workspace = true } clap = { workspace = true } libc = "0.2" uuid = { version = "1", features = ["v4", "serde"] } + +[dev-dependencies] +hyper-util = { version = "0.1", features = ["tokio"] } +tempfile = "3" +tower = { version = "0.4", features = ["util"] } diff --git a/syva-core/src/container_id.rs b/syva-core/src/container_id.rs new file mode 100644 index 0000000..ff4419e --- /dev/null +++ b/syva-core/src/container_id.rs @@ -0,0 +1,29 @@ +/// Validate that a container ID is safe to accept from local adapters. +pub(crate) fn is_valid_container_id(id: &str) -> bool { + !id.is_empty() + && id.len() <= 128 + && id + .bytes() + .all(|b| b.is_ascii_hexdigit() || b == b'-' || b == b'_') +} + +#[cfg(test)] +mod tests { + use super::is_valid_container_id; + + #[test] + fn accepts_containerd_like_ids() { + assert!(is_valid_container_id("abc123")); + assert!(is_valid_container_id("abc-def_123")); + assert!(is_valid_container_id(&"a".repeat(128))); + } + + #[test] + fn rejects_path_or_shell_shaped_ids() { + assert!(!is_valid_container_id("")); + assert!(!is_valid_container_id("../../../etc/passwd")); + assert!(!is_valid_container_id("abc/def")); + assert!(!is_valid_container_id("abc def")); + assert!(!is_valid_container_id(&"a".repeat(129))); + } +} diff --git a/syva-core/src/main.rs b/syva-core/src/main.rs index 1622cc9..b617559 100644 --- a/syva-core/src/main.rs +++ b/syva-core/src/main.rs @@ -8,22 +8,30 @@ //! syva-core status # Show enforcement counters //! syva-core events --follow # Stream deny events -mod cp_reconcile; mod btf; +mod container_id; +mod cp_reconcile; mod ebpf; mod events; mod health; mod ingest; +mod rpc; pub mod types; mod zone; +use std::ffi::CString; +use std::os::unix::ffi::OsStrExt; +use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use clap::{Parser, Subcommand}; use syva_cp_client::CpClientConfig; +use syva_proto::syva_core::syva_core_server::SyvaCoreServer; +use tokio::net::UnixListener; use tokio::sync::{Mutex, RwLock}; +use tokio_stream::wrappers::UnixListenerStream; use tracing_subscriber::EnvFilter; #[derive(Parser)] @@ -40,9 +48,17 @@ struct Cli { #[arg(long, default_value = "9091")] health_port: u16, - /// syva-cp endpoint. Required. + /// Policy ingestion source. + #[arg(long, default_value = "cp")] + policy_source: PolicySource, + + /// Local syva.core.v1 Unix socket path. + #[arg(long, default_value = "/run/syva/syva-core.sock")] + socket_path: PathBuf, + + /// syva-cp endpoint. Required when --policy-source=cp. #[arg(long, env = "SYVA_CP_ENDPOINT")] - cp_endpoint: String, + cp_endpoint: Option, /// Hostname to report to syva-cp. Defaults to the system hostname. #[arg(long, env = "SYVA_NODE_NAME")] @@ -92,6 +108,12 @@ enum Commands { }, } +#[derive(Clone, Copy, Debug, clap::ValueEnum)] +enum PolicySource { + Cp, + Local, +} + #[derive(Clone, Debug, clap::ValueEnum)] enum OutputFormat { Text, @@ -101,9 +123,7 @@ enum OutputFormat { #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::from_default_env().add_directive("syva_core=info".parse()?), - ) + .with_env_filter(EnvFilter::from_default_env().add_directive("syva_core=info".parse()?)) .init(); let cli = Cli::parse(); @@ -118,12 +138,11 @@ async fn main() -> anyhow::Result<()> { /// Main enforcement engine loop. async fn cmd_run(config: Cli) -> anyhow::Result<()> { tracing::info!("syva-core starting"); + let start_time = Instant::now(); // Health state — shared with the HTTP server. Starts as unhealthy // (not attached, zero zones) and transitions as startup progresses. - let health_state = health::SharedHealth::new(RwLock::new( - health::HealthState::new(), - )); + let health_state = health::SharedHealth::new(RwLock::new(health::HealthState::new())); health::spawn_health_server(config.health_port, health_state.clone()).await?; // Load eBPF programs (but do NOT attach — no enforcement yet). @@ -153,12 +172,26 @@ async fn cmd_run(config: Cli) -> anyhow::Result<()> { let registry = Arc::new(RwLock::new(zone::ZoneRegistry::new())); let ebpf = Arc::new(Mutex::new(mgr)); + let local_server_task = if matches!(config.policy_source, PolicySource::Local) { + Some( + spawn_local_core_server( + config.socket_path.clone(), + registry.clone(), + ebpf.clone(), + health_state.clone(), + start_time, + cancel.clone(), + ) + .await?, + ) + } else { + None + }; + tracing::info!("startup complete — enforcement active"); // Shutdown on SIGINT (ctrl-c) or SIGTERM (Kubernetes pod termination). - let mut sigterm = tokio::signal::unix::signal( - tokio::signal::unix::SignalKind::terminate(), - )?; + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; // Periodic error monitoring task. let monitor_ebpf = ebpf.clone(); @@ -211,56 +244,184 @@ async fn cmd_run(config: Cli) -> anyhow::Result<()> { } }); - let node_name = config - .node_name - .clone() - .or_else(system_hostname) - .unwrap_or_else(|| "unknown".to_string()); - let cp_config = CpClientConfig { - endpoint: config.cp_endpoint.clone(), - node_name, - cluster_id: config.cluster_id.clone(), - fingerprint: read_fingerprint(&config.fingerprint_path), - labels: parse_labels(&config.node_labels), - node_id_path: config.node_id_path.clone(), - heartbeat_interval: Duration::from_secs(config.heartbeat_secs), - ..Default::default() - }; + match config.policy_source { + PolicySource::Cp => { + let cp_endpoint = config.cp_endpoint.clone().ok_or_else(|| { + anyhow::anyhow!( + "--cp-endpoint or SYVA_CP_ENDPOINT is required when --policy-source=cp" + ) + })?; + let node_name = config + .node_name + .clone() + .or_else(system_hostname) + .unwrap_or_else(|| "unknown".to_string()); + let cp_config = CpClientConfig { + endpoint: cp_endpoint, + node_name, + cluster_id: config.cluster_id.clone(), + fingerprint: read_fingerprint(&config.fingerprint_path), + labels: parse_labels(&config.node_labels), + node_id_path: config.node_id_path.clone(), + heartbeat_interval: Duration::from_secs(config.heartbeat_secs), + ..Default::default() + }; + + let (cp, registration) = connect_and_register_with_retry(cp_config).await; + tracing::info!(node_id = %registration.node_id, "registered with syva-cp"); + + let _heartbeat = cp.spawn_heartbeat_loop(); + let reconciler = cp_reconcile::Reconciler::new( + cp, + registry.clone(), + ebpf.clone(), + health_state.clone(), + ); + let mut reconcile_task = tokio::spawn(async move { + reconciler.run().await; + }); - let (cp, registration) = connect_and_register_with_retry(cp_config).await; - tracing::info!(node_id = %registration.node_id, "registered with syva-cp"); - - let _heartbeat = cp.spawn_heartbeat_loop(); - let reconciler = cp_reconcile::Reconciler::new( - cp, - registry.clone(), - ebpf.clone(), - health_state.clone(), - ); - let mut reconcile_task = tokio::spawn(async move { - reconciler.run().await; - }); + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("received SIGINT; shutting down"); + } + _ = sigterm.recv() => { + tracing::info!("received SIGTERM; shutting down"); + } + _ = &mut reconcile_task => { + tracing::warn!("reconcile loop exited"); + } + } - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::info!("received SIGINT — shutting down"); + reconcile_task.abort(); } - _ = sigterm.recv() => { - tracing::info!("received SIGTERM — shutting down"); - } - _ = &mut reconcile_task => { - tracing::warn!("reconcile loop exited"); + PolicySource::Local => { + let mut local_server_task = local_server_task + .ok_or_else(|| anyhow::anyhow!("local server task was not started"))?; + let mut local_server_exited = false; + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::info!("received SIGINT; shutting down"); + } + _ = sigterm.recv() => { + tracing::info!("received SIGTERM; shutting down"); + } + result = &mut local_server_task => { + local_server_exited = true; + match result { + Ok(Ok(())) => tracing::info!("local gRPC server exited"), + Ok(Err(error)) => tracing::warn!(%error, "local gRPC server exited with error"), + Err(error) => tracing::warn!(%error, "local gRPC server task failed"), + } + } + } + + cancel.cancel(); + if !local_server_exited { + match tokio::time::timeout(Duration::from_secs(10), local_server_task).await { + Ok(Ok(Ok(()))) => {} + Ok(Ok(Err(error))) => { + tracing::warn!(%error, "local gRPC server shutdown failed") + } + Ok(Err(error)) => { + tracing::warn!(%error, "local gRPC server task failed during shutdown") + } + Err(_) => tracing::warn!("timed out waiting for local gRPC server shutdown"), + } + } } } cancel.cancel(); - reconcile_task.abort(); // Drop ebpf manager (cleans up BPF pins). drop(ebpf); tracing::info!("syva-core stopped"); Ok(()) } +async fn spawn_local_core_server( + socket_path: PathBuf, + registry: Arc>, + ebpf: Arc>, + health: health::SharedHealth, + start_time: Instant, + cancel: tokio_util::sync::CancellationToken, +) -> anyhow::Result>> { + if socket_path.exists() { + anyhow::bail!( + "refusing to replace existing syva-core socket at {}", + socket_path.display() + ); + } + + if let Some(parent) = socket_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let listener = UnixListener::bind(&socket_path)?; + configure_socket_permissions(&socket_path)?; + + let service = rpc::SyvaCoreService { + registry, + ebpf, + health, + start_time, + }; + let incoming = UnixListenerStream::new(listener); + let shutdown = cancel.cancelled_owned(); + let display_path = socket_path.display().to_string(); + + let task = tokio::spawn(async move { + tracing::info!(socket = display_path, "local syva.core.v1 server listening"); + tonic::transport::Server::builder() + .add_service(SyvaCoreServer::new(service)) + .serve_with_incoming_shutdown(incoming, shutdown) + .await + .map_err(|error| anyhow::anyhow!("local gRPC server failed: {error}")) + }); + + Ok(task) +} + +fn configure_socket_permissions(socket_path: &std::path::Path) -> anyhow::Result<()> { + let permissions = std::fs::Permissions::from_mode(0o660); + std::fs::set_permissions(socket_path, permissions)?; + + let gid = syva_group_gid().ok_or_else(|| { + anyhow::anyhow!( + "group 'syva' is required for {} ownership", + socket_path.display() + ) + })?; + let c_path = CString::new(socket_path.as_os_str().as_bytes())?; + let result = unsafe { libc::chown(c_path.as_ptr(), 0, gid) }; + if result != 0 { + let error = std::io::Error::last_os_error(); + anyhow::bail!( + "failed to chown {} to root:syva: {error}", + socket_path.display() + ); + } + + Ok(()) +} + +fn syva_group_gid() -> Option { + let groups = std::fs::read_to_string("/etc/group").ok()?; + groups.lines().find_map(|line| { + let mut parts = line.split(':'); + let name = parts.next()?; + let _password = parts.next()?; + let gid = parts.next()?; + if name == "syva" { + gid.parse().ok() + } else { + None + } + }) +} + async fn connect_and_register_with_retry( config: CpClientConfig, ) -> (syva_cp_client::CpClient, syva_cp_client::NodeRegistration) { @@ -362,9 +523,8 @@ async fn cmd_status() -> anyhow::Result<()> { // pinned data in the matching variant before converting. let map_data = aya::maps::MapData::from_pin(&counter_path) .map_err(|e| anyhow::anyhow!("failed to open pinned counters: {e}"))?; - match PerCpuArray::<_, EnforcementCounters>::try_from( - aya::maps::Map::PerCpuArray(map_data), - ) { + match PerCpuArray::<_, EnforcementCounters>::try_from(aya::maps::Map::PerCpuArray(map_data)) + { Ok(map) => { println!(" hooks:"); let mut total_errors: u64 = 0; @@ -373,7 +533,12 @@ async fn cmd_status() -> anyhow::Result<()> { for (idx, hook) in events::HOOK_NAMES.iter().enumerate() { match map.get(&(idx as u32), 0) { Ok(per_cpu) => { - let mut total = EnforcementCounters { allow: 0, deny: 0, error: 0, lost: 0 }; + let mut total = EnforcementCounters { + allow: 0, + deny: 0, + error: 0, + lost: 0, + }; for cpu_val in per_cpu.iter() { total.allow += cpu_val.allow; total.deny += cpu_val.deny; @@ -382,13 +547,19 @@ async fn cmd_status() -> anyhow::Result<()> { } total_errors += total.error; total_lost += total.lost; - let flag = if total.error > 0 || total.lost > 0 { " !" } else { "" }; + let flag = if total.error > 0 || total.lost > 0 { + " !" + } else { + "" + }; println!( " {:<16} allow={:<8} deny={:<8} error={:<6} lost={}{}", hook, total.allow, total.deny, total.error, total.lost, flag ); } - Err(_) => { had_read_error = true; } + Err(_) => { + had_read_error = true; + } } } @@ -397,10 +568,15 @@ async fn cmd_status() -> anyhow::Result<()> { println!(" counters: some reads failed"); } else if total_errors > 0 { println!(); - println!(" WARNING: {} total enforcement errors detected.", total_errors); + println!( + " WARNING: {} total enforcement errors detected.", + total_errors + ); println!(" Errors cause fail-open behavior -- operations are ALLOWED"); println!(" when kernel struct reads fail. This may indicate wrong"); - println!(" kernel struct offsets. Check BTF availability and restart syva-core."); + println!( + " kernel struct offsets. Check BTF availability and restart syva-core." + ); } // Suppress unused variable warning for total_lost (used for flag above). @@ -498,16 +674,17 @@ async fn cmd_events(follow: bool, format: OutputFormat) -> anyhow::Result<()> { /// Drop capabilities that are no longer needed after BPF programs are loaded /// and maps are populated. BPF map operations use already-open file descriptors. fn drop_unnecessary_capabilities() { - const CAPS_TO_DROP: &[(libc::c_int, &str)] = &[ - (21, "CAP_SYS_ADMIN"), - ]; + const CAPS_TO_DROP: &[(libc::c_int, &str)] = &[(21, "CAP_SYS_ADMIN")]; for &(cap, name) in CAPS_TO_DROP { let ret = unsafe { libc::prctl(libc::PR_CAPBSET_DROP, cap, 0, 0, 0) }; if ret == 0 { tracing::info!(capability = name, "dropped capability"); } else { - tracing::debug!(capability = name, "failed to drop capability (may not be in bounding set)"); + tracing::debug!( + capability = name, + "failed to drop capability (may not be in bounding set)" + ); } } } diff --git a/syva-core/src/rpc/mod.rs b/syva-core/src/rpc/mod.rs new file mode 100644 index 0000000..fdecbc3 --- /dev/null +++ b/syva-core/src/rpc/mod.rs @@ -0,0 +1,449 @@ +//! gRPC service implementation for syva-core. + +use std::sync::Arc; +use std::time::Instant; + +use syva_ebpf_common::{EnforcementEvent, DECISION_DENY}; +use syva_proto::syva_core::syva_core_server::SyvaCore; +use syva_proto::syva_core::{ + AllowCommRequest, AllowCommResponse, AttachContainerRequest, AttachContainerResponse, CommPair, + DenyCommRequest, DenyCommResponse, DenyEvent, DetachContainerRequest, DetachContainerResponse, + HookStatus, ListCommsRequest, ListCommsResponse, ListZonesRequest, ListZonesResponse, + RegisterHostPathRequest, RegisterHostPathResponse, RegisterZoneRequest, RegisterZoneResponse, + RemoveZoneRequest, RemoveZoneResponse, StatusRequest, StatusResponse, WatchEventsRequest, + ZoneSummary, +}; +use tokio::sync::{Mutex, RwLock}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +use crate::container_id::is_valid_container_id; +use crate::ebpf::EnforceEbpf; +use crate::events::HOOK_NAMES; +use crate::health::SharedHealth; +use crate::ingest::{self, CoreZonePolicyInput}; +use crate::types::ZoneType; +use crate::zone::{ZoneRegistry, ZoneState, ZoneTransition}; + +pub(crate) struct SyvaCoreService { + pub(crate) registry: Arc>, + pub(crate) ebpf: Arc>, + pub(crate) health: SharedHealth, + pub(crate) start_time: Instant, +} + +#[tonic::async_trait] +impl SyvaCore for SyvaCoreService { + async fn register_zone( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.zone_name.is_empty() { + return Err(Status::invalid_argument("zone_name is required")); + } + + let policy = req.policy.map(|proto_policy| CoreZonePolicyInput { + host_paths: proto_policy.host_paths, + allowed_zones: proto_policy.allowed_zones, + allow_ptrace: proto_policy.allow_ptrace, + zone_type: match proto_policy.zone_type { + 1 => ZoneType::Privileged, + _ => ZoneType::NonGlobal, + }, + }); + + let zone_id = ingest::register_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &req.zone_name, + policy, + ) + .await + .map_err(|error| Status::internal(format!("failed to register zone: {error}")))?; + + tracing::info!(zone = req.zone_name, zone_id, "zone registered via gRPC"); + Ok(Response::new(RegisterZoneResponse { zone_id })) + } + + async fn remove_zone( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.zone_name.is_empty() { + return Err(Status::invalid_argument("zone_name is required")); + } + + let result = ingest::remove_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &req.zone_name, + req.drain, + ) + .await + .map_err(|error| Status::not_found(format!("{error}")))?; + + Ok(Response::new(RemoveZoneResponse { + ok: result.ok, + message: result.message, + })) + } + + async fn list_zones( + &self, + _request: Request, + ) -> Result, Status> { + let registry = self.registry.read().await; + let zones = registry + .zones_summary() + .map(|(name, zone_id, state, refcount)| ZoneSummary { + name: name.to_string(), + zone_id, + state: match state { + ZoneState::Pending => "pending", + ZoneState::Active => "active", + ZoneState::Draining => "draining", + } + .to_string(), + containers_active: refcount as u32, + }) + .collect(); + Ok(Response::new(ListZonesResponse { zones })) + } + + async fn attach_container( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + if !is_valid_container_id(&req.container_id) { + return Ok(Response::new(AttachContainerResponse { + ok: false, + message: "invalid container_id: must be non-empty, max 128 chars, hex/dash/underscore only".to_string(), + })); + } + if req.zone_name.is_empty() { + return Err(Status::invalid_argument("zone_name is required")); + } + if req.cgroup_id == 0 { + return Err(Status::invalid_argument("cgroup_id must be non-zero")); + } + + let mut registry = self.registry.write().await; + let zone_id = match registry.add_container(&req.container_id, &req.zone_name, req.cgroup_id) + { + Ok(id) => id, + Err(error) => { + return Ok(Response::new(AttachContainerResponse { + ok: false, + message: format!("{error}"), + })); + } + }; + + let mut ebpf = self.ebpf.lock().await; + if let Err(error) = ebpf.add_zone_member(req.cgroup_id, zone_id, ZoneType::NonGlobal) { + registry.remove_container(&req.container_id, None); + return Err(Status::internal(format!( + "BPF add_zone_member failed: {error}" + ))); + } + + let mut health = self.health.write().await; + health.containers_active = registry.container_count(); + + tracing::info!( + container = req.container_id, + zone = req.zone_name, + zone_id, + cgroup_id = req.cgroup_id, + "container attached via gRPC" + ); + + Ok(Response::new(AttachContainerResponse { + ok: true, + message: String::new(), + })) + } + + async fn detach_container( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.container_id.is_empty() { + return Err(Status::invalid_argument("container_id is required")); + } + + let mut registry = self.registry.write().await; + if let Some((zone_id, cgroup_id, transition)) = + registry.remove_container(&req.container_id, None) + { + let mut ebpf = self.ebpf.lock().await; + if let Err(error) = ebpf.remove_zone_member(cgroup_id) { + tracing::warn!(cgroup_id, %error, "failed to remove zone member from BPF map"); + } + + match transition { + ZoneTransition::DrainingComplete => { + tracing::info!(zone_id, "draining zone emptied; cleaning up BPF maps"); + let _ = ebpf.remove_zone_policy(zone_id); + let _ = ebpf.remove_zone_comms(zone_id); + let _ = ebpf.remove_zone_inodes(zone_id); + if let Err(error) = registry.unregister_zone_by_id(zone_id) { + tracing::warn!(zone_id, %error, "failed to unregister drained zone"); + } + } + ZoneTransition::WentToPending => { + tracing::info!(zone_id, "zone has no active containers"); + } + ZoneTransition::StillActive => {} + } + + let mut health = self.health.write().await; + health.containers_active = registry.container_count(); + health.zones_loaded = registry.zone_count(); + + tracing::info!(container = req.container_id, "container detached via gRPC"); + } + + Ok(Response::new(DetachContainerResponse { ok: true })) + } + + async fn allow_comm( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.zone_a.is_empty() || req.zone_b.is_empty() { + return Err(Status::invalid_argument( + "both zone_a and zone_b are required", + )); + } + + ingest::allow_comm_local(&self.registry, &self.ebpf, &req.zone_a, &req.zone_b) + .await + .map_err(|error| Status::internal(format!("failed to set allowed comms: {error}")))?; + + tracing::info!( + zone_a = req.zone_a, + zone_b = req.zone_b, + "cross-zone comm allowed via gRPC" + ); + Ok(Response::new(AllowCommResponse { ok: true })) + } + + async fn deny_comm( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.zone_a.is_empty() || req.zone_b.is_empty() { + return Err(Status::invalid_argument( + "both zone_a and zone_b are required", + )); + } + + ingest::deny_comm_local(&self.registry, &self.ebpf, &req.zone_a, &req.zone_b) + .await + .map_err(|error| { + Status::internal(format!( + "failed to remove comms between '{}' and '{}': {error}", + req.zone_a, req.zone_b + )) + })?; + + tracing::info!( + zone_a = req.zone_a, + zone_b = req.zone_b, + "cross-zone comm denied via gRPC" + ); + Ok(Response::new(DenyCommResponse { ok: true })) + } + + async fn list_comms( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let filter = if req.zone_name.is_empty() { + None + } else { + Some(req.zone_name.as_str()) + }; + + let registry = self.registry.read().await; + if let Some(zone) = filter { + if registry.zone_id(zone).is_none() { + return Err(Status::not_found(format!("zone '{zone}' not registered"))); + } + } + + let pairs = registry + .list_allowed_comms(filter) + .map(|(zone_a, zone_b)| CommPair { + zone_a: zone_a.to_string(), + zone_b: zone_b.to_string(), + }) + .collect(); + Ok(Response::new(ListCommsResponse { pairs })) + } + + async fn register_host_path( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + if req.zone_name.is_empty() { + return Err(Status::invalid_argument("zone_name is required")); + } + if req.path.is_empty() { + return Err(Status::invalid_argument("path is required")); + } + + let registry = self.registry.read().await; + let zone_id = registry + .zone_id(&req.zone_name) + .ok_or_else(|| Status::not_found(format!("zone '{}' not registered", req.zone_name)))?; + drop(registry); + + let mut ebpf = self.ebpf.lock().await; + let count = if req.recursive { + ebpf.populate_inode_zone_map(zone_id, std::slice::from_ref(&req.path)) + .map_err(|error| { + Status::internal(format!("failed to populate inode map: {error}")) + })? + } else { + ebpf.register_single_inode(zone_id, &req.path) + .map_err(|error| Status::internal(format!("failed to register inode: {error}")))? + }; + + tracing::info!( + zone = req.zone_name, + path = req.path, + inodes = count, + "host path registered via gRPC" + ); + + Ok(Response::new(RegisterHostPathResponse { + inodes_registered: count as u32, + })) + } + + async fn status( + &self, + _request: Request, + ) -> Result, Status> { + let health = self.health.read().await; + let registry = self.registry.read().await; + + let mut hooks = Vec::new(); + let ebpf = self.ebpf.lock().await; + match ebpf.read_counters() { + Ok(counters) => { + for (idx, (_, totals)) in counters.iter().enumerate() { + hooks.push(HookStatus { + hook: HOOK_NAMES + .get(idx) + .copied() + .unwrap_or("unknown") + .to_string(), + allow: totals.allow, + deny: totals.deny, + error: totals.error, + lost: totals.lost, + }); + } + } + Err(error) => { + tracing::debug!(%error, "failed to read counters for status RPC"); + } + } + + Ok(Response::new(StatusResponse { + attached: health.attached, + zones_active: registry.zone_count() as u32, + containers_active: registry.container_count() as u32, + uptime_secs: self.start_time.elapsed().as_secs(), + hooks, + max_zones: syva_ebpf_common::MAX_ZONES, + })) + } + + type WatchEventsStream = ReceiverStream>; + + async fn watch_events( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let (tx, rx) = tokio::sync::mpsc::channel(256); + + let mut ebpf = self.ebpf.lock().await; + let ring_buf = ebpf + .take_event_ring_buf() + .ok_or_else(|| Status::unavailable("event ring buffer already taken"))?; + drop(ebpf); + + tokio::spawn(async move { + let mut ring_buf = ring_buf; + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); + + loop { + interval.tick().await; + + let events: Vec = tokio::task::block_in_place(|| { + let mut out = Vec::new(); + while let Some(item) = ring_buf.next() { + if item.len() < std::mem::size_of::() { + continue; + } + let event = unsafe { + std::ptr::read_unaligned(item.as_ptr() as *const EnforcementEvent) + }; + out.push(event); + if out.len() >= 1000 { + break; + } + } + out + }); + + let had_events = !events.is_empty(); + for event in events { + if event.decision != DECISION_DENY { + continue; + } + let deny_event = DenyEvent { + timestamp_ns: event.timestamp_ns, + hook: HOOK_NAMES + .get(event.hook as usize) + .copied() + .unwrap_or("unknown") + .to_string(), + zone_id: event.caller_zone, + target_zone_id: event.target_zone, + pid: event.pid, + comm: String::new(), + inode: 0, + context: event.context.to_string(), + }; + + if tx.send(Ok(deny_event)).await.is_err() { + return; + } + } + + if !req.follow && !had_events { + return; + } + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} diff --git a/syva-core/src/zone.rs b/syva-core/src/zone.rs index 97e7e73..dbbe3a1 100644 --- a/syva-core/src/zone.rs +++ b/syva-core/src/zone.rs @@ -254,7 +254,7 @@ impl ZoneRegistry { } /// All registered zone names and their IDs. - #[cfg_attr(not(test), allow(dead_code))] + #[allow(dead_code)] pub fn all_zones(&self) -> impl Iterator { self.zones.iter().map(|(name, entry)| (name.as_str(), entry.zone_id)) } diff --git a/syva-core/tests/common/mod.rs b/syva-core/tests/common/mod.rs new file mode 100644 index 0000000..79a9ee9 --- /dev/null +++ b/syva-core/tests/common/mod.rs @@ -0,0 +1,67 @@ +use std::process::{Child, Command, Stdio}; +use std::time::{Duration, Instant}; + +use syva_proto::syva_core::syva_core_client::SyvaCoreClient; +use tonic::transport::Channel; + +pub(crate) struct CoreProcess { + child: Child, +} + +impl Drop for CoreProcess { + fn drop(&mut self) { + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + +pub(crate) fn spawn_core(socket_path: &std::path::Path) -> anyhow::Result { + let bin = std::env::var("CARGO_BIN_EXE_syva-core") + .unwrap_or_else(|_| "target/debug/syva-core".to_string()); + let child = Command::new(bin) + .arg("--policy-source") + .arg("local") + .arg("--socket-path") + .arg(socket_path) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn()?; + Ok(CoreProcess { child }) +} + +pub(crate) async fn connect( + socket_path: &std::path::Path, +) -> anyhow::Result> { + let path = socket_path.to_path_buf(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| { + let path = path.clone(); + async move { + let stream = tokio::net::UnixStream::connect(path).await?; + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream)) + } + })) + .await?; + Ok(SyvaCoreClient::new(channel)) +} + +pub(crate) async fn wait_for_core( + socket_path: &std::path::Path, +) -> anyhow::Result> { + let deadline = Instant::now() + Duration::from_secs(20); + let mut last_error = None; + while Instant::now() < deadline { + match connect(socket_path).await { + Ok(client) => return Ok(client), + Err(error) => { + last_error = Some(error); + tokio::time::sleep(Duration::from_millis(250)).await; + } + } + } + + match last_error { + Some(error) => Err(error), + None => anyhow::bail!("timed out waiting for syva-core"), + } +} diff --git a/syva-core/tests/local_mode_register_then_list.rs b/syva-core/tests/local_mode_register_then_list.rs new file mode 100644 index 0000000..cafd909 --- /dev/null +++ b/syva-core/tests/local_mode_register_then_list.rs @@ -0,0 +1,40 @@ +//! Manual integration test for local-mode zone registration. +//! +//! Run on Linux as root with a `syva` group present: +//! +//! ```text +//! sudo -E cargo test -p syva-core --test local_mode_register_then_list -- --ignored --nocapture +//! ``` + +mod common; + +use syva_proto::syva_core::{ListZonesRequest, RegisterZoneRequest, ZonePolicy}; + +#[tokio::test] +#[ignore = "requires Linux, root privileges, BPF LSM support, and group 'syva'"] +async fn local_mode_register_then_list() -> anyhow::Result<()> { + let dir = tempfile::tempdir()?; + let socket_path = dir.path().join("syva-core.sock"); + let _core = common::spawn_core(&socket_path)?; + + let mut client = common::wait_for_core(&socket_path).await?; + client + .register_zone(RegisterZoneRequest { + zone_name: "phase-one-test".to_string(), + policy: Some(ZonePolicy { + host_paths: Vec::new(), + allowed_zones: Vec::new(), + allow_ptrace: false, + zone_type: 0, + }), + }) + .await?; + + let zones = client + .list_zones(ListZonesRequest {}) + .await? + .into_inner() + .zones; + assert!(zones.iter().any(|zone| zone.name == "phase-one-test")); + Ok(()) +} diff --git a/syva-core/tests/local_mode_starts_server.rs b/syva-core/tests/local_mode_starts_server.rs new file mode 100644 index 0000000..2c32225 --- /dev/null +++ b/syva-core/tests/local_mode_starts_server.rs @@ -0,0 +1,26 @@ +//! Manual integration test for local-mode startup. +//! +//! Run on Linux as root with a `syva` group present: +//! +//! ```text +//! sudo -E cargo test -p syva-core --test local_mode_starts_server -- --ignored --nocapture +//! ``` + +mod common; + +use syva_proto::syva_core::StatusRequest; + +#[tokio::test] +#[ignore = "requires Linux, root privileges, BPF LSM support, and group 'syva'"] +async fn local_mode_starts_server() -> anyhow::Result<()> { + let dir = tempfile::tempdir()?; + let socket_path = dir.path().join("syva-core.sock"); + let _core = common::spawn_core(&socket_path)?; + + let mut client = common::wait_for_core(&socket_path).await?; + let status = client.status(StatusRequest {}).await?.into_inner(); + + assert!(status.attached); + assert!(socket_path.exists()); + Ok(()) +} From 15024f2ca097f535af8396f9a66b238314664fe8 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 26 Apr 2026 14:33:48 +0200 Subject: [PATCH 2/2] fix(syva-core): address local ingest review feedback --- syva-core/src/ingest.rs | 5 +++-- syva-core/src/main.rs | 25 +++++++++++++++++++++--- syva-core/src/rpc/mod.rs | 42 ++++++++++++++++++++++++++++------------ syva-core/src/zone.rs | 33 +++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 17 deletions(-) diff --git a/syva-core/src/ingest.rs b/syva-core/src/ingest.rs index f9429ea..19dce41 100644 --- a/syva-core/src/ingest.rs +++ b/syva-core/src/ingest.rs @@ -34,6 +34,9 @@ pub(crate) async fn register_zone_local( let was_new = registry.zone_id(zone_name).is_none(); registry.register_zone(zone_name)?; let zone_id = registry.revive_draining(zone_name)?; + if let Some(policy) = policy.as_ref() { + registry.set_zone_type(zone_name, policy.zone_type)?; + } let zones_loaded = registry.zone_count(); (zone_id, zones_loaded, was_new) }; @@ -71,8 +74,6 @@ pub(crate) async fn register_zone_local( } } } - - let _ = policy.zone_type; } health.write().await.zones_loaded = zones_loaded; diff --git a/syva-core/src/main.rs b/syva-core/src/main.rs index b617559..4b36599 100644 --- a/syva-core/src/main.rs +++ b/syva-core/src/main.rs @@ -360,7 +360,16 @@ async fn spawn_local_core_server( } let listener = UnixListener::bind(&socket_path)?; - configure_socket_permissions(&socket_path)?; + if let Err(error) = configure_socket_permissions(&socket_path) { + drop(listener); + cleanup_socket_file(&socket_path).map_err(|cleanup_error| { + anyhow::anyhow!( + "failed to configure syva-core socket permissions at {}: {error}; additionally failed to remove the socket file: {cleanup_error}", + socket_path.display() + ) + })?; + return Err(error); + } let service = rpc::SyvaCoreService { registry, @@ -374,16 +383,26 @@ async fn spawn_local_core_server( let task = tokio::spawn(async move { tracing::info!(socket = display_path, "local syva.core.v1 server listening"); - tonic::transport::Server::builder() + let result = tonic::transport::Server::builder() .add_service(SyvaCoreServer::new(service)) .serve_with_incoming_shutdown(incoming, shutdown) .await - .map_err(|error| anyhow::anyhow!("local gRPC server failed: {error}")) + .map_err(|error| anyhow::anyhow!("local gRPC server failed: {error}")); + cleanup_socket_file(&socket_path)?; + result }); Ok(task) } +fn cleanup_socket_file(socket_path: &std::path::Path) -> anyhow::Result<()> { + match std::fs::remove_file(socket_path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(error.into()), + } +} + fn configure_socket_permissions(socket_path: &std::path::Path) -> anyhow::Result<()> { let permissions = std::fs::Permissions::from_mode(0o660); std::fs::set_permissions(socket_path, permissions)?; diff --git a/syva-core/src/rpc/mod.rs b/syva-core/src/rpc/mod.rs index fdecbc3..0bbdcb0 100644 --- a/syva-core/src/rpc/mod.rs +++ b/syva-core/src/rpc/mod.rs @@ -43,15 +43,10 @@ impl SyvaCore for SyvaCoreService { return Err(Status::invalid_argument("zone_name is required")); } - let policy = req.policy.map(|proto_policy| CoreZonePolicyInput { - host_paths: proto_policy.host_paths, - allowed_zones: proto_policy.allowed_zones, - allow_ptrace: proto_policy.allow_ptrace, - zone_type: match proto_policy.zone_type { - 1 => ZoneType::Privileged, - _ => ZoneType::NonGlobal, - }, - }); + let policy = req + .policy + .map(proto_policy_to_core_input) + .transpose()?; let zone_id = ingest::register_zone_local( &self.registry, @@ -134,8 +129,7 @@ impl SyvaCore for SyvaCoreService { } let mut registry = self.registry.write().await; - let zone_id = match registry.add_container(&req.container_id, &req.zone_name, req.cgroup_id) - { + let zone_id = match registry.add_container(&req.container_id, &req.zone_name, req.cgroup_id) { Ok(id) => id, Err(error) => { return Ok(Response::new(AttachContainerResponse { @@ -144,9 +138,10 @@ impl SyvaCore for SyvaCoreService { })); } }; + let zone_type = registry.zone_type(&req.zone_name).unwrap_or(ZoneType::NonGlobal); let mut ebpf = self.ebpf.lock().await; - if let Err(error) = ebpf.add_zone_member(req.cgroup_id, zone_id, ZoneType::NonGlobal) { + if let Err(error) = ebpf.add_zone_member(req.cgroup_id, zone_id, zone_type) { registry.remove_container(&req.container_id, None); return Err(Status::internal(format!( "BPF add_zone_member failed: {error}" @@ -447,3 +442,26 @@ impl SyvaCore for SyvaCoreService { Ok(Response::new(ReceiverStream::new(rx))) } } + +#[allow(clippy::result_large_err)] +fn proto_policy_to_core_input( + proto_policy: syva_proto::syva_core::ZonePolicy, +) -> Result { + Ok(CoreZonePolicyInput { + host_paths: proto_policy.host_paths, + allowed_zones: proto_policy.allowed_zones, + allow_ptrace: proto_policy.allow_ptrace, + zone_type: parse_proto_zone_type(proto_policy.zone_type)?, + }) +} + +#[allow(clippy::result_large_err)] +fn parse_proto_zone_type(value: i32) -> Result { + match value { + 0 => Ok(ZoneType::NonGlobal), + 1 => Ok(ZoneType::Privileged), + other => Err(Status::invalid_argument(format!( + "unsupported zone_type: {other}" + ))), + } +} diff --git a/syva-core/src/zone.rs b/syva-core/src/zone.rs index dbbe3a1..9cdf714 100644 --- a/syva-core/src/zone.rs +++ b/syva-core/src/zone.rs @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet}; +use crate::types::ZoneType; + /// Zone lifecycle state. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ZoneState { @@ -47,6 +49,7 @@ pub enum ZoneTransition { #[derive(Debug)] pub struct ZoneEntry { pub zone_id: u32, + pub zone_type: ZoneType, pub state: ZoneState, pub refcount: usize, } @@ -110,6 +113,7 @@ impl ZoneRegistry { self.next_id = self.next_id.wrapping_add(1); self.zones.insert(zone_name.to_string(), ZoneEntry { zone_id, + zone_type: ZoneType::NonGlobal, state: ZoneState::Pending, refcount: 0, }); @@ -253,6 +257,20 @@ impl ZoneRegistry { self.zones.get(zone_name).map(|e| e.zone_id) } + /// Update the zone type for an already-registered zone. + pub fn set_zone_type(&mut self, zone_name: &str, zone_type: ZoneType) -> anyhow::Result<()> { + let entry = self.zones.get_mut(zone_name) + .ok_or_else(|| anyhow::anyhow!("zone '{zone_name}' is not registered"))?; + entry.zone_type = zone_type; + Ok(()) + } + + /// Look up zone type by name. + #[cfg_attr(not(test), allow(dead_code))] + pub fn zone_type(&self, zone_name: &str) -> Option { + self.zones.get(zone_name).map(|e| e.zone_type) + } + /// All registered zone names and their IDs. #[allow(dead_code)] pub fn all_zones(&self) -> impl Iterator { @@ -519,6 +537,21 @@ mod tests { assert_eq!(reg.zones["frontend"].state, ZoneState::Pending); } + #[test] + fn new_zones_default_to_non_global() { + let mut reg = ZoneRegistry::new(); + reg.register_zone("frontend").unwrap(); + assert_eq!(reg.zone_type("frontend"), Some(ZoneType::NonGlobal)); + } + + #[test] + fn set_zone_type_persists_for_lookups() { + let mut reg = ZoneRegistry::new(); + reg.register_zone("frontend").unwrap(); + reg.set_zone_type("frontend", ZoneType::Privileged).unwrap(); + assert_eq!(reg.zone_type("frontend"), Some(ZoneType::Privileged)); + } + #[test] fn register_zone_capped_at_max_zones() { // Registration must fail once next_id reaches MAX_ZONES, because