From f33ec697f0eae6adaf76e6f1bd2ea0a4b7a2f98f Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 08:18:54 +0000 Subject: [PATCH 1/4] Add daemon heartbeat capability registry Co-authored-by: multica-agent --- .env.example | 1 + README.md | 4 + crates/mizan-api/src/daemon_nodes.rs | 626 +++++++++++++++++++++--- crates/mizan-api/src/lib.rs | 1 + crates/mizan-api/src/storage.rs | 2 + crates/mizan-core/src/config.rs | 12 + crates/mizan-daemon/Cargo.toml | 1 + crates/mizan-daemon/src/main.rs | 233 ++++++++- migrations/0005_daemon_capabilities.sql | 12 + migrations/README.md | 5 + 10 files changed, 813 insertions(+), 84 deletions(-) create mode 100644 migrations/0005_daemon_capabilities.sql diff --git a/.env.example b/.env.example index 6d453e2..983dd1d 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,4 @@ MIZAN_ADMIN_EMAIL= MIZAN_ADMIN_PASSWORD= MIZAN_ADMIN_ROLE=admin MIZAN_PROVIDER_SECRET_KEY= +MIZAN_DAEMON_STALE_SECONDS=90 diff --git a/README.md b/README.md index 9558eeb..b4b8de3 100644 --- a/README.md +++ b/README.md @@ -158,9 +158,13 @@ Minimal `mizan-daemon.toml`: control_plane_url = "http://127.0.0.1:18180" daemon_token_path = "/run/secrets/mizan-daemon-token" local_provider_url = "http://127.0.0.1:11434/v1" +provider_family = "openai-compatible" advertised_models = ["llama3.1"] max_concurrency = 2 +region = "local" +labels = ["gpu"] health_addr = "127.0.0.1:19180" +heartbeat_interval_seconds = 30 ``` Run API, SQLite-backed storage, and Redis with Docker Compose: diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs index 9e0cbf1..ff091a4 100644 --- a/crates/mizan-api/src/daemon_nodes.rs +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -6,9 +6,9 @@ use axum::middleware::Next; use axum::response::Response; use mizan_core::{AppError, DatabaseBackend, ErrorEnvelope}; use serde::{Deserialize, Serialize}; -use serde_json::json; +use serde_json::{Value, json}; use sha2::{Digest, Sha256}; -use sqlx::{AnyPool, query, query_as}; +use sqlx::{AnyPool, FromRow, query, query_as}; use tracing::{Instrument, info_span, warn}; use uuid::Uuid; @@ -16,7 +16,8 @@ use crate::AppState; use crate::auth::ApiKeyIdentity; use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; use crate::utils::{ - from_app_error, is_enabled, is_unique_constraint_error, prepare_sql, unix_timestamp_string, + from_app_error, is_enabled, is_unique_constraint_error, now_utc_epoch_seconds, parse_timestamp, + prepare_sql, unix_timestamp_string, }; type DaemonNodeHttpResult = Result)>; @@ -25,6 +26,7 @@ const DAEMON_TOKEN_PREFIX: &str = "mizan_sk_daemon_"; const STATUS_PENDING: &str = "pending"; const STATUS_ACTIVE: &str = "active"; const STATUS_REVOKED: &str = "revoked"; +const HEALTH_STATUS_HEALTHY: &str = "healthy"; const AUDIT_ACTION_CREATE_DAEMON_NODE: &str = "daemon_node_created"; const AUDIT_ACTION_REVOKE_DAEMON_NODE: &str = "daemon_node_revoked"; const AUDIT_ENTITY_DAEMON_NODE: &str = "daemon_node"; @@ -59,7 +61,9 @@ pub struct DaemonNodeResponse { pub public_key: Option, pub status: String, pub revoked: bool, + pub disabled: bool, pub last_seen_at: Option, + pub capabilities: DaemonCapabilityResponse, pub created_at: String, pub updated_at: String, } @@ -79,6 +83,8 @@ pub struct DaemonNodeRevokeResponse { pub struct DaemonRegistrationRequest { pub hostname: Option, pub public_key: Option, + #[serde(default)] + pub capabilities: Option, } #[derive(Debug, Clone, Serialize)] @@ -86,6 +92,7 @@ pub struct DaemonRegistrationResponse { pub node_id: Uuid, pub status: String, pub last_seen_at: String, + pub capabilities: DaemonCapabilityResponse, } #[derive(Debug, Clone, Serialize)] @@ -95,13 +102,68 @@ pub struct DaemonPingResponse { pub last_seen_at: String, } +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonHeartbeatRequest { + #[serde(default)] + pub hostname: Option, + #[serde(default)] + pub public_key: Option, + pub capabilities: DaemonCapabilityPayload, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonHeartbeatResponse { + pub node_id: Uuid, + pub status: String, + pub last_seen_at: String, + pub capabilities: DaemonCapabilityResponse, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonCapabilityPayload { + pub provider_family: String, + pub model_ids: Vec, + pub max_concurrency: u32, + #[serde(default)] + pub pricing_metadata: Option, + #[serde(default)] + pub region: Option, + #[serde(default)] + pub labels: Vec, + #[serde(default)] + pub health_status: Option, + #[serde(default)] + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DaemonCapabilityResponse { + pub provider_family: Option, + pub model_ids: Vec, + pub max_concurrency: Option, + pub pricing_metadata: Option, + pub region: Option, + pub labels: Vec, + pub health_status: Option, + pub metadata: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EligibleDaemonNode { + pub id: Uuid, + pub provider_family: String, + pub model_id: String, + pub max_concurrency: u32, + pub last_seen_at: String, +} + #[derive(Debug, Clone)] pub struct DaemonNodeIdentity { pub node_id: Uuid, pub status: String, } -#[derive(Debug)] +#[derive(Debug, FromRow)] struct DbDaemonNode { id: String, host_user_id: Option, @@ -110,29 +172,36 @@ struct DbDaemonNode { public_key: Option, status: String, revoked: i64, + disabled: i64, last_seen_at: Option, + provider_family: Option, + model_ids_json: String, + max_concurrency: Option, + pricing_metadata_json: Option, + region: Option, + labels_json: String, + health_status: Option, + capability_metadata_json: Option, created_at: String, updated_at: String, } +#[derive(Debug, Clone)] +struct NormalizedCapabilities { + provider_family: String, + model_ids: Vec, + max_concurrency: u32, + pricing_metadata: Option, + region: Option, + labels: Vec, + health_status: String, + metadata: Option, +} + pub async fn list_daemon_nodes( State(state): State, ) -> DaemonNodeHttpResult> { - let rows = query_as::< - _, - ( - String, - Option, - Option, - Option, - Option, - String, - i64, - Option, - String, - String, - ), - >(&prepare_sql( + let rows = query_as::<_, DbDaemonNode>(&prepare_sql( state.database_backend(), "SELECT id, host_user_id, @@ -141,7 +210,16 @@ pub async fn list_daemon_nodes( public_key, status, revoked, + disabled, last_seen_at, + provider_family, + model_ids_json, + max_concurrency, + pricing_metadata_json, + region, + labels_json, + health_status, + capability_metadata_json, created_at, updated_at FROM daemon_nodes @@ -153,33 +231,7 @@ pub async fn list_daemon_nodes( let data = rows .into_iter() - .map( - |( - id, - host_user_id, - label, - hostname, - public_key, - status, - revoked, - last_seen_at, - created_at, - updated_at, - )| { - daemon_node_response(DbDaemonNode { - id, - host_user_id, - label, - hostname, - public_key, - status, - revoked, - last_seen_at, - created_at, - updated_at, - }) - }, - ) + .map(daemon_node_response) .collect::, _>>() .map_err(from_app_error)?; @@ -301,12 +353,18 @@ pub async fn register_daemon_node( Extension(identity): Extension, Json(payload): Json, ) -> DaemonNodeHttpResult> { + let capabilities = payload + .capabilities + .map(normalize_capabilities) + .transpose() + .map_err(from_app_error)?; let last_seen_at = mark_node_seen( &state.database, state.database_backend(), identity.node_id, normalize_optional(payload.hostname), normalize_optional(payload.public_key), + capabilities.as_ref(), ) .await .map_err(from_app_error)?; @@ -315,6 +373,34 @@ pub async fn register_daemon_node( node_id: identity.node_id, status: STATUS_ACTIVE.to_owned(), last_seen_at, + capabilities: capabilities + .map(normalized_capability_response) + .unwrap_or_default(), + })) +} + +pub async fn daemon_heartbeat( + State(state): State, + Extension(identity): Extension, + Json(payload): Json, +) -> DaemonNodeHttpResult> { + let capabilities = normalize_capabilities(payload.capabilities).map_err(from_app_error)?; + let last_seen_at = mark_node_seen( + &state.database, + state.database_backend(), + identity.node_id, + normalize_optional(payload.hostname), + normalize_optional(payload.public_key), + Some(&capabilities), + ) + .await + .map_err(from_app_error)?; + + Ok(Json(DaemonHeartbeatResponse { + node_id: identity.node_id, + status: STATUS_ACTIVE.to_owned(), + last_seen_at, + capabilities: normalized_capability_response(capabilities), })) } @@ -328,6 +414,7 @@ pub async fn daemon_ping( identity.node_id, None, None, + None, ) .await .map_err(from_app_error)?; @@ -428,29 +515,69 @@ async fn mark_node_seen( node_id: Uuid, hostname: Option, public_key: Option, + capabilities: Option<&NormalizedCapabilities>, ) -> Result { let now = unix_timestamp_string(); - let result = query(&prepare_sql( - database_backend, - "UPDATE daemon_nodes - SET status = ?, - last_seen_at = ?, - hostname = COALESCE(?, hostname), - public_key = COALESCE(?, public_key), - updated_at = ? - WHERE id = ? AND revoked = 0 AND status != ?", - )) - .bind(STATUS_ACTIVE) - .bind(&now) - .bind(hostname.as_deref()) - .bind(public_key.as_deref()) - .bind(&now) - .bind(node_id.to_string()) - .bind(STATUS_REVOKED) - .execute(database) - .await - .map_err(|error| AppError::infrastructure(error.to_string()))?; + let result = if let Some(capabilities) = capabilities { + query(&prepare_sql( + database_backend, + "UPDATE daemon_nodes + SET status = ?, + last_seen_at = ?, + hostname = COALESCE(?, hostname), + public_key = COALESCE(?, public_key), + provider_family = ?, + model_ids_json = ?, + max_concurrency = ?, + pricing_metadata_json = ?, + region = ?, + labels_json = ?, + health_status = ?, + capability_metadata_json = ?, + updated_at = ? + WHERE id = ? AND revoked = 0 AND status != ?", + )) + .bind(STATUS_ACTIVE) + .bind(&now) + .bind(hostname.as_deref()) + .bind(public_key.as_deref()) + .bind(&capabilities.provider_family) + .bind(serialize_json(&capabilities.model_ids)?) + .bind(i64::from(capabilities.max_concurrency)) + .bind(serialize_optional_json(capabilities.pricing_metadata.as_ref())?) + .bind(capabilities.region.as_deref()) + .bind(serialize_json(&capabilities.labels)?) + .bind(&capabilities.health_status) + .bind(serialize_optional_json(capabilities.metadata.as_ref())?) + .bind(&now) + .bind(node_id.to_string()) + .bind(STATUS_REVOKED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))? + } else { + query(&prepare_sql( + database_backend, + "UPDATE daemon_nodes + SET status = ?, + last_seen_at = ?, + hostname = COALESCE(?, hostname), + public_key = COALESCE(?, public_key), + updated_at = ? + WHERE id = ? AND revoked = 0 AND status != ?", + )) + .bind(STATUS_ACTIVE) + .bind(&now) + .bind(hostname.as_deref()) + .bind(public_key.as_deref()) + .bind(&now) + .bind(node_id.to_string()) + .bind(STATUS_REVOKED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))? + }; if result.rows_affected() != 1 { return Err(AppError::Unauthorized); @@ -501,12 +628,218 @@ fn daemon_node_response(row: DbDaemonNode) -> Result Result, AppError> { + let model_id = model_id.trim(); + if model_id.is_empty() { + return Ok(None); + } + + let cutoff = now_utc_epoch_seconds().saturating_sub(stale_after_seconds.max(1)); + let rows = query_as::<_, (String, String, String, i64, String)>(&prepare_sql( + database_backend, + "SELECT id, provider_family, model_ids_json, max_concurrency, last_seen_at + FROM daemon_nodes + WHERE status = ? + AND revoked = 0 + AND disabled = 0 + AND health_status = ? + AND provider_family IS NOT NULL + AND max_concurrency IS NOT NULL + AND last_seen_at IS NOT NULL + ORDER BY last_seen_at DESC, created_at ASC", + )) + .bind(STATUS_ACTIVE) + .bind(HEALTH_STATUS_HEALTHY) + .fetch_all(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + for (id, provider_family, model_ids_json, max_concurrency, last_seen_at) in rows { + let last_seen = parse_timestamp(&last_seen_at)?; + if last_seen < cutoff { + continue; + } + + let model_ids = parse_json_vec(&model_ids_json, "daemon_node.model_ids_json")?; + if !model_ids.iter().any(|candidate| candidate == model_id) { + continue; + } + + let max_concurrency = u32::try_from(max_concurrency).map_err(|_| { + AppError::infrastructure("stored daemon max_concurrency is invalid") + })?; + if max_concurrency == 0 { + continue; + } + + let id = Uuid::parse_str(&id).map_err(|error| { + AppError::infrastructure(format!("stored daemon node id is invalid: {error}")) + })?; + + return Ok(Some(EligibleDaemonNode { + id, + provider_family, + model_id: model_id.to_owned(), + max_concurrency, + last_seen_at, + })); + } + + Ok(None) +} + +impl Default for DaemonCapabilityResponse { + fn default() -> Self { + Self { + provider_family: None, + model_ids: Vec::new(), + max_concurrency: None, + pricing_metadata: None, + region: None, + labels: Vec::new(), + health_status: None, + metadata: None, + } + } +} + +fn normalized_capability_response(value: NormalizedCapabilities) -> DaemonCapabilityResponse { + DaemonCapabilityResponse { + provider_family: Some(value.provider_family), + model_ids: value.model_ids, + max_concurrency: Some(value.max_concurrency), + pricing_metadata: value.pricing_metadata, + region: value.region, + labels: value.labels, + health_status: Some(value.health_status), + metadata: value.metadata, + } +} + +fn daemon_capability_response(row: &DbDaemonNode) -> Result { + let max_concurrency = row + .max_concurrency + .map(|value| { + u32::try_from(value).map_err(|_| { + AppError::infrastructure("stored daemon max_concurrency is invalid") + }) + }) + .transpose()?; + + Ok(DaemonCapabilityResponse { + provider_family: row.provider_family.clone(), + model_ids: parse_json_vec(&row.model_ids_json, "daemon_node.model_ids_json")?, + max_concurrency, + pricing_metadata: parse_optional_json_value( + row.pricing_metadata_json.as_deref(), + "daemon_node.pricing_metadata_json", + )?, + region: row.region.clone(), + labels: parse_json_vec(&row.labels_json, "daemon_node.labels_json")?, + health_status: row.health_status.clone(), + metadata: parse_optional_json_value( + row.capability_metadata_json.as_deref(), + "daemon_node.capability_metadata_json", + )?, + }) +} + +fn normalize_capabilities( + payload: DaemonCapabilityPayload, +) -> Result { + let provider_family = payload.provider_family.trim().to_ascii_lowercase(); + if provider_family.is_empty() { + return Err(AppError::invalid_config( + "daemon_capabilities.provider_family", + "provider_family is required", + )); + } + + let model_ids = normalize_string_list(payload.model_ids); + if model_ids.is_empty() { + return Err(AppError::invalid_config( + "daemon_capabilities.model_ids", + "at least one model id is required", + )); + } + + if payload.max_concurrency == 0 { + return Err(AppError::invalid_config( + "daemon_capabilities.max_concurrency", + "must be greater than zero", + )); + } + + let health_status = normalize_optional(payload.health_status) + .unwrap_or_else(|| HEALTH_STATUS_HEALTHY.to_owned()) + .to_ascii_lowercase(); + + Ok(NormalizedCapabilities { + provider_family, + model_ids, + max_concurrency: payload.max_concurrency, + pricing_metadata: payload.pricing_metadata, + region: normalize_optional(payload.region), + labels: normalize_string_list(payload.labels), + health_status, + metadata: payload.metadata, + }) +} + +fn normalize_string_list(values: Vec) -> Vec { + let mut normalized = Vec::new(); + for value in values { + let value = value.trim(); + if value.is_empty() + || normalized + .iter() + .any(|candidate: &String| candidate.as_str() == value) + { + continue; + } + normalized.push(value.to_owned()); + } + normalized +} + +fn serialize_json(value: &impl Serialize) -> Result { + serde_json::to_string(value) + .map_err(|error| AppError::infrastructure(format!("json serialization failed: {error}"))) +} + +fn serialize_optional_json(value: Option<&Value>) -> Result, AppError> { + value.map(serialize_json).transpose() +} + +fn parse_json_vec(raw: &str, field_name: &'static str) -> Result, AppError> { + serde_json::from_str(raw) + .map_err(|error| AppError::infrastructure(format!("{field_name} is invalid: {error}"))) +} + +fn parse_optional_json_value( + raw: Option<&str>, + field_name: &'static str, +) -> Result, AppError> { + raw.map(|value| { + serde_json::from_str(value) + .map_err(|error| AppError::infrastructure(format!("{field_name} is invalid: {error}"))) + }) + .transpose() +} + fn normalize_optional(value: Option) -> Option { value .map(|value| value.trim().to_owned()) @@ -625,6 +958,7 @@ mod tests { node_id, Some("host-a".to_owned()), Some("ssh-ed25519 test".to_owned()), + None, ) .await .expect("mark seen"); @@ -638,6 +972,120 @@ mod tests { assert_eq!(status, STATUS_ACTIVE); } + #[tokio::test] + async fn daemon_heartbeat_stores_capabilities() { + let database = sqlite_test_database().await; + let token = "mizan_sk_daemon_capable"; + let node_id = insert_node(&database, token, false).await; + let capabilities = normalize_capabilities(DaemonCapabilityPayload { + provider_family: "openai-compatible".to_owned(), + model_ids: vec![" llama3.1 ".to_owned(), "qwen2.5-coder".to_owned()], + max_concurrency: 4, + pricing_metadata: Some(json!({"input_per_1m": 100})), + region: Some("iad".to_owned()), + labels: vec!["gpu".to_owned()], + health_status: None, + metadata: Some(json!({"local_provider_url": "http://127.0.0.1:11434/v1"})), + }) + .expect("valid capabilities"); + + mark_node_seen( + &database, + DatabaseBackend::Sqlite, + node_id, + Some("host-b".to_owned()), + None, + Some(&capabilities), + ) + .await + .expect("mark seen with capabilities"); + + let row: (String, String, i64, String, String) = query_as( + "SELECT provider_family, model_ids_json, max_concurrency, health_status, region + FROM daemon_nodes WHERE id = ?", + ) + .bind(node_id.to_string()) + .fetch_one(&database) + .await + .expect("read daemon capabilities"); + + assert_eq!(row.0, "openai-compatible"); + assert_eq!(row.1, r#"["llama3.1","qwen2.5-coder"]"#); + assert_eq!(row.2, 4); + assert_eq!(row.3, HEALTH_STATUS_HEALTHY); + assert_eq!(row.4, "iad"); + } + + #[tokio::test] + async fn daemon_capability_validation_rejects_empty_models_and_zero_capacity() { + let empty_models = normalize_capabilities(DaemonCapabilityPayload { + provider_family: "openai-compatible".to_owned(), + model_ids: vec![" ".to_owned()], + max_concurrency: 1, + pricing_metadata: None, + region: None, + labels: Vec::new(), + health_status: None, + metadata: None, + }); + assert!(empty_models.is_err()); + + let zero_capacity = normalize_capabilities(DaemonCapabilityPayload { + provider_family: "openai-compatible".to_owned(), + model_ids: vec!["llama3.1".to_owned()], + max_concurrency: 0, + pricing_metadata: None, + region: None, + labels: Vec::new(), + health_status: None, + metadata: None, + }); + assert!(zero_capacity.is_err()); + } + + #[tokio::test] + async fn daemon_selection_excludes_stale_disabled_and_unhealthy_nodes() { + let database = sqlite_test_database().await; + let now = now_utc_epoch_seconds(); + let online = insert_selectable_node(&database, "llama3.1", now, 0, HEALTH_STATUS_HEALTHY) + .await; + insert_selectable_node(&database, "llama3.1", now - 120, 0, HEALTH_STATUS_HEALTHY).await; + insert_selectable_node(&database, "llama3.1", now, 1, HEALTH_STATUS_HEALTHY).await; + insert_selectable_node(&database, "llama3.1", now, 0, "degraded").await; + + let selected = + select_eligible_daemon_node(&database, DatabaseBackend::Sqlite, "llama3.1", 60) + .await + .expect("select daemon node") + .expect("online node should be selected"); + + assert_eq!(selected.id, online); + } + + #[tokio::test] + async fn daemon_selection_returns_none_when_only_stale_nodes_match() { + let database = sqlite_test_database().await; + insert_selectable_node( + &database, + "self-hosted/gpt-oss", + now_utc_epoch_seconds() - 120, + 0, + HEALTH_STATUS_HEALTHY, + ) + .await; + + let selected = select_eligible_daemon_node( + &database, + DatabaseBackend::Sqlite, + "self-hosted/gpt-oss", + 60, + ) + .await + .expect("select daemon node"); + + assert!(selected.is_none()); + } + #[tokio::test] async fn daemon_registration_rejects_invalid_token() { let database = sqlite_test_database().await; @@ -670,4 +1118,50 @@ mod tests { assert_eq!(error.0, StatusCode::UNAUTHORIZED); } + + async fn insert_selectable_node( + database: &AnyPool, + model_id: &str, + last_seen_at: i64, + disabled: i64, + health_status: &str, + ) -> Uuid { + let node_id = Uuid::now_v7(); + let user_id = seed_user(database).await; + let now = unix_timestamp_string(); + query(&prepare_sql( + DatabaseBackend::Sqlite, + "INSERT INTO daemon_nodes ( + id, + host_user_id, + token_hash, + status, + revoked, + disabled, + last_seen_at, + provider_family, + model_ids_json, + max_concurrency, + health_status, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?)", + )) + .bind(node_id.to_string()) + .bind(user_id.to_string()) + .bind(hash_value(&format!("token-{node_id}"))) + .bind(STATUS_ACTIVE) + .bind(disabled) + .bind(last_seen_at.to_string()) + .bind("openai-compatible") + .bind(serialize_json(&vec![model_id.to_owned()]).expect("serialize model ids")) + .bind(4_i64) + .bind(health_status) + .bind(&now) + .bind(&now) + .execute(database) + .await + .expect("insert selectable daemon node"); + node_id + } } diff --git a/crates/mizan-api/src/lib.rs b/crates/mizan-api/src/lib.rs index 097b15e..0fef937 100644 --- a/crates/mizan-api/src/lib.rs +++ b/crates/mizan-api/src/lib.rs @@ -218,6 +218,7 @@ pub fn router(state: AppState) -> Router { let daemon_router = Router::new() .route("/daemon/register", post(daemon_nodes::register_daemon_node)) + .route("/daemon/heartbeat", post(daemon_nodes::daemon_heartbeat)) .route("/daemon/ping", get(daemon_nodes::daemon_ping)) .route_layer(from_fn_with_state( state.clone(), diff --git a/crates/mizan-api/src/storage.rs b/crates/mizan-api/src/storage.rs index de2c54c..dcef482 100644 --- a/crates/mizan-api/src/storage.rs +++ b/crates/mizan-api/src/storage.rs @@ -164,6 +164,8 @@ mod tests { "idx_daemon_nodes_status", "idx_daemon_nodes_last_seen_at", "idx_daemon_nodes_token_hash", + "idx_daemon_nodes_disabled", + "idx_daemon_nodes_health_status", ]; for expected in expected_tables { diff --git a/crates/mizan-core/src/config.rs b/crates/mizan-core/src/config.rs index 3646a55..745848d 100644 --- a/crates/mizan-core/src/config.rs +++ b/crates/mizan-core/src/config.rs @@ -21,6 +21,7 @@ pub struct AppConfig { pub admin_seed_role: String, pub provider_secret_key: Option, pub log_raw_request_bodies: bool, + pub daemon_stale_seconds: u32, } impl AppConfig { @@ -124,6 +125,16 @@ impl AppConfig { "false", |value| parse_bool_value("MIZAN_LOG_RAW_REQUEST_BODIES", value), )?, + daemon_stale_seconds: env::var("MIZAN_DAEMON_STALE_SECONDS").map_or( + Ok(DEFAULT_DAEMON_STALE_SECONDS), + |value| { + parse_u32_env( + "MIZAN_DAEMON_STALE_SECONDS", + &value, + DEFAULT_DAEMON_STALE_SECONDS, + ) + }, + )?, }) } else { Err(AppError::invalid_config( @@ -206,6 +217,7 @@ fn parse_bool_value(key: &'static str, raw_value: &str) -> AppResult { const DEFAULT_DATABASE_MAX_CONNECTIONS: u32 = 10; const DEFAULT_LIMIT_WINDOW_SECONDS: u32 = 60; const DEFAULT_LIMIT_LEASE_SECONDS: u32 = 120; +const DEFAULT_DAEMON_STALE_SECONDS: u32 = 90; fn parse_u32_env(key: &'static str, raw_value: &str, default: u32) -> AppResult { let value = raw_value.trim(); diff --git a/crates/mizan-daemon/Cargo.toml b/crates/mizan-daemon/Cargo.toml index 6e07a97..09b20dd 100644 --- a/crates/mizan-daemon/Cargo.toml +++ b/crates/mizan-daemon/Cargo.toml @@ -11,6 +11,7 @@ clap = { version = "4", features = ["derive"] } mizan-core = { path = "../mizan-core" } reqwest.workspace = true serde.workspace = true +serde_json.workspace = true tokio.workspace = true toml = "0.8" tracing.workspace = true diff --git a/crates/mizan-daemon/src/main.rs b/crates/mizan-daemon/src/main.rs index 2e3fcf5..f8c7f96 100644 --- a/crates/mizan-daemon/src/main.rs +++ b/crates/mizan-daemon/src/main.rs @@ -3,8 +3,12 @@ use std::{net::SocketAddr, path::PathBuf, process, time::Duration}; use clap::{Args, CommandFactory, Parser, Subcommand}; use mizan_core::{AppError, AppResult, init_tracing}; use serde::{Deserialize, Serialize}; -use tokio::{net::TcpStream, time::timeout}; -use tracing::info; +use serde_json::Value; +use tokio::{ + net::TcpStream, + time::{sleep, timeout}, +}; +use tracing::{info, warn}; #[tokio::main] async fn main() { @@ -64,6 +68,9 @@ struct HealthArgs { async fn run(args: ConfigArgs) -> AppResult<()> { let config = DaemonConfig::load(&args.config)?; init_tracing("mizan_daemon=info,mizan_core=info")?; + let token = read_daemon_token(&config)?; + let heartbeat_url = control_plane_endpoint(&config.control_plane_url, "/daemon/heartbeat"); + let client = daemon_http_client()?; info!( control_plane_url = %config.control_plane_url, @@ -71,43 +78,53 @@ async fn run(args: ConfigArgs) -> AppResult<()> { local_provider_url = %config.local_provider_url, advertised_models = %config.advertised_models.join(","), max_concurrency = config.max_concurrency, + provider_family = %config.provider_family, health_addr = %config.health_addr, + heartbeat_interval_seconds = config.heartbeat_interval_seconds, "mizan daemon startup configuration loaded" ); info!("daemon registration is available with `mizan-daemon register --config `"); - Ok(()) + loop { + match send_heartbeat(&client, &heartbeat_url, &token, &config).await { + Ok(body) => { + info!( + node_id = %body.node_id, + status = %body.status, + last_seen_at = %body.last_seen_at, + "daemon heartbeat accepted by control plane" + ); + } + Err(error) => { + warn!(error = %error, "daemon heartbeat failed"); + } + } + sleep(Duration::from_secs(u64::from( + config.heartbeat_interval_seconds.max(1), + ))) + .await; + } } async fn register(args: ConfigArgs) -> AppResult<()> { let config = DaemonConfig::load(&args.config)?; init_tracing("mizan_daemon=info,mizan_core=info")?; - let token = std::fs::read_to_string(&config.daemon_token_path) - .map_err(|error| AppError::config("daemon_token_path", error))?; - let token = token.trim(); - if token.is_empty() { - return Err(AppError::invalid_config( - "daemon_token_path", - "daemon token file is empty", - )); - } + let token = read_daemon_token(&config)?; let registration_url = control_plane_endpoint(&config.control_plane_url, "/daemon/register"); - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(30)) - .build() - .map_err(|error| AppError::infrastructure(format!("daemon http client failed: {error}")))?; + let client = daemon_http_client()?; let response = client .post(®istration_url) - .bearer_auth(token) + .bearer_auth(&token) .json(&DaemonRegistrationRequest { hostname: std::env::var("HOSTNAME") .ok() .map(|value| value.trim().to_owned()) .filter(|value| !value.is_empty()), public_key: None, + capabilities: config.capabilities_payload(), }) .send() .await @@ -152,6 +169,59 @@ fn config_check(args: ConfigArgs) -> AppResult<()> { Ok(()) } +async fn send_heartbeat( + client: &reqwest::Client, + heartbeat_url: &str, + token: &str, + config: &DaemonConfig, +) -> AppResult { + let response = client + .post(heartbeat_url) + .bearer_auth(token) + .json(&DaemonHeartbeatRequest { + hostname: std::env::var("HOSTNAME") + .ok() + .map(|value| value.trim().to_owned()) + .filter(|value| !value.is_empty()), + public_key: None, + capabilities: config.capabilities_payload(), + }) + .send() + .await + .map_err(|error| AppError::infrastructure(format!("daemon heartbeat failed: {error}")))?; + + let status = response.status(); + if !status.is_success() { + return Err(AppError::infrastructure(format!( + "daemon heartbeat rejected by control plane with status {status}" + ))); + } + + response.json().await.map_err(|error| { + AppError::infrastructure(format!("invalid daemon heartbeat response: {error}")) + }) +} + +fn read_daemon_token(config: &DaemonConfig) -> AppResult { + let token = std::fs::read_to_string(&config.daemon_token_path) + .map_err(|error| AppError::config("daemon_token_path", error))?; + let token = token.trim(); + if token.is_empty() { + return Err(AppError::invalid_config( + "daemon_token_path", + "daemon token file is empty", + )); + } + Ok(token.to_owned()) +} + +fn daemon_http_client() -> AppResult { + reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|error| AppError::infrastructure(format!("daemon http client failed: {error}"))) +} + async fn health(args: HealthArgs) -> AppResult<()> { if let Some(path) = args.config { let config = DaemonConfig::load(&path)?; @@ -177,14 +247,19 @@ async fn probe_health_addr(addr: SocketAddr, timeout_ms: u64) -> AppResult<()> { Ok(()) } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] struct DaemonConfig { control_plane_url: String, daemon_token_path: String, local_provider_url: String, + provider_family: String, advertised_models: Vec, max_concurrency: u32, + pricing_metadata: Option, + region: Option, + labels: Vec, health_addr: SocketAddr, + heartbeat_interval_seconds: u32, } impl DaemonConfig { @@ -200,6 +275,11 @@ impl DaemonConfig { let daemon_token_path = required_field(raw_config.daemon_token_path, "daemon_token_path")?; let local_provider_url = required_field(raw_config.local_provider_url, "local_provider_url")?; + let provider_family = raw_config + .provider_family + .unwrap_or_else(|| "openai-compatible".to_owned()) + .trim() + .to_ascii_lowercase(); let advertised_models = required_field(raw_config.advertised_models, "advertised_models")?; let max_concurrency = raw_config.max_concurrency.unwrap_or(1); let health_addr = raw_config @@ -214,22 +294,59 @@ impl DaemonConfig { "at least one model is required", )); } + if provider_family.is_empty() { + return Err(AppError::invalid_config( + "provider_family", + "provider_family is required", + )); + } if max_concurrency == 0 { return Err(AppError::invalid_config( "max_concurrency", "must be greater than zero", )); } + let heartbeat_interval_seconds = raw_config.heartbeat_interval_seconds.unwrap_or(30); + if heartbeat_interval_seconds == 0 { + return Err(AppError::invalid_config( + "heartbeat_interval_seconds", + "must be greater than zero", + )); + } Ok(Self { control_plane_url, daemon_token_path, local_provider_url, + provider_family, advertised_models, max_concurrency, + pricing_metadata: raw_config.pricing_metadata, + region: raw_config + .region + .map(|value| value.trim().to_owned()) + .filter(|value| !value.is_empty()), + labels: normalize_string_list(raw_config.labels.unwrap_or_default()), health_addr, + heartbeat_interval_seconds, }) } + + fn capabilities_payload(&self) -> DaemonCapabilityPayload { + DaemonCapabilityPayload { + provider_family: self.provider_family.clone(), + model_ids: self.advertised_models.clone(), + max_concurrency: self.max_concurrency, + pricing_metadata: self.pricing_metadata.clone(), + region: self.region.clone(), + labels: self.labels.clone(), + health_status: Some("healthy".to_owned()), + metadata: Some(serde_json::json!({ + "local_provider_url": self.local_provider_url, + "health_addr": self.health_addr.to_string(), + })), + } + } } #[derive(Debug, Deserialize)] @@ -237,15 +354,40 @@ struct RawDaemonConfig { control_plane_url: Option, daemon_token_path: Option, local_provider_url: Option, + provider_family: Option, advertised_models: Option>, max_concurrency: Option, + pricing_metadata: Option, + region: Option, + labels: Option>, health_addr: Option, + heartbeat_interval_seconds: Option, } #[derive(Debug, Serialize)] struct DaemonRegistrationRequest { hostname: Option, public_key: Option, + capabilities: DaemonCapabilityPayload, +} + +#[derive(Debug, Serialize)] +struct DaemonHeartbeatRequest { + hostname: Option, + public_key: Option, + capabilities: DaemonCapabilityPayload, +} + +#[derive(Debug, Serialize)] +struct DaemonCapabilityPayload { + provider_family: String, + model_ids: Vec, + max_concurrency: u32, + pricing_metadata: Option, + region: Option, + labels: Vec, + health_status: Option, + metadata: Option, } #[derive(Debug, Deserialize)] @@ -255,6 +397,13 @@ struct DaemonRegistrationResponse { last_seen_at: String, } +#[derive(Debug, Deserialize)] +struct DaemonHeartbeatResponse { + node_id: String, + status: String, + last_seen_at: String, +} + fn required_field(value: Option, key: &'static str) -> AppResult { value.ok_or_else(|| AppError::invalid_config(key, "is required")) } @@ -267,6 +416,22 @@ fn control_plane_endpoint(control_plane_url: &str, path: &str) -> String { ) } +fn normalize_string_list(values: Vec) -> Vec { + let mut normalized = Vec::new(); + for value in values { + let value = value.trim(); + if value.is_empty() + || normalized + .iter() + .any(|candidate: &String| candidate.as_str() == value) + { + continue; + } + normalized.push(value.to_owned()); + } + normalized +} + #[cfg(test)] mod tests { use super::*; @@ -276,9 +441,13 @@ mod tests { control_plane_url = "https://mizan.example.test" daemon_token_path = "/run/secrets/mizan-daemon-token" local_provider_url = "http://127.0.0.1:11434/v1" +provider_family = "openai-compatible" advertised_models = ["llama3.1", "qwen2.5-coder"] max_concurrency = 4 +region = "local" +labels = ["gpu", "lab"] health_addr = "127.0.0.1:19180" +heartbeat_interval_seconds = 15 "#; #[test] @@ -290,11 +459,15 @@ health_addr = "127.0.0.1:19180" config.advertised_models, vec!["llama3.1".to_owned(), "qwen2.5-coder".to_owned()] ); + assert_eq!(config.provider_family, "openai-compatible"); assert_eq!(config.max_concurrency, 4); + assert_eq!(config.region.as_deref(), Some("local")); + assert_eq!(config.labels, vec!["gpu".to_owned(), "lab".to_owned()]); assert_eq!( config.health_addr, "127.0.0.1:19180".parse::().unwrap() ); + assert_eq!(config.heartbeat_interval_seconds, 15); } #[test] @@ -318,6 +491,22 @@ health_addr = "127.0.0.1:19180" ); } + #[test] + fn builds_capability_payload_from_config() { + let config = DaemonConfig::parse(VALID_CONFIG).expect("config should parse"); + + let payload = config.capabilities_payload(); + + assert_eq!(payload.provider_family, "openai-compatible"); + assert_eq!( + payload.model_ids, + vec!["llama3.1".to_owned(), "qwen2.5-coder".to_owned()] + ); + assert_eq!(payload.max_concurrency, 4); + assert_eq!(payload.region.as_deref(), Some("local")); + assert_eq!(payload.health_status.as_deref(), Some("healthy")); + } + #[test] fn rejects_missing_required_fields() { let error = DaemonConfig::parse("control_plane_url = \"https://mizan.example.test\"") @@ -334,6 +523,14 @@ health_addr = "127.0.0.1:19180" assert!(error.to_string().contains("max_concurrency")); } + #[test] + fn rejects_zero_heartbeat_interval() { + let raw = VALID_CONFIG.replace("heartbeat_interval_seconds = 15", "heartbeat_interval_seconds = 0"); + let error = DaemonConfig::parse(&raw).expect_err("config should fail"); + + assert!(error.to_string().contains("heartbeat_interval_seconds")); + } + #[test] fn builds_registration_endpoint_without_double_slashes() { assert_eq!( diff --git a/migrations/0005_daemon_capabilities.sql b/migrations/0005_daemon_capabilities.sql new file mode 100644 index 0000000..2a4ff76 --- /dev/null +++ b/migrations/0005_daemon_capabilities.sql @@ -0,0 +1,12 @@ +ALTER TABLE daemon_nodes ADD COLUMN provider_family TEXT; +ALTER TABLE daemon_nodes ADD COLUMN model_ids_json TEXT NOT NULL DEFAULT '[]'; +ALTER TABLE daemon_nodes ADD COLUMN max_concurrency INTEGER; +ALTER TABLE daemon_nodes ADD COLUMN pricing_metadata_json TEXT; +ALTER TABLE daemon_nodes ADD COLUMN region TEXT; +ALTER TABLE daemon_nodes ADD COLUMN labels_json TEXT NOT NULL DEFAULT '[]'; +ALTER TABLE daemon_nodes ADD COLUMN health_status TEXT; +ALTER TABLE daemon_nodes ADD COLUMN capability_metadata_json TEXT; +ALTER TABLE daemon_nodes ADD COLUMN disabled INTEGER NOT NULL DEFAULT 0; + +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_disabled ON daemon_nodes (disabled); +CREATE INDEX IF NOT EXISTS idx_daemon_nodes_health_status ON daemon_nodes (health_status); diff --git a/migrations/README.md b/migrations/README.md index d9bbf08..b0c2849 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -19,3 +19,8 @@ Current versions: - request_logs - admin_audit_logs - daemon_nodes +- `0002_request_log_foundations.sql` +- `0003_provider_auth_modes.sql` +- `0004_daemon_nodes.sql` +- `0005_daemon_capabilities.sql` + - daemon heartbeat capability fields and selection indexes From 22829e724bc0e44e77ca9f69abbfef71b9bab5ba Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 12:07:53 +0000 Subject: [PATCH 2/4] Fix daemon heartbeat CI feedback Co-authored-by: multica-agent --- crates/mizan-api/src/daemon_nodes.rs | 49 +++++++++++++++------------- crates/mizan-api/src/utils.rs | 7 ++-- crates/mizan-daemon/src/main.rs | 5 ++- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs index ff091a4..c68c4c1 100644 --- a/crates/mizan-api/src/daemon_nodes.rs +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -16,8 +16,8 @@ use crate::AppState; use crate::auth::ApiKeyIdentity; use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; use crate::utils::{ - from_app_error, is_enabled, is_unique_constraint_error, now_utc_epoch_seconds, parse_timestamp, - prepare_sql, unix_timestamp_string, + from_app_error, is_enabled, is_unique_constraint_error, now_utc_epoch_seconds, prepare_sql, + unix_timestamp_string, }; type DaemonNodeHttpResult = Result)>; @@ -171,12 +171,12 @@ struct DbDaemonNode { hostname: Option, public_key: Option, status: String, - revoked: i64, - disabled: i64, + revoked: i32, + disabled: i32, last_seen_at: Option, provider_family: Option, model_ids_json: String, - max_concurrency: Option, + max_concurrency: Option, pricing_metadata_json: Option, region: Option, labels_json: String, @@ -544,8 +544,15 @@ async fn mark_node_seen( .bind(public_key.as_deref()) .bind(&capabilities.provider_family) .bind(serialize_json(&capabilities.model_ids)?) - .bind(i64::from(capabilities.max_concurrency)) - .bind(serialize_optional_json(capabilities.pricing_metadata.as_ref())?) + .bind(i32::try_from(capabilities.max_concurrency).map_err(|_| { + AppError::invalid_config( + "daemon_capabilities.max_concurrency", + "max_concurrency exceeds database integer range", + ) + })?) + .bind(serialize_optional_json( + capabilities.pricing_metadata.as_ref(), + )?) .bind(capabilities.region.as_deref()) .bind(serialize_json(&capabilities.labels)?) .bind(&capabilities.health_status) @@ -648,7 +655,8 @@ pub async fn select_eligible_daemon_node( } let cutoff = now_utc_epoch_seconds().saturating_sub(stale_after_seconds.max(1)); - let rows = query_as::<_, (String, String, String, i64, String)>(&prepare_sql( + let cutoff = cutoff.to_string(); + let rows = query_as::<_, (String, String, String, i32, String)>(&prepare_sql( database_backend, "SELECT id, provider_family, model_ids_json, max_concurrency, last_seen_at FROM daemon_nodes @@ -659,28 +667,24 @@ pub async fn select_eligible_daemon_node( AND provider_family IS NOT NULL AND max_concurrency IS NOT NULL AND last_seen_at IS NOT NULL + AND last_seen_at >= ? ORDER BY last_seen_at DESC, created_at ASC", )) .bind(STATUS_ACTIVE) .bind(HEALTH_STATUS_HEALTHY) + .bind(&cutoff) .fetch_all(database) .await .map_err(|error| AppError::infrastructure(error.to_string()))?; for (id, provider_family, model_ids_json, max_concurrency, last_seen_at) in rows { - let last_seen = parse_timestamp(&last_seen_at)?; - if last_seen < cutoff { - continue; - } - let model_ids = parse_json_vec(&model_ids_json, "daemon_node.model_ids_json")?; if !model_ids.iter().any(|candidate| candidate == model_id) { continue; } - let max_concurrency = u32::try_from(max_concurrency).map_err(|_| { - AppError::infrastructure("stored daemon max_concurrency is invalid") - })?; + let max_concurrency = u32::try_from(max_concurrency) + .map_err(|_| AppError::infrastructure("stored daemon max_concurrency is invalid"))?; if max_concurrency == 0 { continue; } @@ -733,9 +737,8 @@ fn daemon_capability_response(row: &DbDaemonNode) -> Result Uuid { let node_id = Uuid::now_v7(); @@ -1155,7 +1158,7 @@ mod tests { .bind(last_seen_at.to_string()) .bind("openai-compatible") .bind(serialize_json(&vec![model_id.to_owned()]).expect("serialize model ids")) - .bind(4_i64) + .bind(4_i32) .bind(health_status) .bind(&now) .bind(&now) diff --git a/crates/mizan-api/src/utils.rs b/crates/mizan-api/src/utils.rs index 4774328..32ff8de 100644 --- a/crates/mizan-api/src/utils.rs +++ b/crates/mizan-api/src/utils.rs @@ -31,8 +31,11 @@ pub fn prepare_sql(database_backend: DatabaseBackend, query: &'_ str) -> String } } -pub fn is_enabled(raw: i64) -> bool { - raw != 0 +pub fn is_enabled(raw: T) -> bool +where + T: Into, +{ + raw.into() != 0 } pub fn parse_timestamp(raw: &str) -> AppResult { diff --git a/crates/mizan-daemon/src/main.rs b/crates/mizan-daemon/src/main.rs index f8c7f96..2704413 100644 --- a/crates/mizan-daemon/src/main.rs +++ b/crates/mizan-daemon/src/main.rs @@ -525,7 +525,10 @@ heartbeat_interval_seconds = 15 #[test] fn rejects_zero_heartbeat_interval() { - let raw = VALID_CONFIG.replace("heartbeat_interval_seconds = 15", "heartbeat_interval_seconds = 0"); + let raw = VALID_CONFIG.replace( + "heartbeat_interval_seconds = 15", + "heartbeat_interval_seconds = 0", + ); let error = DaemonConfig::parse(&raw).expect_err("config should fail"); assert!(error.to_string().contains("heartbeat_interval_seconds")); From 6eb3974ffc1c1bb9a8a5814e766792e651729369 Mon Sep 17 00:00:00 2001 From: Dimas Date: Sun, 7 Jun 2026 12:10:31 +0000 Subject: [PATCH 3/4] Fix daemon node response ownership Co-authored-by: multica-agent --- crates/mizan-api/src/daemon_nodes.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs index c68c4c1..853d7f7 100644 --- a/crates/mizan-api/src/daemon_nodes.rs +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -618,6 +618,7 @@ async fn revoke_node( fn daemon_node_response(row: DbDaemonNode) -> Result { let id = Uuid::parse_str(&row.id) .map_err(|error| AppError::infrastructure(format!("invalid daemon node id: {error}")))?; + let capabilities = daemon_capability_response(&row)?; let host_user_id = row .host_user_id .as_deref() @@ -637,7 +638,7 @@ fn daemon_node_response(row: DbDaemonNode) -> Result Date: Sun, 7 Jun 2026 12:12:56 +0000 Subject: [PATCH 4/4] Address daemon selection clippy warnings Co-authored-by: multica-agent --- crates/mizan-api/src/daemon_nodes.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/crates/mizan-api/src/daemon_nodes.rs b/crates/mizan-api/src/daemon_nodes.rs index 853d7f7..cf3c5a7 100644 --- a/crates/mizan-api/src/daemon_nodes.rs +++ b/crates/mizan-api/src/daemon_nodes.rs @@ -136,7 +136,7 @@ pub struct DaemonCapabilityPayload { pub metadata: Option, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Default, Serialize)] pub struct DaemonCapabilityResponse { pub provider_family: Option, pub model_ids: Vec, @@ -148,6 +148,7 @@ pub struct DaemonCapabilityResponse { pub metadata: Option, } +#[allow(dead_code)] #[derive(Debug, Clone, PartialEq, Eq)] pub struct EligibleDaemonNode { pub id: Uuid, @@ -644,6 +645,7 @@ fn daemon_node_response(row: DbDaemonNode) -> Result Self { - Self { - provider_family: None, - model_ids: Vec::new(), - max_concurrency: None, - pricing_metadata: None, - region: None, - labels: Vec::new(), - health_status: None, - metadata: None, - } - } -} - fn normalized_capability_response(value: NormalizedCapabilities) -> DaemonCapabilityResponse { DaemonCapabilityResponse { provider_family: Some(value.provider_family),