From bf533d01d33bd2ad32bbe8e0c973f2498c9a27fc Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 05:09:47 +0000 Subject: [PATCH 1/3] Add daemon node registration Co-authored-by: multica-agent --- crates/mizan-api/src/daemon_nodes.rs | 673 +++++++++++++++++++++++++++ crates/mizan-api/src/lib.rs | 23 + crates/mizan-api/src/storage.rs | 5 + crates/mizan-core/src/schema.rs | 16 + crates/mizan-daemon/Cargo.toml | 1 + crates/mizan-daemon/src/main.rs | 88 +++- migrations/0004_daemon_nodes.sql | 19 + migrations/README.md | 1 + 8 files changed, 824 insertions(+), 2 deletions(-) create mode 100644 crates/mizan-api/src/daemon_nodes.rs create mode 100644 migrations/0004_daemon_nodes.sql diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs new file mode 100644 index 0000000..358d5fb --- /dev/null +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -0,0 +1,673 @@ +use axum::body::Body; +use axum::extract::{Extension, Path, State}; +use axum::http::{Request, StatusCode, header::AUTHORIZATION}; +use axum::middleware::Next; +use axum::response::Response; +use axum::Json; +use mizan_core::{AppError, DatabaseBackend, ErrorEnvelope}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use sha2::{Digest, Sha256}; +use sqlx::{AnyPool, query, query_as}; +use tracing::{Instrument, info_span, warn}; +use uuid::Uuid; + +use crate::AppState; +use crate::auth::ApiKeyIdentity; +use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; +use crate::utils::{ + from_app_error, is_enabled, is_unique_constraint_error, prepare_sql, unix_timestamp_string, +}; + +type DaemonNodeHttpResult = Result)>; + +const DAEMON_TOKEN_PREFIX: &str = "mizan_sk_daemon_"; +const STATUS_PENDING: &str = "pending"; +const STATUS_ACTIVE: &str = "active"; +const STATUS_REVOKED: &str = "revoked"; +const AUDIT_ACTION_CREATE_DAEMON_NODE: &str = "daemon_node_created"; +const AUDIT_ACTION_REVOKE_DAEMON_NODE: &str = "daemon_node_revoked"; +const AUDIT_ENTITY_DAEMON_NODE: &str = "daemon_node"; + +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonNodeCreateRequest { + pub host_user_id: Option, + pub label: Option, + pub hostname: Option, + pub public_key: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonNodeCreateResponse { + pub id: Uuid, + pub token: String, + pub token_type: &'static str, + pub status: String, + pub host_user_id: Option, + pub label: Option, + pub hostname: Option, + pub public_key: Option, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonNodeResponse { + pub id: Uuid, + pub host_user_id: Option, + pub label: Option, + pub hostname: Option, + pub public_key: Option, + pub status: String, + pub revoked: bool, + pub last_seen_at: Option, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonNodeListResponse { + pub data: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonNodeRevokeResponse { + pub id: Uuid, + pub revoked: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonRegistrationRequest { + pub hostname: Option, + pub public_key: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonRegistrationResponse { + pub node_id: Uuid, + pub status: String, + pub last_seen_at: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonPingResponse { + pub node_id: Uuid, + pub status: String, + pub last_seen_at: String, +} + +#[derive(Debug, Clone)] +pub struct DaemonNodeIdentity { + pub node_id: Uuid, + pub status: String, +} + +#[derive(Debug)] +struct DbDaemonNode { + id: String, + host_user_id: Option, + label: Option, + hostname: Option, + public_key: Option, + status: String, + revoked: i64, + last_seen_at: Option, + created_at: String, + updated_at: String, +} + +pub async fn list_daemon_nodes( + State(state): State, +) -> DaemonNodeHttpResult> { + let rows = query_as::< + _, + ( + String, + Option, + Option, + Option, + Option, + String, + i64, + Option, + String, + String, + ), + >(&prepare_sql( + state.database_backend(), + "SELECT id, + host_user_id, + label, + hostname, + public_key, + status, + revoked, + last_seen_at, + created_at, + updated_at + FROM daemon_nodes + ORDER BY created_at DESC", + )) + .fetch_all(&state.database) + .await + .map_err(|error| from_app_error(AppError::infrastructure(error.to_string())))?; + + let data = rows + .into_iter() + .map( + |( + id, + host_user_id, + label, + hostname, + public_key, + status, + revoked, + last_seen_at, + created_at, + updated_at, + )| { + daemon_node_response(DbDaemonNode { + id, + host_user_id, + label, + hostname, + public_key, + status, + revoked, + last_seen_at, + created_at, + updated_at, + }) + }, + ) + .collect::, _>>() + .map_err(from_app_error)?; + + Ok(Json(DaemonNodeListResponse { data })) +} + +pub async fn create_daemon_node( + State(state): State, + Extension(identity): Extension, + Json(payload): Json, +) -> DaemonNodeHttpResult> { + let host_user_id = payload.host_user_id.unwrap_or(identity.user_id); + ensure_user_exists(&state.database, state.database_backend(), host_user_id) + .await + .map_err(from_app_error)?; + + let label = normalize_optional(payload.label); + let hostname = normalize_optional(payload.hostname); + let public_key = normalize_optional(payload.public_key); + let id = Uuid::now_v7(); + let token = format!("{}{}", DAEMON_TOKEN_PREFIX, Uuid::now_v7()); + let token_hash = hash_value(&token); + let now = unix_timestamp_string(); + + query(&prepare_sql( + state.database_backend(), + "INSERT INTO daemon_nodes ( + id, + host_user_id, + label, + hostname, + public_key, + token_hash, + status, + revoked, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, ?)", + )) + .bind(id.to_string()) + .bind(host_user_id.to_string()) + .bind(label.as_deref()) + .bind(hostname.as_deref()) + .bind(public_key.as_deref()) + .bind(token_hash) + .bind(STATUS_PENDING) + .bind(&now) + .bind(&now) + .execute(&state.database) + .await + .map_err(|error| from_app_error(map_insert_error(error.to_string())))?; + + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_CREATE_DAEMON_NODE.to_owned(), + entity_type: AUDIT_ENTITY_DAEMON_NODE.to_owned(), + entity_id: Some(id.to_string()), + payload_json: serialize_payload(json!({ + "host_user_id": host_user_id.to_string(), + "label": label, + "hostname": hostname, + "public_key_present": public_key.is_some(), + "raw_secret_returned_once": true, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record daemon node creation audit"); + } + + Ok(Json(DaemonNodeCreateResponse { + id, + token, + token_type: "Bearer", + status: STATUS_PENDING.to_owned(), + host_user_id: Some(host_user_id), + label, + hostname, + public_key, + created_at: now, + })) +} + +pub async fn revoke_daemon_node( + State(state): State, + Extension(identity): Extension, + Path(id): Path, +) -> DaemonNodeHttpResult> { + let revoked = revoke_node(&state.database, state.database_backend(), id) + .await + .map_err(from_app_error)?; + + if !revoked { + return Err(( + StatusCode::NOT_FOUND, + Json(ErrorEnvelope::from(&AppError::NotFound( + "daemon node not found".to_string(), + ))), + )); + } + + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_REVOKE_DAEMON_NODE.to_owned(), + entity_type: AUDIT_ENTITY_DAEMON_NODE.to_owned(), + entity_id: Some(id.to_string()), + payload_json: serialize_payload(json!({ "revoked": true })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record daemon node revocation audit"); + } + + Ok(Json(DaemonNodeRevokeResponse { id, revoked })) +} + +pub async fn register_daemon_node( + State(state): State, + Extension(identity): Extension, + Json(payload): Json, +) -> DaemonNodeHttpResult> { + let last_seen_at = mark_node_seen( + &state.database, + state.database_backend(), + identity.node_id, + normalize_optional(payload.hostname), + normalize_optional(payload.public_key), + ) + .await + .map_err(from_app_error)?; + + Ok(Json(DaemonRegistrationResponse { + node_id: identity.node_id, + status: STATUS_ACTIVE.to_owned(), + last_seen_at, + })) +} + +pub async fn daemon_ping( + State(state): State, + Extension(identity): Extension, +) -> DaemonNodeHttpResult> { + let last_seen_at = mark_node_seen( + &state.database, + state.database_backend(), + identity.node_id, + None, + None, + ) + .await + .map_err(from_app_error)?; + + Ok(Json(DaemonPingResponse { + node_id: identity.node_id, + status: identity.status, + last_seen_at, + })) +} + +pub async fn daemon_node_auth( + State(state): State, + mut request: Request, + next: Next, +) -> DaemonNodeHttpResult { + let authorization = request + .headers() + .get(AUTHORIZATION) + .and_then(|value| value.to_str().ok()) + .ok_or_else(|| map_error(StatusCode::UNAUTHORIZED, AppError::Unauthorized))?; + + let identity = + resolve_daemon_node_identity(&state.database, state.database_backend(), authorization) + .instrument(info_span!("daemon_node_auth")) + .await?; + request.extensions_mut().insert(identity); + Ok(next.run(request).await) +} + +async fn ensure_user_exists( + database: &AnyPool, + database_backend: DatabaseBackend, + user_id: Uuid, +) -> Result<(), AppError> { + let exists = query_as::<_, (i64,)>(&prepare_sql( + database_backend, + "SELECT 1 FROM users WHERE id = ?", + )) + .bind(user_id.to_string()) + .fetch_optional(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + if exists.is_none() { + return Err(AppError::invalid_config( + "daemon_node.host_user_id", + "host_user_id does not exist", + )); + } + + Ok(()) +} + +async fn resolve_daemon_node_identity( + database: &AnyPool, + database_backend: DatabaseBackend, + authorization: &str, +) -> DaemonNodeHttpResult { + let token = authorization_token(authorization) + .ok_or_else(|| map_error(StatusCode::UNAUTHORIZED, AppError::Unauthorized))?; + let token_hash = hash_value(token); + + let row = query_as::<_, (String, String)>(&prepare_sql( + database_backend, + "SELECT id, status + FROM daemon_nodes + WHERE token_hash = ? AND revoked = 0 AND status != ?", + )) + .bind(token_hash) + .bind(STATUS_REVOKED) + .fetch_optional(database) + .await + .map_err(|error| { + map_error( + StatusCode::INTERNAL_SERVER_ERROR, + AppError::infrastructure(error.to_string()), + ) + })? + .ok_or_else(|| map_error(StatusCode::UNAUTHORIZED, AppError::Unauthorized))?; + + let node_id = Uuid::parse_str(&row.0).map_err(|error| { + map_error( + StatusCode::UNAUTHORIZED, + AppError::invalid_config("daemon node identity", error.to_string()), + ) + })?; + + Ok(DaemonNodeIdentity { + node_id, + status: row.1, + }) +} + +async fn mark_node_seen( + database: &AnyPool, + database_backend: DatabaseBackend, + node_id: Uuid, + hostname: Option, + public_key: Option, +) -> Result { + let now = unix_timestamp_string(); + + let result = query(&prepare_sql( + database_backend, + "UPDATE daemon_nodes + SET status = ?, + last_seen_at = ?, + hostname = COALESCE(?, hostname), + public_key = COALESCE(?, public_key), + updated_at = ? + WHERE id = ? AND revoked = 0 AND status != ?", + )) + .bind(STATUS_ACTIVE) + .bind(&now) + .bind(hostname.as_deref()) + .bind(public_key.as_deref()) + .bind(&now) + .bind(node_id.to_string()) + .bind(STATUS_REVOKED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + if result.rows_affected() != 1 { + return Err(AppError::Unauthorized); + } + + Ok(now) +} + +async fn revoke_node( + database: &AnyPool, + database_backend: DatabaseBackend, + node_id: Uuid, +) -> Result { + let now = unix_timestamp_string(); + let result = query(&prepare_sql( + database_backend, + "UPDATE daemon_nodes + SET status = ?, revoked = 1, updated_at = ? + WHERE id = ? AND revoked = 0", + )) + .bind(STATUS_REVOKED) + .bind(&now) + .bind(node_id.to_string()) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + Ok(result.rows_affected() == 1) +} + +fn daemon_node_response(row: DbDaemonNode) -> Result { + let id = Uuid::parse_str(&row.id) + .map_err(|error| AppError::infrastructure(format!("invalid daemon node id: {error}")))?; + let host_user_id = row + .host_user_id + .as_deref() + .map(Uuid::parse_str) + .transpose() + .map_err(|error| { + AppError::infrastructure(format!("invalid daemon node host user id: {error}")) + })?; + + Ok(DaemonNodeResponse { + id, + host_user_id, + label: row.label, + hostname: row.hostname, + public_key: row.public_key, + status: row.status, + revoked: is_enabled(row.revoked), + last_seen_at: row.last_seen_at, + created_at: row.created_at, + updated_at: row.updated_at, + }) +} + +fn normalize_optional(value: Option) -> Option { + value + .map(|value| value.trim().to_owned()) + .filter(|value| !value.is_empty()) +} + +fn authorization_token(raw_authorization: &str) -> Option<&str> { + let mut split = raw_authorization.split_whitespace(); + let scheme = split.next()?; + if !scheme.eq_ignore_ascii_case("Bearer") { + return None; + } + + split.next() +} + +fn hash_value(value: &str) -> String { + let mut digest = Sha256::new(); + digest.update(value.as_bytes()); + digest + .finalize() + .iter() + .map(|byte| format!("{byte:02x}")) + .collect::() +} + +fn map_error(status: StatusCode, error: AppError) -> (StatusCode, Json) { + (status, Json(ErrorEnvelope::from(&error))) +} + +fn map_insert_error(error: String) -> AppError { + if is_unique_constraint_error(&error) { + AppError::invalid_config("daemon_node.token", "daemon node token must be unique") + } else { + AppError::infrastructure(error) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage; + use sqlx::query_scalar; + + async fn sqlite_test_database() -> AnyPool { + storage::connect_and_migrate("sqlite::memory:", true, 1) + .await + .expect("create sqlite test database") + } + + async fn seed_user(database: &AnyPool) -> Uuid { + let id = Uuid::now_v7(); + let now = unix_timestamp_string(); + query(&prepare_sql( + DatabaseBackend::Sqlite, + "INSERT INTO users (id, email, password_hash, role, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?)", + )) + .bind(id.to_string()) + .bind(format!("{id}@example.test")) + .bind("hash") + .bind("admin") + .bind(&now) + .bind(&now) + .execute(database) + .await + .expect("insert user"); + id + } + + async fn insert_node(database: &AnyPool, token: &str, revoked: bool) -> Uuid { + let node_id = Uuid::now_v7(); + let user_id = seed_user(database).await; + let now = unix_timestamp_string(); + query(&prepare_sql( + DatabaseBackend::Sqlite, + "INSERT INTO daemon_nodes ( + id, host_user_id, token_hash, status, revoked, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)", + )) + .bind(node_id.to_string()) + .bind(user_id.to_string()) + .bind(hash_value(token)) + .bind(if revoked { + STATUS_REVOKED + } else { + STATUS_PENDING + }) + .bind(if revoked { 1 } else { 0 }) + .bind(&now) + .bind(&now) + .execute(database) + .await + .expect("insert daemon node"); + node_id + } + + #[tokio::test] + async fn daemon_registration_accepts_valid_token_and_marks_node_seen() { + let database = sqlite_test_database().await; + let token = "mizan_sk_daemon_valid"; + let node_id = insert_node(&database, token, false).await; + + let identity = resolve_daemon_node_identity( + &database, + DatabaseBackend::Sqlite, + &format!("Bearer {token}"), + ) + .await + .expect("valid daemon token should authenticate"); + assert_eq!(identity.node_id, node_id); + + let last_seen = mark_node_seen( + &database, + DatabaseBackend::Sqlite, + node_id, + Some("host-a".to_owned()), + Some("ssh-ed25519 test".to_owned()), + ) + .await + .expect("mark seen"); + assert!(!last_seen.is_empty()); + + let status: String = query_scalar("SELECT status FROM daemon_nodes WHERE id = ?") + .bind(node_id.to_string()) + .fetch_one(&database) + .await + .expect("read status"); + assert_eq!(status, STATUS_ACTIVE); + } + + #[tokio::test] + async fn daemon_registration_rejects_invalid_token() { + let database = sqlite_test_database().await; + insert_node(&database, "mizan_sk_daemon_valid", false).await; + + let error = resolve_daemon_node_identity( + &database, + DatabaseBackend::Sqlite, + "Bearer mizan_sk_daemon_invalid", + ) + .await + .expect_err("invalid token should fail"); + + assert_eq!(error.0, StatusCode::UNAUTHORIZED); + } + + #[tokio::test] + async fn daemon_registration_rejects_revoked_node() { + let database = sqlite_test_database().await; + let token = "mizan_sk_daemon_revoked"; + insert_node(&database, token, true).await; + + let error = resolve_daemon_node_identity( + &database, + DatabaseBackend::Sqlite, + &format!("Bearer {token}"), + ) + .await + .expect_err("revoked node should fail"); + + assert_eq!(error.0, StatusCode::UNAUTHORIZED); + } +} diff --git a/crates/mizan-api/src/lib.rs b/crates/mizan-api/src/lib.rs index a78f645..097b15e 100644 --- a/crates/mizan-api/src/lib.rs +++ b/crates/mizan-api/src/lib.rs @@ -18,6 +18,7 @@ use tracing::{info, warn}; mod auth; mod billing; +mod daemon_nodes; mod gateway; mod logging; mod metrics; @@ -203,6 +204,26 @@ pub fn router(state: AppState) -> Router { .route_layer(from_fn(providers::require_admin_role)) .route_layer(from_fn_with_state(state.clone(), auth::api_key_auth)); + let daemon_admin_router = Router::new() + .route( + "/admin/daemon-nodes", + get(daemon_nodes::list_daemon_nodes).post(daemon_nodes::create_daemon_node), + ) + .route( + "/admin/daemon-nodes/{id}", + delete(daemon_nodes::revoke_daemon_node), + ) + .route_layer(from_fn(providers::require_admin_role)) + .route_layer(from_fn_with_state(state.clone(), auth::api_key_auth)); + + let daemon_router = Router::new() + .route("/daemon/register", post(daemon_nodes::register_daemon_node)) + .route("/daemon/ping", get(daemon_nodes::daemon_ping)) + .route_layer(from_fn_with_state( + state.clone(), + daemon_nodes::daemon_node_auth, + )); + Router::new() .route("/healthz", get(healthz)) .route("/readyz", get(readyz)) @@ -213,6 +234,8 @@ pub fn router(state: AppState) -> Router { .merge(billing_router) .merge(provider_router) .merge(billing_admin_router) + .merge(daemon_admin_router) + .merge(daemon_router) .fallback(not_found) .layer(TraceLayer::new_for_http()) .with_state(state) diff --git a/crates/mizan-api/src/storage.rs b/crates/mizan-api/src/storage.rs index 7b9aab6..de2c54c 100644 --- a/crates/mizan-api/src/storage.rs +++ b/crates/mizan-api/src/storage.rs @@ -139,6 +139,7 @@ mod tests { "usage_events", "request_logs", "admin_audit_logs", + "daemon_nodes", ]; let expected_indexes = [ @@ -159,6 +160,10 @@ mod tests { "idx_provider_connections_enabled", "idx_model_routes_public_model", "idx_wallets_owner_user_id", + "idx_daemon_nodes_host_user_id", + "idx_daemon_nodes_status", + "idx_daemon_nodes_last_seen_at", + "idx_daemon_nodes_token_hash", ]; for expected in expected_tables { diff --git a/crates/mizan-core/src/schema.rs b/crates/mizan-core/src/schema.rs index 7113cfc..51d55b2 100644 --- a/crates/mizan-core/src/schema.rs +++ b/crates/mizan-core/src/schema.rs @@ -13,6 +13,7 @@ pub mod tables { pub const USAGE_EVENTS: &str = "usage_events"; pub const REQUEST_LOGS: &str = "request_logs"; pub const ADMIN_AUDIT_LOGS: &str = "admin_audit_logs"; + pub const DAEMON_NODES: &str = "daemon_nodes"; } pub fn bool_to_i64(value: bool) -> i64 { @@ -154,6 +155,21 @@ pub struct AdminAuditLogRecord { pub created_at: String, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonNodeRecord { + pub id: Uuid, + pub host_user_id: Option, + pub label: Option, + pub hostname: Option, + pub public_key: Option, + pub token_hash: String, + pub status: String, + pub revoked: bool, + pub last_seen_at: Option, + pub created_at: String, + pub updated_at: String, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/mizan-daemon/Cargo.toml b/crates/mizan-daemon/Cargo.toml index 93edc79..6e07a97 100644 --- a/crates/mizan-daemon/Cargo.toml +++ b/crates/mizan-daemon/Cargo.toml @@ -9,6 +9,7 @@ rust-version.workspace = true [dependencies] clap = { version = "4", features = ["derive"] } mizan-core = { path = "../mizan-core" } +reqwest.workspace = true serde.workspace = true tokio.workspace = true toml = "0.8" diff --git a/crates/mizan-daemon/src/main.rs b/crates/mizan-daemon/src/main.rs index 96a1d44..d213942 100644 --- a/crates/mizan-daemon/src/main.rs +++ b/crates/mizan-daemon/src/main.rs @@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf, process, time::Duration}; use clap::{Args, CommandFactory, Parser, Subcommand}; use mizan_core::{AppError, AppResult, init_tracing}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::{net::TcpStream, time::timeout}; use tracing::info; @@ -19,6 +19,7 @@ async fn execute() -> AppResult<()> { match cli.command { Some(Command::Run(args)) => run(args).await, + Some(Command::Register(args)) => register(args).await, Some(Command::ConfigCheck(args)) => config_check(args), Some(Command::Health(args)) => health(args).await, None => { @@ -40,6 +41,7 @@ struct Cli { #[derive(Subcommand)] enum Command { Run(ConfigArgs), + Register(ConfigArgs), ConfigCheck(ConfigArgs), Health(HealthArgs), } @@ -72,11 +74,64 @@ async fn run(args: ConfigArgs) -> AppResult<()> { health_addr = %config.health_addr, "mizan daemon startup configuration loaded" ); - info!("daemon registration is prepared; node registration lands in the next milestone task"); + info!("daemon registration is available with `mizan-daemon register --config `"); Ok(()) } +async fn register(args: ConfigArgs) -> AppResult<()> { + let config = DaemonConfig::load(&args.config)?; + init_tracing("mizan_daemon=info,mizan_core=info")?; + + let token = std::fs::read_to_string(&config.daemon_token_path) + .map_err(|error| AppError::config("daemon_token_path", error))?; + let token = token.trim(); + if token.is_empty() { + return Err(AppError::invalid_config( + "daemon_token_path", + "daemon token file is empty", + )); + } + + let registration_url = control_plane_endpoint(&config.control_plane_url, "/daemon/register"); + let response = reqwest::Client::new() + .post(®istration_url) + .bearer_auth(token) + .json(&DaemonRegistrationRequest { + hostname: std::env::var("HOSTNAME") + .ok() + .map(|value| value.trim().to_owned()) + .filter(|value| !value.is_empty()), + public_key: None, + }) + .send() + .await + .map_err(|error| AppError::infrastructure(format!("daemon registration failed: {error}")))?; + + let status = response.status(); + if !status.is_success() { + return Err(AppError::infrastructure(format!( + "daemon registration rejected by control plane with status {status}" + ))); + } + + let body: DaemonRegistrationResponse = response.json().await.map_err(|error| { + AppError::infrastructure(format!("invalid daemon registration response: {error}")) + })?; + + info!( + node_id = %body.node_id, + status = %body.status, + last_seen_at = %body.last_seen_at, + "daemon node registered with control plane" + ); + println!( + "ok: daemon node {} registered status={} last_seen_at={}", + body.node_id, body.status, body.last_seen_at + ); + Ok(()) +} + fn config_check(args: ConfigArgs) -> AppResult<()> { let config = DaemonConfig::load(&args.config)?; println!( @@ -180,10 +235,31 @@ struct RawDaemonConfig { health_addr: Option, } +#[derive(Debug, Serialize)] +struct DaemonRegistrationRequest { + hostname: Option, + public_key: Option, +} + +#[derive(Debug, Deserialize)] +struct DaemonRegistrationResponse { + node_id: String, + status: String, + last_seen_at: String, +} + fn required_field(value: Option, key: &'static str) -> AppResult { value.ok_or_else(|| AppError::invalid_config(key, "is required")) } +fn control_plane_endpoint(control_plane_url: &str, path: &str) -> String { + format!( + "{}/{}", + control_plane_url.trim_end_matches('/'), + path.trim_start_matches('/') + ) +} + #[cfg(test)] mod tests { use super::*; @@ -251,6 +327,14 @@ health_addr = "127.0.0.1:19180" assert!(error.to_string().contains("max_concurrency")); } + #[test] + fn builds_registration_endpoint_without_double_slashes() { + assert_eq!( + control_plane_endpoint("https://mizan.example.test/", "/daemon/register"), + "https://mizan.example.test/daemon/register" + ); + } + #[test] fn redacts_secret_material_for_logs() { let input = "daemon_token=mizan_sk_daemon_123 bearer=Bearer abc"; diff --git a/migrations/0004_daemon_nodes.sql b/migrations/0004_daemon_nodes.sql new file mode 100644 index 0000000..53b8cf5 --- /dev/null +++ b/migrations/0004_daemon_nodes.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS daemon_nodes ( + id TEXT PRIMARY KEY, + host_user_id TEXT, + label TEXT, + hostname TEXT, + public_key TEXT, + token_hash TEXT NOT NULL UNIQUE, + status TEXT NOT NULL DEFAULT 'pending', + revoked INTEGER NOT NULL DEFAULT 0, + last_seen_at TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY (host_user_id) REFERENCES users (id) ON DELETE SET NULL +); + +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_host_user_id ON daemon_nodes (host_user_id); +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_status ON daemon_nodes (status); +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_last_seen_at ON daemon_nodes (last_seen_at); +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_token_hash ON daemon_nodes (token_hash); diff --git a/migrations/README.md b/migrations/README.md index afbd686..d9bbf08 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -18,3 +18,4 @@ Current versions: - usage_events - request_logs - admin_audit_logs + - daemon_nodes From 47f1e28fd2543d9973eb15bf5de70cf5cfa6692d Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 08:04:22 +0000 Subject: [PATCH 2/3] Fix daemon registration PR checks Co-authored-by: multica-agent --- crates/mizan-api/src/daemon_nodes.rs | 4 ++-- crates/mizan-daemon/src/main.rs | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs index 358d5fb..9e0cbf1 100644 --- a/crates/mizan-api/src/daemon_nodes.rs +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -1,9 +1,9 @@ +use axum::Json; use axum::body::Body; use axum::extract::{Extension, Path, State}; use axum::http::{Request, StatusCode, header::AUTHORIZATION}; use axum::middleware::Next; use axum::response::Response; -use axum::Json; use mizan_core::{AppError, DatabaseBackend, ErrorEnvelope}; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -200,7 +200,7 @@ pub async fn create_daemon_node( let hostname = normalize_optional(payload.hostname); let public_key = normalize_optional(payload.public_key); let id = Uuid::now_v7(); - let token = format!("{}{}", DAEMON_TOKEN_PREFIX, Uuid::now_v7()); + let token = format!("{}{}", DAEMON_TOKEN_PREFIX, Uuid::new_v4()); let token_hash = hash_value(&token); let now = unix_timestamp_string(); diff --git a/crates/mizan-daemon/src/main.rs b/crates/mizan-daemon/src/main.rs index d213942..2e3fcf5 100644 --- a/crates/mizan-daemon/src/main.rs +++ b/crates/mizan-daemon/src/main.rs @@ -94,7 +94,12 @@ async fn register(args: ConfigArgs) -> AppResult<()> { } let registration_url = control_plane_endpoint(&config.control_plane_url, "/daemon/register"); - let response = reqwest::Client::new() + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|error| AppError::infrastructure(format!("daemon http client failed: {error}")))?; + + let response = client .post(®istration_url) .bearer_auth(token) .json(&DaemonRegistrationRequest { @@ -106,7 +111,9 @@ async fn register(args: ConfigArgs) -> AppResult<()> { }) .send() .await - .map_err(|error| AppError::infrastructure(format!("daemon registration failed: {error}")))?; + .map_err(|error| { + AppError::infrastructure(format!("daemon registration failed: {error}")) + })?; let status = response.status(); if !status.is_success() { From 1084e21cd7f996923947a61e844c44fab9bf224d Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 08:05:55 +0000 Subject: [PATCH 3/3] Enable UUID v4 for daemon tokens Co-authored-by: multica-agent --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b284c54..00e1f89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,4 +38,4 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "net tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json"] } -uuid = { version = "1", features = ["serde", "v7"] } +uuid = { version = "1", features = ["serde", "v4", "v7"] }