From 9b09c651605f09ca584b1724b84d5072c9a572d5 Mon Sep 17 00:00:00 2001 From: Dimas Date: Thu, 11 Jun 2026 05:10:22 +0000 Subject: [PATCH] PEN-118: add daemon dispatch job lifecycle Co-authored-by: multica-agent --- crates/mizan-api/src/dispatch.rs | 605 ++++++++++++++++++++++++++++++ crates/mizan-api/src/gateway.rs | 390 +++++++++++++++++++ crates/mizan-api/src/lib.rs | 9 + crates/mizan-api/src/storage.rs | 4 + crates/mizan-daemon/src/main.rs | 201 +++++++++- migrations/0006_dispatch_jobs.sql | 25 ++ 6 files changed, 1233 insertions(+), 1 deletion(-) create mode 100644 crates/mizan-api/src/dispatch.rs create mode 100644 migrations/0006_dispatch_jobs.sql diff --git a/crates/mizan-api/src/dispatch.rs b/crates/mizan-api/src/dispatch.rs new file mode 100644 index 0000000..b064ff0 --- /dev/null +++ b/crates/mizan-api/src/dispatch.rs @@ -0,0 +1,605 @@ +use std::time::{Duration, Instant}; + +use axum::Json; +use axum::extract::{Extension, Path, State}; +use axum::http::StatusCode; +use mizan_core::{AppError, DatabaseBackend, ErrorEnvelope}; +use mizan_providers::{ChatRequest, ChatResponse}; +use serde::{Deserialize, Serialize}; +use sqlx::{AnyPool, query, query_as}; +use tokio::time::sleep; +use tracing::warn; +use uuid::Uuid; + +use crate::AppState; +use crate::daemon_nodes::{DaemonNodeIdentity, EligibleDaemonNode}; +use crate::utils::{from_app_error, now_utc_epoch_seconds, prepare_sql, unix_timestamp_string}; + +type DispatchHttpResult = Result)>; + +pub const STATUS_ACCEPTED: &str = "accepted"; +pub const STATUS_LEASED: &str = "leased"; +pub const STATUS_SUCCEEDED: &str = "succeeded"; +pub const STATUS_FAILED: &str = "failed"; +pub const STATUS_TIMED_OUT: &str = "timed_out"; + +#[derive(Debug, Clone)] +pub struct DispatchJobInput { + pub request_id: Uuid, + pub node_id: Uuid, + pub user_id: Option, + pub api_key_id: Option, + pub model: String, + pub request: ChatRequest, + pub timeout_seconds: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DispatchJobLeaseResponse { + pub id: Uuid, + pub request_id: Uuid, + pub model: String, + pub request: ChatRequest, + pub deadline_at: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DispatchJobLeaseEnvelope { + pub data: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DispatchJobCompleteRequest { + pub status: String, + #[serde(default)] + pub response: Option, + #[serde(default)] + pub error_code: Option, + #[serde(default)] + pub error_message: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DispatchJobStatusResponse { + pub id: Uuid, + pub status: String, +} + +#[derive(Debug, Clone)] +pub enum DispatchJobResult { + Succeeded(ChatResponse), + Failed { + error_code: Option, + error_message: String, + }, + TimedOut, +} + +#[derive(Debug, Clone)] +struct DispatchJobRow { + id: String, + request_id: String, + model: String, + request_json: String, + deadline_at: String, +} + +pub async fn lease_next_dispatch_job( + State(state): State, + Extension(identity): Extension, +) -> DispatchHttpResult> { + let data = lease_next_job_for_node(&state.database, state.database_backend(), identity.node_id) + .await + .map_err(from_app_error)?; + + Ok(Json(DispatchJobLeaseEnvelope { data })) +} + +pub async fn complete_dispatch_job( + State(state): State, + Extension(identity): Extension, + Path(id): Path, + Json(payload): Json, +) -> DispatchHttpResult> { + let status = complete_job( + &state.database, + state.database_backend(), + identity.node_id, + id, + payload, + ) + .await + .map_err(from_app_error)?; + + Ok(Json(DispatchJobStatusResponse { id, status })) +} + +pub async fn create_dispatch_job( + database: &AnyPool, + database_backend: DatabaseBackend, + input: DispatchJobInput, +) -> Result { + let id = Uuid::now_v7(); + let now = unix_timestamp_string(); + let deadline_at = now_utc_epoch_seconds() + .saturating_add(i64::from(input.timeout_seconds.max(1))) + .to_string(); + let request_json = serde_json::to_string(&input.request).map_err(|error| { + AppError::infrastructure(format!("dispatch request encode failed: {error}")) + })?; + + query(&prepare_sql( + database_backend, + "INSERT INTO dispatch_jobs ( + id, + request_id, + node_id, + user_id, + api_key_id, + model, + status, + request_json, + deadline_at, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )) + .bind(id.to_string()) + .bind(input.request_id.to_string()) + .bind(input.node_id.to_string()) + .bind(input.user_id.map(|value| value.to_string())) + .bind(input.api_key_id.map(|value| value.to_string())) + .bind(input.model) + .bind(STATUS_ACCEPTED) + .bind(request_json) + .bind(deadline_at) + .bind(&now) + .bind(&now) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + Ok(id) +} + +pub async fn dispatch_to_daemon_node( + database: &AnyPool, + database_backend: DatabaseBackend, + node: &EligibleDaemonNode, + input: DispatchJobInput, +) -> Result { + let timeout_seconds = input.timeout_seconds.max(1); + let job_id = create_dispatch_job(database, database_backend, input).await?; + + let result = match wait_for_dispatch_result( + database, + database_backend, + job_id, + Duration::from_secs(u64::from(timeout_seconds)), + ) + .await? + { + DispatchJobResult::TimedOut => { + mark_job_timed_out(database, database_backend, job_id).await?; + Ok(DispatchJobResult::TimedOut) + } + result => Ok(result), + }; + + if let Err(error) = &result { + warn!( + node_id = %node.id, + job_id = %job_id, + error = %error, + "daemon dispatch failed" + ); + } + + result +} + +pub async fn lease_next_job_for_node( + database: &AnyPool, + database_backend: DatabaseBackend, + node_id: Uuid, +) -> Result, AppError> { + mark_expired_jobs_timed_out(database, database_backend).await?; + let now = unix_timestamp_string(); + + let rows = query_as::<_, (String, String, String, String, String)>(&prepare_sql( + database_backend, + "SELECT id, request_id, model, request_json, deadline_at + FROM dispatch_jobs + WHERE node_id = ? + AND status = ? + AND deadline_at >= ? + ORDER BY created_at ASC + LIMIT 5", + )) + .bind(node_id.to_string()) + .bind(STATUS_ACCEPTED) + .bind(&now) + .fetch_all(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + for row in rows { + let row = DispatchJobRow { + id: row.0, + request_id: row.1, + model: row.2, + request_json: row.3, + deadline_at: row.4, + }; + let updated = query(&prepare_sql( + database_backend, + "UPDATE dispatch_jobs + SET status = ?, leased_at = ?, updated_at = ? + WHERE id = ? AND node_id = ? AND status = ?", + )) + .bind(STATUS_LEASED) + .bind(&now) + .bind(&now) + .bind(&row.id) + .bind(node_id.to_string()) + .bind(STATUS_ACCEPTED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + if updated.rows_affected() != 1 { + continue; + } + + let id = Uuid::parse_str(&row.id).map_err(|error| { + AppError::infrastructure(format!("stored dispatch id is invalid: {error}")) + })?; + let request_id = Uuid::parse_str(&row.request_id).map_err(|error| { + AppError::infrastructure(format!("stored dispatch request id is invalid: {error}")) + })?; + let request = serde_json::from_str::(&row.request_json).map_err(|error| { + AppError::infrastructure(format!("stored dispatch request is invalid: {error}")) + })?; + + return Ok(Some(DispatchJobLeaseResponse { + id, + request_id, + model: row.model, + request, + deadline_at: row.deadline_at, + })); + } + + Ok(None) +} + +pub async fn complete_job( + database: &AnyPool, + database_backend: DatabaseBackend, + node_id: Uuid, + job_id: Uuid, + payload: DispatchJobCompleteRequest, +) -> Result { + let status = match payload.status.trim() { + STATUS_SUCCEEDED => STATUS_SUCCEEDED, + STATUS_FAILED => STATUS_FAILED, + _ => { + return Err(AppError::invalid_config( + "dispatch_job.status", + "status must be succeeded or failed", + )); + } + }; + + if status == STATUS_SUCCEEDED && payload.response.is_none() { + return Err(AppError::invalid_config( + "dispatch_job.response", + "response is required when status is succeeded", + )); + } + + let response_json = payload + .response + .as_ref() + .map(serde_json::to_string) + .transpose() + .map_err(|error| { + AppError::infrastructure(format!("dispatch response encode failed: {error}")) + })?; + let now = unix_timestamp_string(); + let error_message = payload + .error_message + .map(|value| value.trim().to_owned()) + .filter(|value| !value.is_empty()); + + let updated = query(&prepare_sql( + database_backend, + "UPDATE dispatch_jobs + SET status = ?, + response_json = ?, + error_code = ?, + error_message = ?, + completed_at = ?, + updated_at = ? + WHERE id = ? AND node_id = ? AND status = ?", + )) + .bind(status) + .bind(response_json) + .bind(payload.error_code) + .bind(error_message) + .bind(&now) + .bind(&now) + .bind(job_id.to_string()) + .bind(node_id.to_string()) + .bind(STATUS_LEASED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + if updated.rows_affected() != 1 { + return Err(AppError::NotFound( + "leased dispatch job not found".to_owned(), + )); + } + + Ok(status.to_owned()) +} + +pub async fn wait_for_dispatch_result( + database: &AnyPool, + database_backend: DatabaseBackend, + job_id: Uuid, + timeout_duration: Duration, +) -> Result { + let started_at = Instant::now(); + + loop { + if let Some(result) = fetch_terminal_result(database, database_backend, job_id).await? { + return Ok(result); + } + + if started_at.elapsed() >= timeout_duration { + return Ok(DispatchJobResult::TimedOut); + } + + sleep(Duration::from_millis(100)).await; + } +} + +async fn fetch_terminal_result( + database: &AnyPool, + database_backend: DatabaseBackend, + job_id: Uuid, +) -> Result, AppError> { + let row = + query_as::<_, (String, Option, Option, Option)>(&prepare_sql( + database_backend, + "SELECT status, response_json, error_code, error_message + FROM dispatch_jobs + WHERE id = ?", + )) + .bind(job_id.to_string()) + .fetch_optional(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + let Some((status, response_json, error_code, error_message)) = row else { + return Err(AppError::NotFound("dispatch job not found".to_owned())); + }; + + match status.as_str() { + STATUS_SUCCEEDED => { + let raw = response_json.ok_or_else(|| { + AppError::infrastructure("dispatch job succeeded without response_json") + })?; + let response = serde_json::from_str::(&raw).map_err(|error| { + AppError::infrastructure(format!("stored dispatch response is invalid: {error}")) + })?; + Ok(Some(DispatchJobResult::Succeeded(response))) + } + STATUS_FAILED => Ok(Some(DispatchJobResult::Failed { + error_code, + error_message: error_message.unwrap_or_else(|| "daemon job failed".to_owned()), + })), + STATUS_TIMED_OUT => Ok(Some(DispatchJobResult::TimedOut)), + _ => Ok(None), + } +} + +async fn mark_job_timed_out( + database: &AnyPool, + database_backend: DatabaseBackend, + job_id: Uuid, +) -> Result<(), AppError> { + let now = unix_timestamp_string(); + query(&prepare_sql( + database_backend, + "UPDATE dispatch_jobs + SET status = ?, error_code = ?, error_message = ?, completed_at = ?, updated_at = ? + WHERE id = ? AND status IN (?, ?)", + )) + .bind(STATUS_TIMED_OUT) + .bind("timeout") + .bind("daemon dispatch timed out") + .bind(&now) + .bind(&now) + .bind(job_id.to_string()) + .bind(STATUS_ACCEPTED) + .bind(STATUS_LEASED) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + Ok(()) +} + +async fn mark_expired_jobs_timed_out( + database: &AnyPool, + database_backend: DatabaseBackend, +) -> Result<(), AppError> { + let now = unix_timestamp_string(); + query(&prepare_sql( + database_backend, + "UPDATE dispatch_jobs + SET status = ?, error_code = ?, error_message = ?, completed_at = ?, updated_at = ? + WHERE status IN (?, ?) AND deadline_at < ?", + )) + .bind(STATUS_TIMED_OUT) + .bind("timeout") + .bind("daemon dispatch timed out") + .bind(&now) + .bind(&now) + .bind(STATUS_ACCEPTED) + .bind(STATUS_LEASED) + .bind(&now) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(error.to_string()))?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::daemon_nodes::{HEALTH_STATUS_HEALTHY, STATUS_ACTIVE}; + use crate::storage; + use sqlx::query; + + async fn sqlite_test_database() -> AnyPool { + storage::connect_and_migrate("sqlite::memory:", true, 1) + .await + .expect("create sqlite test database") + } + + async fn seed_node(database: &AnyPool) -> Uuid { + let node_id = Uuid::now_v7(); + let user_id = Uuid::now_v7(); + let now = unix_timestamp_string(); + query("INSERT INTO users (id, email, password_hash, role, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)") + .bind(user_id.to_string()) + .bind(format!("{user_id}@example.test")) + .bind("hash") + .bind("admin") + .bind(&now) + .bind(&now) + .execute(database) + .await + .expect("insert user"); + query( + "INSERT INTO daemon_nodes ( + id, host_user_id, token_hash, status, revoked, disabled, last_seen_at, + provider_family, model_ids_json, max_concurrency, health_status, created_at, updated_at + ) VALUES (?, ?, ?, ?, 0, 0, ?, ?, ?, 1, ?, ?, ?)", + ) + .bind(node_id.to_string()) + .bind(user_id.to_string()) + .bind(format!("hash-{node_id}")) + .bind(STATUS_ACTIVE) + .bind(&now) + .bind("openai-compatible") + .bind(r#"["llama3.1"]"#) + .bind(HEALTH_STATUS_HEALTHY) + .bind(&now) + .bind(&now) + .execute(database) + .await + .expect("insert daemon node"); + node_id + } + + fn test_request() -> ChatRequest { + ChatRequest { + model: "llama3.1".to_owned(), + messages: Vec::new(), + stream: false, + max_tokens: Some(8), + } + } + + #[tokio::test] + async fn daemon_can_lease_and_complete_dispatch_job() { + let database = sqlite_test_database().await; + let node_id = seed_node(&database).await; + let request_id = Uuid::now_v7(); + let job_id = create_dispatch_job( + &database, + DatabaseBackend::Sqlite, + DispatchJobInput { + request_id, + node_id, + user_id: None, + api_key_id: None, + model: "llama3.1".to_owned(), + request: test_request(), + timeout_seconds: 5, + }, + ) + .await + .expect("create dispatch job"); + + let leased = lease_next_job_for_node(&database, DatabaseBackend::Sqlite, node_id) + .await + .expect("lease job") + .expect("job should exist"); + assert_eq!(leased.id, job_id); + assert_eq!(leased.request_id, request_id); + + complete_job( + &database, + DatabaseBackend::Sqlite, + node_id, + job_id, + DispatchJobCompleteRequest { + status: STATUS_SUCCEEDED.to_owned(), + response: Some(ChatResponse { + provider: "mizan-daemon".to_owned(), + model: "llama3.1".to_owned(), + content: "pong".to_owned(), + usage: None, + }), + error_code: None, + error_message: None, + }, + ) + .await + .expect("complete job"); + + match fetch_terminal_result(&database, DatabaseBackend::Sqlite, job_id) + .await + .expect("fetch result") + .expect("terminal result") + { + DispatchJobResult::Succeeded(response) => assert_eq!(response.content, "pong"), + other => panic!("unexpected result: {other:?}"), + } + } + + #[tokio::test] + async fn expired_jobs_are_marked_timed_out_before_lease() { + let database = sqlite_test_database().await; + let node_id = seed_node(&database).await; + create_dispatch_job( + &database, + DatabaseBackend::Sqlite, + DispatchJobInput { + request_id: Uuid::now_v7(), + node_id, + user_id: None, + api_key_id: None, + model: "llama3.1".to_owned(), + request: test_request(), + timeout_seconds: 1, + }, + ) + .await + .expect("create dispatch job"); + + sleep(Duration::from_secs(2)).await; + let leased = lease_next_job_for_node(&database, DatabaseBackend::Sqlite, node_id) + .await + .expect("lease job"); + + assert!(leased.is_none()); + } +} diff --git a/crates/mizan-api/src/gateway.rs b/crates/mizan-api/src/gateway.rs index 8fa0d5c..311f8b3 100644 --- a/crates/mizan-api/src/gateway.rs +++ b/crates/mizan-api/src/gateway.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::time::Instant; +use crate::dispatch::{DispatchJobInput, DispatchJobResult, dispatch_to_daemon_node}; use crate::logging::{RequestLogInput, error_code_from_app_error, record_request_log}; use axum::{ Extension, Json, @@ -34,6 +35,7 @@ use uuid::Uuid; use crate::AppState; use crate::auth::ApiKeyIdentity; use crate::billing; +use crate::daemon_nodes::select_eligible_daemon_node; use crate::metrics::{GatewayObservation, MetricsRegistry}; use crate::utils::{decrypt_provider_api_key, from_app_error, now_utc_epoch_seconds, prepare_sql}; @@ -247,6 +249,20 @@ async fn chat_completions_impl( .await { Ok(route) => route, + Err(error) if is_model_route_not_found(&error) => { + return handle_daemon_chat_completion( + state, + identity, + context, + unresolved_completion_log, + request_started_at, + request_id, + public_model.to_owned(), + payload, + spec, + ) + .await; + } Err(error) => { let status = app_error_status_code(&error); let error_code = error_code_from_app_error(&error); @@ -674,6 +690,371 @@ async fn chat_completions_impl( Ok(response) } +#[allow(clippy::too_many_arguments)] +async fn handle_daemon_chat_completion( + state: AppState, + identity: ApiKeyIdentity, + mut context: RequestContext, + completion_log: GatewayCompletionLog, + request_started_at: Instant, + request_id: Uuid, + public_model: String, + payload: ChatCompletionsRequest, + spec: GatewayRequestSpec, +) -> GatewayHttpResult { + if payload.stream { + let app_error = AppError::invalid_config( + spec.stream_field, + "stream is not supported for daemon-backed models yet", + ); + let status = app_error_status_code(&app_error); + let error_code = error_code_from_app_error(&app_error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + return Ok(build_error_response( + &context, + status, + Json(ErrorEnvelope::from(&app_error)), + )); + } + + let node = match select_eligible_daemon_node( + &state.database, + state.database_backend(), + &public_model, + i64::from(state.config.daemon_stale_seconds), + ) + .instrument(info_span!( + "daemon_node_selection", + request_id = %request_id, + trace_id = %context.trace_id, + model = %public_model, + )) + .await + { + Ok(Some(node)) => node, + Ok(None) => { + let app_error = + AppError::invalid_config(spec.model_field, "model not found or disabled"); + let status = app_error_status_code(&app_error); + let error_code = error_code_from_app_error(&app_error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + return Ok(build_error_response( + &context, + status, + Json(ErrorEnvelope::from(&app_error)), + )); + } + Err(error) => { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + let (_, body) = from_app_error(error); + return Ok(build_error_response(&context, status, body)); + } + }; + + context = RequestContextBuilder::default() + .user_id(identity.user_id) + .api_key_id(identity.api_key_id) + .provider(node.provider_family.clone()) + .request_id(request_id) + .trace_id(context.trace_id) + .route(public_model.clone()) + .method("POST") + .path(spec.path) + .model(node.model_id.clone()) + .streaming(false) + .build(); + let completion_log = GatewayCompletionLog::new( + &state.database, + state.database_backend(), + &context, + request_started_at, + ) + .with_route_alias(public_model.clone()) + .with_provider_alias("mizan-daemon"); + + let effective_max_tokens = + match resolve_effective_max_tokens(payload.max_tokens, None, spec.max_tokens_field) { + Ok(max_tokens) => max_tokens, + Err(error) => { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + let (_, body) = from_app_error(error); + return Ok(build_error_response(&context, status, body)); + } + }; + + let upstream_request = ChatRequest { + model: node.model_id.clone(), + messages: payload.messages.clone(), + stream: false, + max_tokens: effective_max_tokens, + }; + let request_messages = upstream_request.messages.clone(); + let route_price = RoutePrice { + input_microcredits_per_1m_tokens: 0, + output_microcredits_per_1m_tokens: 0, + }; + let admission_usage = estimate_admission_usage(&request_messages, effective_max_tokens); + let prompt_only_usage = billing::estimate_usage(&request_messages, ""); + + if let Err(error) = billing::ensure_sufficient_credit( + &state.database, + state.database_backend(), + identity.user_id, + admission_usage, + route_price, + ) + .await + { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + let (status, body) = from_app_error(error); + observe_gateway_metrics( + &state.metrics, + &context, + &public_model, + prompt_only_usage, + status, + request_started_at.elapsed().as_millis() as u64, + route_price, + ); + return Ok(build_error_response(&context, status, body)); + } + + let limit_lease = match acquire_runtime_limits( + &state, + vec![ + LimitScope::ApiKey(identity.api_key_id), + LimitScope::User(identity.user_id), + ], + admission_usage.total_tokens, + ) + .await + { + Ok(lease) => lease, + Err(error) => { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion(&completion_log, status, Some(&error_code)).await; + let (_, body) = from_app_error(error); + observe_gateway_metrics( + &state.metrics, + &context, + &public_model, + admission_usage, + status, + request_started_at.elapsed().as_millis() as u64, + route_price, + ); + return Ok(build_error_response(&context, status, body)); + } + }; + + let dispatch_result = dispatch_to_daemon_node( + &state.database, + state.database_backend(), + &node, + DispatchJobInput { + request_id, + node_id: node.id, + user_id: Some(identity.user_id), + api_key_id: Some(identity.api_key_id), + model: public_model.clone(), + request: upstream_request, + timeout_seconds: state.config.limit_lease_seconds.clamp(1, 30), + }, + ) + .instrument(info_span!( + "daemon_dispatch", + request_id = %context.request_id, + trace_id = %context.trace_id, + route = %public_model, + node_id = %node.id, + )) + .await; + + let response = match dispatch_result { + Ok(DispatchJobResult::Succeeded(upstream_response)) => { + let usage = upstream_response.usage.unwrap_or_else(|| { + billing::estimate_usage(&request_messages, &upstream_response.content) + }); + let latency_ms = request_started_at.elapsed().as_millis() as u64; + if let Err(error) = billing::record_usage( + &state.database, + state.database_backend(), + billing::BillingInput { + request_id, + user_id: identity.user_id, + api_key_id: Some(identity.api_key_id), + provider_id: None, + route_id: None, + model: public_model.clone(), + usage, + status_code: StatusCode::OK.as_u16(), + latency_ms, + route_price, + }, + ) + .await + { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + let (_, body) = from_app_error(error); + observe_gateway_metrics( + &state.metrics, + &context, + &public_model, + usage, + status, + latency_ms, + route_price, + ); + record_gateway_request_completion( + &completion_log, + status, + Some(error_code.as_str()), + ) + .await; + release_limit_lease(Some(limit_lease)); + return Ok(build_error_response(&context, status, body)); + } + + observe_gateway_metrics( + &state.metrics, + &context, + &public_model, + usage, + StatusCode::OK, + latency_ms, + route_price, + ); + record_gateway_request_completion(&completion_log, StatusCode::OK, None).await; + json_chat_completion_response( + &format!("chatcmpl-{}", Uuid::now_v7()), + public_model.clone(), + upstream_response, + &context, + ) + } + Ok(DispatchJobResult::Failed { + error_code, + error_message, + }) => { + let error = AppError::provider(format!( + "daemon dispatch failed code={} message={}", + error_code.unwrap_or_else(|| "daemon_error".to_owned()), + redact_for_logs(error_message) + )); + daemon_error_response( + &state, + &context, + &completion_log, + request_started_at, + request_id, + identity.clone(), + &public_model, + &request_messages, + route_price, + error, + ) + .await + } + Ok(DispatchJobResult::TimedOut) => { + let error = AppError::provider("daemon dispatch timed out"); + daemon_error_response( + &state, + &context, + &completion_log, + request_started_at, + request_id, + identity.clone(), + &public_model, + &request_messages, + route_price, + error, + ) + .await + } + Err(error) => { + let normalized_error = normalize_provider_error(error, &context, public_model.clone()); + daemon_error_response( + &state, + &context, + &completion_log, + request_started_at, + request_id, + identity.clone(), + &public_model, + &request_messages, + route_price, + normalized_error, + ) + .await + } + }; + + release_limit_lease(Some(limit_lease)); + Ok(response) +} + +#[allow(clippy::too_many_arguments)] +async fn daemon_error_response( + state: &AppState, + context: &RequestContext, + completion_log: &GatewayCompletionLog, + request_started_at: Instant, + request_id: Uuid, + identity: ApiKeyIdentity, + public_model: &str, + request_messages: &[ChatMessage], + route_price: RoutePrice, + error: AppError, +) -> Response { + let error_code = error_code_from_app_error(&error); + let (status, body) = from_app_error(error); + let latency_ms = request_started_at.elapsed().as_millis() as u64; + let usage = billing::estimate_usage(request_messages, ""); + if let Err(error) = billing::record_usage( + &state.database, + state.database_backend(), + billing::BillingInput { + request_id, + user_id: identity.user_id, + api_key_id: Some(identity.api_key_id), + provider_id: None, + route_id: None, + model: public_model.to_owned(), + usage, + status_code: status.as_u16(), + latency_ms, + route_price, + }, + ) + .await + { + warn!( + request_id = %request_id, + error = %error, + "failed to persist daemon dispatch error usage" + ); + } + observe_gateway_metrics( + &state.metrics, + context, + public_model, + usage, + status, + latency_ms, + route_price, + ); + record_gateway_request_completion(completion_log, status, Some(error_code.as_str())).await; + build_error_response(context, status, body) +} + #[derive(Debug, Clone)] struct GatewayCompletionLog { database: AnyPool, @@ -940,6 +1321,15 @@ fn app_error_status_code(error: &AppError) -> StatusCode { } } +fn is_model_route_not_found(error: &AppError) -> bool { + matches!( + error, + AppError::InvalidConfig { key, message } + if (*key == CHAT_COMPLETIONS_MODEL_FIELD || *key == RESPONSES_MODEL_FIELD) + && message == "model not found or disabled" + ) +} + fn json_chat_completion_response( completion_id: &str, model: String, diff --git a/crates/mizan-api/src/lib.rs b/crates/mizan-api/src/lib.rs index 0fef937..1b2e355 100644 --- a/crates/mizan-api/src/lib.rs +++ b/crates/mizan-api/src/lib.rs @@ -19,6 +19,7 @@ use tracing::{info, warn}; mod auth; mod billing; mod daemon_nodes; +mod dispatch; mod gateway; mod logging; mod metrics; @@ -220,6 +221,14 @@ pub fn router(state: AppState) -> Router { .route("/daemon/register", post(daemon_nodes::register_daemon_node)) .route("/daemon/heartbeat", post(daemon_nodes::daemon_heartbeat)) .route("/daemon/ping", get(daemon_nodes::daemon_ping)) + .route( + "/daemon/jobs/lease", + post(dispatch::lease_next_dispatch_job), + ) + .route( + "/daemon/jobs/{id}/complete", + post(dispatch::complete_dispatch_job), + ) .route_layer(from_fn_with_state( state.clone(), daemon_nodes::daemon_node_auth, diff --git a/crates/mizan-api/src/storage.rs b/crates/mizan-api/src/storage.rs index dcef482..89b5698 100644 --- a/crates/mizan-api/src/storage.rs +++ b/crates/mizan-api/src/storage.rs @@ -140,6 +140,7 @@ mod tests { "request_logs", "admin_audit_logs", "daemon_nodes", + "dispatch_jobs", ]; let expected_indexes = [ @@ -166,6 +167,9 @@ mod tests { "idx_daemon_nodes_token_hash", "idx_daemon_nodes_disabled", "idx_daemon_nodes_health_status", + "idx_dispatch_jobs_node_status", + "idx_dispatch_jobs_request_id", + "idx_dispatch_jobs_status_deadline", ]; for expected in expected_tables { diff --git a/crates/mizan-daemon/src/main.rs b/crates/mizan-daemon/src/main.rs index 2704413..7b8908c 100644 --- a/crates/mizan-daemon/src/main.rs +++ b/crates/mizan-daemon/src/main.rs @@ -1,7 +1,7 @@ use std::{net::SocketAddr, path::PathBuf, process, time::Duration}; use clap::{Args, CommandFactory, Parser, Subcommand}; -use mizan_core::{AppError, AppResult, init_tracing}; +use mizan_core::{AppError, AppResult, init_tracing, redact_for_logs}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::{ @@ -99,6 +99,9 @@ async fn run(args: ConfigArgs) -> AppResult<()> { warn!(error = %error, "daemon heartbeat failed"); } } + if let Err(error) = lease_and_run_one_job(&client, &token, &config).await { + warn!(error = %error, "daemon dispatch job processing failed"); + } sleep(Duration::from_secs(u64::from( config.heartbeat_interval_seconds.max(1), ))) @@ -106,6 +109,107 @@ async fn run(args: ConfigArgs) -> AppResult<()> { } } +async fn lease_and_run_one_job( + client: &reqwest::Client, + token: &str, + config: &DaemonConfig, +) -> AppResult<()> { + let lease_url = control_plane_endpoint(&config.control_plane_url, "/daemon/jobs/lease"); + let lease_response = client + .post(&lease_url) + .bearer_auth(token) + .send() + .await + .map_err(|error| AppError::infrastructure(format!("daemon job lease failed: {error}")))?; + + let status = lease_response.status(); + if !status.is_success() { + return Err(AppError::infrastructure(format!( + "daemon job lease rejected by control plane with status {status}" + ))); + } + + let lease: DispatchJobLeaseEnvelope = lease_response.json().await.map_err(|error| { + AppError::infrastructure(format!("invalid daemon job lease response: {error}")) + })?; + let Some(job) = lease.data else { + return Ok(()); + }; + + let completion = match call_local_provider(client, config, &job.request).await { + Ok(response) => DispatchJobCompleteRequest { + status: "succeeded".to_owned(), + response: Some(response), + error_code: None, + error_message: None, + }, + Err(error) => DispatchJobCompleteRequest { + status: "failed".to_owned(), + response: None, + error_code: Some("provider_error".to_owned()), + error_message: Some(redact_for_logs(error.to_string())), + }, + }; + + let complete_url = control_plane_endpoint( + &config.control_plane_url, + &format!("/daemon/jobs/{}/complete", job.id), + ); + let complete_response = client + .post(&complete_url) + .bearer_auth(token) + .json(&completion) + .send() + .await + .map_err(|error| { + AppError::infrastructure(format!("daemon job completion submit failed: {error}")) + })?; + + let status = complete_response.status(); + if !status.is_success() { + return Err(AppError::infrastructure(format!( + "daemon job completion rejected by control plane with status {status}" + ))); + } + + info!( + job_id = %job.id, + request_id = %job.request_id, + model = %job.model, + "daemon dispatch job completed" + ); + Ok(()) +} + +async fn call_local_provider( + client: &reqwest::Client, + config: &DaemonConfig, + request: &ChatRequest, +) -> AppResult { + let url = control_plane_endpoint(&config.local_provider_url, "/chat/completions"); + let response = client + .post(url) + .json(request) + .send() + .await + .map_err(|error| { + AppError::infrastructure(format!("local provider request failed: {error}")) + })?; + + let status = response.status(); + let body = response.text().await.map_err(|error| { + AppError::infrastructure(format!("local provider response read failed: {error}")) + })?; + if !status.is_success() { + return Err(AppError::provider(format!( + "local provider returned status={status} body={}", + redact_for_logs(body) + ))); + } + + parse_chat_completion_response(&body, request.model.clone()) +} + async fn register(args: ConfigArgs) -> AppResult<()> { let config = DaemonConfig::load(&args.config)?; init_tracing("mizan_daemon=info,mizan_core=info")?; @@ -404,6 +508,101 @@ struct DaemonHeartbeatResponse { last_seen_at: String, } +#[derive(Debug, Deserialize)] +struct DispatchJobLeaseEnvelope { + data: Option, +} + +#[derive(Debug, Deserialize)] +struct DispatchJobLeaseResponse { + id: String, + request_id: String, + model: String, + request: ChatRequest, +} + +#[derive(Debug, Serialize)] +struct DispatchJobCompleteRequest { + status: String, + response: Option, + error_code: Option, + error_message: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChatRequest { + model: String, + messages: Vec, + #[serde(default)] + stream: bool, + #[serde(default)] + max_tokens: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChatMessage { + role: String, + content: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChatResponse { + provider: String, + model: String, + content: String, + usage: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct TokenUsage { + prompt_tokens: u64, + completion_tokens: u64, + total_tokens: u64, + #[serde(default)] + estimated: bool, +} + +#[derive(Debug, Deserialize)] +struct OpenAiChatCompletionResponse { + model: Option, + choices: Vec, + usage: Option, +} + +#[derive(Debug, Deserialize)] +struct OpenAiChoice { + message: Option, +} + +#[derive(Debug, Deserialize)] +struct OpenAiMessage { + content: Option, +} + +fn parse_chat_completion_response( + raw_body: &str, + requested_model: String, +) -> AppResult { + let response: OpenAiChatCompletionResponse = serde_json::from_str(raw_body) + .map_err(|error| AppError::provider(format!("invalid local provider response: {error}")))?; + let Some(first_choice) = response.choices.into_iter().next() else { + return Err(AppError::provider( + "local provider response returned no choices", + )); + }; + let content = first_choice + .message + .and_then(|message| message.content) + .unwrap_or_default(); + + Ok(ChatResponse { + provider: "mizan-daemon".to_owned(), + model: response.model.unwrap_or(requested_model), + content, + usage: response.usage, + }) +} + fn required_field(value: Option, key: &'static str) -> AppResult { value.ok_or_else(|| AppError::invalid_config(key, "is required")) } diff --git a/migrations/0006_dispatch_jobs.sql b/migrations/0006_dispatch_jobs.sql new file mode 100644 index 0000000..4ef260f --- /dev/null +++ b/migrations/0006_dispatch_jobs.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS dispatch_jobs ( + id TEXT PRIMARY KEY, + request_id TEXT NOT NULL, + node_id TEXT NOT NULL, + user_id TEXT, + api_key_id TEXT, + model TEXT NOT NULL, + status TEXT NOT NULL, + request_json TEXT NOT NULL, + response_json TEXT, + error_code TEXT, + error_message TEXT, + leased_at TEXT, + completed_at TEXT, + deadline_at TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY (node_id) REFERENCES daemon_nodes (id) ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE SET NULL, + FOREIGN KEY (api_key_id) REFERENCES api_keys (id) ON DELETE SET NULL +); + +CREATE INDEX IF NOT EXISTS idx_dispatch_jobs_node_status ON dispatch_jobs (node_id, status, created_at); +CREATE INDEX IF NOT EXISTS idx_dispatch_jobs_request_id ON dispatch_jobs (request_id); +CREATE INDEX IF NOT EXISTS idx_dispatch_jobs_status_deadline ON dispatch_jobs (status, deadline_at);