Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ MIZAN_REDIS_URL=redis://127.0.0.1:6379 scripts/limit-smoke.sh
REDIS_URL=redis://127.0.0.1:6379/ scripts/alpha-smoke.sh
```

Run the self-serve user onboarding smoke against a running API:

```sh
scripts/user-onboarding-smoke.sh
```

Copy-paste registration, login, API key, model listing, and gateway examples are
in [User API Key Onboarding](docs/USER_ONBOARDING.md).

## License

Apache-2.0. See [LICENSE](LICENSE).
Expand Down
285 changes: 284 additions & 1 deletion crates/mizan-api/src/providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::Response;
use mizan_core::{AppError, ErrorEnvelope};
use std::collections::HashSet;

use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::{query, query_as};
Expand All @@ -16,7 +18,7 @@ use crate::auth::ApiKeyIdentity;
use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload};
use crate::utils::{
encrypt_provider_api_key, from_app_error, is_enabled, is_unique_constraint_error,
parse_timestamp, prepare_sql, unix_timestamp_string,
now_utc_epoch_seconds, parse_timestamp, prepare_sql, unix_timestamp_string,
};

type ProviderHttpResult<T> = Result<T, (StatusCode, Json<ErrorEnvelope>)>;
Expand All @@ -29,6 +31,10 @@ const AUDIT_ENTITY_MODEL_ROUTE: &str = "model_route";
const AUTH_MODE_API_KEY: &str = "api_key";
const AUTH_MODE_SUBSCRIPTION_CLI: &str = "subscription_cli";
const AUTH_MODE_BROWSER_SESSION: &str = "browser_session";
const DAEMON_STATUS_ACTIVE: &str = "active";
const DAEMON_HEALTHY_STATUS: &str = "healthy";
const DAEMON_OWNED_BY: &str = "mizan-daemon";
const DAEMON_ROUTE_ID: &str = "daemon";

#[derive(Debug, Serialize)]
pub struct ProviderConnectionResponse {
Expand Down Expand Up @@ -268,12 +274,82 @@ pub async fn list_models(
});
}

append_daemon_public_models(&state, &mut data).await?;
data.sort_by(|left, right| left.id.cmp(&right.id));

Ok(Json(PublicModelsResponse {
object: "list",
data,
}))
}

async fn append_daemon_public_models(
state: &AppState,
data: &mut Vec<PublicModelResponse>,
) -> ProviderHttpResult<()> {
let mut seen_models = data
.iter()
.map(|model| model.id.clone())
.collect::<HashSet<_>>();
let cutoff = now_utc_epoch_seconds()
.saturating_sub(i64::from(state.config.daemon_stale_seconds.max(1)))
.to_string();

let rows = query_as::<_, (String, String, String)>(&prepare_sql(
state.database_backend(),
"SELECT provider_family, model_ids_json, last_seen_at
FROM daemon_nodes
WHERE status = ?
AND revoked = 0
AND disabled = 0
AND health_status = ?
AND provider_family IS NOT NULL
AND model_ids_json != ?
AND max_concurrency IS NOT NULL
AND max_concurrency > 0
AND last_seen_at IS NOT NULL
AND last_seen_at >= ?
ORDER BY last_seen_at DESC, created_at ASC",
))
.bind(DAEMON_STATUS_ACTIVE)
.bind(DAEMON_HEALTHY_STATUS)
.bind("[]")
.bind(cutoff)
.fetch_all(&state.database)
.await
.map_err(|error| from_app_error(AppError::infrastructure(error.to_string())))?;

for (provider_family, model_ids_json, last_seen_at) in rows {
let created = parse_timestamp(&last_seen_at).map_err(from_app_error)?;
let model_ids = parse_daemon_model_ids(&model_ids_json).map_err(from_app_error)?;

for model_id in model_ids {
Comment on lines +322 to +326

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If a single daemon node has corrupted or invalid data in the database (e.g., a malformed model_ids_json or an invalid last_seen_at timestamp), the entire /v1/models endpoint will fail with a 500 Internal Server Error.

To make the API more resilient, we should log a warning and skip the invalid daemon node instead of failing the entire request.

    for (provider_family, model_ids_json, last_seen_at) in rows {
        let created = match parse_timestamp(&last_seen_at) {
            Ok(t) => t,
            Err(err) => {
                warn!(error = %err, "Failed to parse last_seen_at for daemon node; skipping node");
                continue;
            }
        };
        let model_ids = match parse_daemon_model_ids(&model_ids_json) {
            Ok(ids) => ids,
            Err(err) => {
                warn!(error = %err, "Failed to parse model_ids_json for daemon node; skipping node");
                continue;
            }
        };

        for model_id in model_ids {

if !seen_models.insert(model_id.clone()) {
continue;
}

data.push(PublicModelResponse {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not advertise daemon-only models as callable

When a healthy daemon advertises a model that has no matching model_routes row, this adds it to /v1/models, but /v1/chat/completions still resolves requests only through model_routes joined to provider_connections (crates/mizan-api/src/gateway.rs:1359-1372). In a daemon-only setup, users and the new smoke script can pick a listed model and then get model not found or disabled, so /v1/models is advertising uncallable models until the gateway has daemon dispatch support or these entries are backed by real routes.

Useful? React with 👍 / 👎.

id: model_id.clone(),
object: "model",
created,
owned_by: DAEMON_OWNED_BY.to_owned(),
provider_type: provider_family.clone(),
upstream_model: model_id,
route_id: DAEMON_ROUTE_ID.to_owned(),
max_tokens: None,
});
}
}

Ok(())
}

fn parse_daemon_model_ids(raw: &str) -> Result<Vec<String>, AppError> {
serde_json::from_str::<Vec<String>>(raw).map_err(|error| {
AppError::infrastructure(format!("daemon_node.model_ids_json is invalid: {error}"))
})
}

pub async fn list_provider_connections(
State(state): State<AppState>,
) -> ProviderHttpResult<Json<ProviderConnectionListResponse>> {
Expand Down Expand Up @@ -784,6 +860,45 @@ fn map_duplicate_model_error(error: String) -> AppError {
#[cfg(test)]
mod tests {
use super::*;
use crate::{metrics::MetricsRegistry, storage};
use mizan_core::{AppConfig, DatabaseBackend};
use mizan_gateway::Gateway;
use redis::Client as RedisClient;

async fn test_state() -> AppState {
let database = storage::connect_and_migrate("sqlite::memory:", true, 1)
.await
.expect("create sqlite test database");
let redis = RedisClient::open("redis://127.0.0.1:6379/")
.expect("create redis client for state");

AppState {
config: AppConfig {
http_addr: "127.0.0.1:0".parse().expect("parse test addr"),
database_backend: DatabaseBackend::Sqlite,
database_url: "sqlite::memory:".to_owned(),
database_max_connections: 1,
run_migrations: true,
redis_url: "redis://127.0.0.1:6379/".to_owned(),
limit_rpm: 0,
limit_tpm: 0,
limit_concurrency: 0,
limit_window_seconds: 60,
limit_lease_seconds: 120,
log_level: "off".to_owned(),
admin_seed_email: None,
admin_seed_password: None,
admin_seed_role: "admin".to_owned(),
provider_secret_key: Some("test-provider-secret".to_owned()),
log_raw_request_bodies: false,
daemon_stale_seconds: 90,
},
gateway: Gateway::new(),
database,
redis,
metrics: MetricsRegistry::default(),
}
}

#[test]
fn normalize_auth_mode_defaults_to_api_key() {
Expand Down Expand Up @@ -818,4 +933,172 @@ mod tests {

assert!(normalized.contains("credential_ref"));
}

#[tokio::test]
async fn list_models_includes_only_safe_fresh_daemon_models() {
let state = test_state().await;
let now = unix_timestamp_string();
let stale = (now_utc_epoch_seconds() - 300).to_string();
let provider_id = Uuid::now_v7();
let route_id = Uuid::now_v7();

query(&prepare_sql(
DatabaseBackend::Sqlite,
"INSERT INTO provider_connections (
id, name, provider_type, auth_mode, base_url, api_key_encrypted, enabled, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
))
.bind(provider_id.to_string())
.bind("route-provider")
.bind("openai-compatible")
.bind(AUTH_MODE_API_KEY)
.bind("http://127.0.0.1:18182")
.bind("encrypted")
.bind(1)
.bind(&now)
.bind(&now)
.execute(&state.database)
.await
.expect("insert provider connection");

query(&prepare_sql(
DatabaseBackend::Sqlite,
"INSERT INTO model_routes (
id,
provider_connection_id,
public_model,
upstream_model,
max_tokens,
pricing_input_per_1m_tokens,
pricing_output_per_1m_tokens,
enabled,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
))
.bind(route_id.to_string())
.bind(provider_id.to_string())
.bind("routed-model")
.bind("upstream-routed-model")
.bind(4096_i64)
.bind(0_i64)
.bind(0_i64)
.bind(1)
.bind(&now)
.bind(&now)
.execute(&state.database)
.await
.expect("insert model route");

insert_daemon_node(
&state,
"fresh",
&now,
0,
DAEMON_HEALTHY_STATUS,
r#"["llama3.1","qwen2.5-coder"]"#,
)
.await;
insert_daemon_node(
&state,
"stale",
&stale,
0,
DAEMON_HEALTHY_STATUS,
r#"["stale-model"]"#,
)
.await;
insert_daemon_node(
&state,
"disabled",
&now,
1,
DAEMON_HEALTHY_STATUS,
r#"["disabled-model"]"#,
)
.await;
insert_daemon_node(
&state,
"unhealthy",
&now,
0,
"degraded",
r#"["unhealthy-model"]"#,
)
.await;

let response = list_models(axum::extract::State(state))
.await
.expect("list public models")
.0;

let ids = response
.data
.iter()
.map(|model| model.id.as_str())
.collect::<Vec<_>>();
assert_eq!(ids, vec!["llama3.1", "qwen2.5-coder", "routed-model"]);

let daemon_model = response
.data
.iter()
.find(|model| model.id == "llama3.1")
.expect("daemon model included");
assert_eq!(daemon_model.owned_by, DAEMON_OWNED_BY);
assert_eq!(daemon_model.provider_type, "openai-compatible");
assert_eq!(daemon_model.upstream_model, "llama3.1");
assert_eq!(daemon_model.route_id, DAEMON_ROUTE_ID);
assert_eq!(daemon_model.max_tokens, None);
}

async fn insert_daemon_node(
state: &AppState,
label: &str,
last_seen_at: &str,
disabled: i64,
health_status: &str,
model_ids_json: &str,
) {
let now = unix_timestamp_string();
query(&prepare_sql(
DatabaseBackend::Sqlite,
"INSERT INTO daemon_nodes (
id,
label,
token_hash,
status,
revoked,
last_seen_at,
created_at,
updated_at,
provider_family,
model_ids_json,
max_concurrency,
health_status,
disabled,
hostname,
labels_json,
capability_metadata_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
))
.bind(Uuid::now_v7().to_string())
.bind(label)
.bind(format!("hash-{label}"))
.bind(DAEMON_STATUS_ACTIVE)
.bind(0_i64)
.bind(last_seen_at)
.bind(&now)
.bind(&now)
.bind("openai-compatible")
.bind(model_ids_json)
.bind(2_i64)
.bind(health_status)
.bind(disabled)
.bind(format!("{label}.internal"))
.bind(r#"["private-label"]"#)
.bind(r#"{"local_provider_url":"http://127.0.0.1:11434/v1"}"#)
.execute(&state.database)
.await
.expect("insert daemon node");
}
}
Loading
Loading