-
Notifications
You must be signed in to change notification settings - Fork 0
PEN-110: add user API key onboarding smoke #79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,8 @@ use axum::http::StatusCode; | |
| use axum::middleware::Next; | ||
| use axum::response::Response; | ||
| use mizan_core::{AppError, ErrorEnvelope}; | ||
| use std::collections::HashSet; | ||
|
|
||
| use serde::{Deserialize, Serialize}; | ||
| use serde_json::{Value, json}; | ||
| use sqlx::{query, query_as}; | ||
|
|
@@ -16,7 +18,7 @@ use crate::auth::ApiKeyIdentity; | |
| use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; | ||
| use crate::utils::{ | ||
| encrypt_provider_api_key, from_app_error, is_enabled, is_unique_constraint_error, | ||
| parse_timestamp, prepare_sql, unix_timestamp_string, | ||
| now_utc_epoch_seconds, parse_timestamp, prepare_sql, unix_timestamp_string, | ||
| }; | ||
|
|
||
| type ProviderHttpResult<T> = Result<T, (StatusCode, Json<ErrorEnvelope>)>; | ||
|
|
@@ -29,6 +31,10 @@ const AUDIT_ENTITY_MODEL_ROUTE: &str = "model_route"; | |
| const AUTH_MODE_API_KEY: &str = "api_key"; | ||
| const AUTH_MODE_SUBSCRIPTION_CLI: &str = "subscription_cli"; | ||
| const AUTH_MODE_BROWSER_SESSION: &str = "browser_session"; | ||
| const DAEMON_STATUS_ACTIVE: &str = "active"; | ||
| const DAEMON_HEALTHY_STATUS: &str = "healthy"; | ||
| const DAEMON_OWNED_BY: &str = "mizan-daemon"; | ||
| const DAEMON_ROUTE_ID: &str = "daemon"; | ||
|
|
||
| #[derive(Debug, Serialize)] | ||
| pub struct ProviderConnectionResponse { | ||
|
|
@@ -268,12 +274,82 @@ pub async fn list_models( | |
| }); | ||
| } | ||
|
|
||
| append_daemon_public_models(&state, &mut data).await?; | ||
| data.sort_by(|left, right| left.id.cmp(&right.id)); | ||
|
|
||
| Ok(Json(PublicModelsResponse { | ||
| object: "list", | ||
| data, | ||
| })) | ||
| } | ||
|
|
||
| async fn append_daemon_public_models( | ||
| state: &AppState, | ||
| data: &mut Vec<PublicModelResponse>, | ||
| ) -> ProviderHttpResult<()> { | ||
| let mut seen_models = data | ||
| .iter() | ||
| .map(|model| model.id.clone()) | ||
| .collect::<HashSet<_>>(); | ||
| let cutoff = now_utc_epoch_seconds() | ||
| .saturating_sub(i64::from(state.config.daemon_stale_seconds.max(1))) | ||
| .to_string(); | ||
|
|
||
| let rows = query_as::<_, (String, String, String)>(&prepare_sql( | ||
| state.database_backend(), | ||
| "SELECT provider_family, model_ids_json, last_seen_at | ||
| FROM daemon_nodes | ||
| WHERE status = ? | ||
| AND revoked = 0 | ||
| AND disabled = 0 | ||
| AND health_status = ? | ||
| AND provider_family IS NOT NULL | ||
| AND model_ids_json != ? | ||
| AND max_concurrency IS NOT NULL | ||
| AND max_concurrency > 0 | ||
| AND last_seen_at IS NOT NULL | ||
| AND last_seen_at >= ? | ||
| ORDER BY last_seen_at DESC, created_at ASC", | ||
| )) | ||
| .bind(DAEMON_STATUS_ACTIVE) | ||
| .bind(DAEMON_HEALTHY_STATUS) | ||
| .bind("[]") | ||
| .bind(cutoff) | ||
| .fetch_all(&state.database) | ||
| .await | ||
| .map_err(|error| from_app_error(AppError::infrastructure(error.to_string())))?; | ||
|
|
||
| for (provider_family, model_ids_json, last_seen_at) in rows { | ||
| let created = parse_timestamp(&last_seen_at).map_err(from_app_error)?; | ||
| let model_ids = parse_daemon_model_ids(&model_ids_json).map_err(from_app_error)?; | ||
|
|
||
| for model_id in model_ids { | ||
| if !seen_models.insert(model_id.clone()) { | ||
| continue; | ||
| } | ||
|
|
||
| data.push(PublicModelResponse { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a healthy daemon advertises a model that has no matching Useful? React with 👍 / 👎. |
||
| id: model_id.clone(), | ||
| object: "model", | ||
| created, | ||
| owned_by: DAEMON_OWNED_BY.to_owned(), | ||
| provider_type: provider_family.clone(), | ||
| upstream_model: model_id, | ||
| route_id: DAEMON_ROUTE_ID.to_owned(), | ||
| max_tokens: None, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn parse_daemon_model_ids(raw: &str) -> Result<Vec<String>, AppError> { | ||
| serde_json::from_str::<Vec<String>>(raw).map_err(|error| { | ||
| AppError::infrastructure(format!("daemon_node.model_ids_json is invalid: {error}")) | ||
| }) | ||
| } | ||
|
|
||
| pub async fn list_provider_connections( | ||
| State(state): State<AppState>, | ||
| ) -> ProviderHttpResult<Json<ProviderConnectionListResponse>> { | ||
|
|
@@ -784,6 +860,45 @@ fn map_duplicate_model_error(error: String) -> AppError { | |
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::{metrics::MetricsRegistry, storage}; | ||
| use mizan_core::{AppConfig, DatabaseBackend}; | ||
| use mizan_gateway::Gateway; | ||
| use redis::Client as RedisClient; | ||
|
|
||
| async fn test_state() -> AppState { | ||
| let database = storage::connect_and_migrate("sqlite::memory:", true, 1) | ||
| .await | ||
| .expect("create sqlite test database"); | ||
| let redis = RedisClient::open("redis://127.0.0.1:6379/") | ||
| .expect("create redis client for state"); | ||
|
|
||
| AppState { | ||
| config: AppConfig { | ||
| http_addr: "127.0.0.1:0".parse().expect("parse test addr"), | ||
| database_backend: DatabaseBackend::Sqlite, | ||
| database_url: "sqlite::memory:".to_owned(), | ||
| database_max_connections: 1, | ||
| run_migrations: true, | ||
| redis_url: "redis://127.0.0.1:6379/".to_owned(), | ||
| limit_rpm: 0, | ||
| limit_tpm: 0, | ||
| limit_concurrency: 0, | ||
| limit_window_seconds: 60, | ||
| limit_lease_seconds: 120, | ||
| log_level: "off".to_owned(), | ||
| admin_seed_email: None, | ||
| admin_seed_password: None, | ||
| admin_seed_role: "admin".to_owned(), | ||
| provider_secret_key: Some("test-provider-secret".to_owned()), | ||
| log_raw_request_bodies: false, | ||
| daemon_stale_seconds: 90, | ||
| }, | ||
| gateway: Gateway::new(), | ||
| database, | ||
| redis, | ||
| metrics: MetricsRegistry::default(), | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn normalize_auth_mode_defaults_to_api_key() { | ||
|
|
@@ -818,4 +933,172 @@ mod tests { | |
|
|
||
| assert!(normalized.contains("credential_ref")); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn list_models_includes_only_safe_fresh_daemon_models() { | ||
| let state = test_state().await; | ||
| let now = unix_timestamp_string(); | ||
| let stale = (now_utc_epoch_seconds() - 300).to_string(); | ||
| let provider_id = Uuid::now_v7(); | ||
| let route_id = Uuid::now_v7(); | ||
|
|
||
| query(&prepare_sql( | ||
| DatabaseBackend::Sqlite, | ||
| "INSERT INTO provider_connections ( | ||
| id, name, provider_type, auth_mode, base_url, api_key_encrypted, enabled, created_at, updated_at | ||
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
| )) | ||
| .bind(provider_id.to_string()) | ||
| .bind("route-provider") | ||
| .bind("openai-compatible") | ||
| .bind(AUTH_MODE_API_KEY) | ||
| .bind("http://127.0.0.1:18182") | ||
| .bind("encrypted") | ||
| .bind(1) | ||
| .bind(&now) | ||
| .bind(&now) | ||
| .execute(&state.database) | ||
| .await | ||
| .expect("insert provider connection"); | ||
|
|
||
| query(&prepare_sql( | ||
| DatabaseBackend::Sqlite, | ||
| "INSERT INTO model_routes ( | ||
| id, | ||
| provider_connection_id, | ||
| public_model, | ||
| upstream_model, | ||
| max_tokens, | ||
| pricing_input_per_1m_tokens, | ||
| pricing_output_per_1m_tokens, | ||
| enabled, | ||
| created_at, | ||
| updated_at | ||
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
| )) | ||
| .bind(route_id.to_string()) | ||
| .bind(provider_id.to_string()) | ||
| .bind("routed-model") | ||
| .bind("upstream-routed-model") | ||
| .bind(4096_i64) | ||
| .bind(0_i64) | ||
| .bind(0_i64) | ||
| .bind(1) | ||
| .bind(&now) | ||
| .bind(&now) | ||
| .execute(&state.database) | ||
| .await | ||
| .expect("insert model route"); | ||
|
|
||
| insert_daemon_node( | ||
| &state, | ||
| "fresh", | ||
| &now, | ||
| 0, | ||
| DAEMON_HEALTHY_STATUS, | ||
| r#"["llama3.1","qwen2.5-coder"]"#, | ||
| ) | ||
| .await; | ||
| insert_daemon_node( | ||
| &state, | ||
| "stale", | ||
| &stale, | ||
| 0, | ||
| DAEMON_HEALTHY_STATUS, | ||
| r#"["stale-model"]"#, | ||
| ) | ||
| .await; | ||
| insert_daemon_node( | ||
| &state, | ||
| "disabled", | ||
| &now, | ||
| 1, | ||
| DAEMON_HEALTHY_STATUS, | ||
| r#"["disabled-model"]"#, | ||
| ) | ||
| .await; | ||
| insert_daemon_node( | ||
| &state, | ||
| "unhealthy", | ||
| &now, | ||
| 0, | ||
| "degraded", | ||
| r#"["unhealthy-model"]"#, | ||
| ) | ||
| .await; | ||
|
|
||
| let response = list_models(axum::extract::State(state)) | ||
| .await | ||
| .expect("list public models") | ||
| .0; | ||
|
|
||
| let ids = response | ||
| .data | ||
| .iter() | ||
| .map(|model| model.id.as_str()) | ||
| .collect::<Vec<_>>(); | ||
| assert_eq!(ids, vec!["llama3.1", "qwen2.5-coder", "routed-model"]); | ||
|
|
||
| let daemon_model = response | ||
| .data | ||
| .iter() | ||
| .find(|model| model.id == "llama3.1") | ||
| .expect("daemon model included"); | ||
| assert_eq!(daemon_model.owned_by, DAEMON_OWNED_BY); | ||
| assert_eq!(daemon_model.provider_type, "openai-compatible"); | ||
| assert_eq!(daemon_model.upstream_model, "llama3.1"); | ||
| assert_eq!(daemon_model.route_id, DAEMON_ROUTE_ID); | ||
| assert_eq!(daemon_model.max_tokens, None); | ||
| } | ||
|
|
||
| async fn insert_daemon_node( | ||
| state: &AppState, | ||
| label: &str, | ||
| last_seen_at: &str, | ||
| disabled: i64, | ||
| health_status: &str, | ||
| model_ids_json: &str, | ||
| ) { | ||
| let now = unix_timestamp_string(); | ||
| query(&prepare_sql( | ||
| DatabaseBackend::Sqlite, | ||
| "INSERT INTO daemon_nodes ( | ||
| id, | ||
| label, | ||
| token_hash, | ||
| status, | ||
| revoked, | ||
| last_seen_at, | ||
| created_at, | ||
| updated_at, | ||
| provider_family, | ||
| model_ids_json, | ||
| max_concurrency, | ||
| health_status, | ||
| disabled, | ||
| hostname, | ||
| labels_json, | ||
| capability_metadata_json | ||
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
| )) | ||
| .bind(Uuid::now_v7().to_string()) | ||
| .bind(label) | ||
| .bind(format!("hash-{label}")) | ||
| .bind(DAEMON_STATUS_ACTIVE) | ||
| .bind(0_i64) | ||
| .bind(last_seen_at) | ||
| .bind(&now) | ||
| .bind(&now) | ||
| .bind("openai-compatible") | ||
| .bind(model_ids_json) | ||
| .bind(2_i64) | ||
| .bind(health_status) | ||
| .bind(disabled) | ||
| .bind(format!("{label}.internal")) | ||
| .bind(r#"["private-label"]"#) | ||
| .bind(r#"{"local_provider_url":"http://127.0.0.1:11434/v1"}"#) | ||
| .execute(&state.database) | ||
| .await | ||
| .expect("insert daemon node"); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a single daemon node has corrupted or invalid data in the database (e.g., a malformed
model_ids_jsonor an invalidlast_seen_attimestamp), the entire/v1/modelsendpoint will fail with a500 Internal Server Error.To make the API more resilient, we should log a warning and skip the invalid daemon node instead of failing the entire request.