From 7d7860ea25bebb1c1462bccbf38ed9cfc4715c74 Mon Sep 17 00:00:00 2001 From: Philipp Schroeppel Date: Sat, 28 Jun 2025 18:54:30 +0200 Subject: [PATCH 1/3] dev: recompile when src or Cargo.toml changes in dev env --- Dockerfile.dev | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile.dev b/Dockerfile.dev index 8595808..551524a 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -6,11 +6,11 @@ RUN apt-get update && apt-get install -y \ libssl-dev \ && rm -rf /var/lib/apt/lists/* -RUN cargo install sccache +RUN cargo install sccache --locked watchexec-cli ENV RUSTC_WRAPPER=sccache ENV SCCACHE_DIR=/sccache WORKDIR /usr/src/nebulous -CMD ["cargo", "run", "--", "serve", "--host", "0.0.0.0", "--port", "3000"] +CMD ["watchexec", "-w", "src", "-w", "Cargo.toml", "-r", "cargo", "run", "--", "serve", "--host", "0.0.0.0", "--port", "3000"] From 3182c331ca8d3c3141785da5e35056f021b038d9 Mon Sep 17 00:00:00 2001 From: Philipp Schroeppel Date: Sun, 29 Jun 2025 19:21:23 +0200 Subject: [PATCH 2/3] feat: split config into client and server parts --- Dockerfile.dev | 1 + src/agent/ns.rs | 4 +- src/client/client.rs | 4 +- src/commands/delete_cmd.rs | 4 +- src/commands/get_cmd.rs | 4 +- src/commands/log_cmd.rs | 6 +- src/commands/login_cmd.rs | 18 +- src/commands/request.rs | 3 +- src/commands/send_cmd.rs | 4 +- src/commands/set_cmd.rs | 6 +- src/commands/show_cmd.rs | 12 +- src/config.rs | 337 +++++++++++++++++------- src/db.rs | 5 +- src/handlers/v1/iam.rs | 8 +- src/handlers/v1/namespaces.rs | 6 +- src/handlers/v1/processors.rs | 10 +- src/lib.rs | 73 ++--- src/middleware.rs | 10 +- src/resources/v1/containers/base.rs | 39 ++- src/resources/v1/processors/standard.rs | 30 +-- 20 files changed, 358 insertions(+), 226 deletions(-) diff --git a/Dockerfile.dev b/Dockerfile.dev index 551524a..4d27d73 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -10,6 +10,7 @@ RUN cargo install sccache --locked watchexec-cli ENV RUSTC_WRAPPER=sccache ENV SCCACHE_DIR=/sccache +ENV RUSTFLAGS="-Awarnings" WORKDIR /usr/src/nebulous diff --git a/src/agent/ns.rs b/src/agent/ns.rs index fa3b4bf..63445ef 100644 --- a/src/agent/ns.rs +++ b/src/agent/ns.rs @@ -1,4 +1,4 @@ -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use crate::entities::namespaces; use anyhow::Result; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; @@ -25,7 +25,7 @@ pub async fn auth_ns( if namespace == "root" { debug!("Namespace is root"); - let root_owner = CONFIG.root_owner.clone(); + let root_owner = SERVER_CONFIG.root_owner.clone(); if !owner_ids.contains(&root_owner) { error!("User not authorized to access root namespace"); return Err(anyhow::anyhow!("User not authorized to access namespace")); diff --git a/src/client/client.rs b/src/client/client.rs index f44ea1d..eb8402a 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -1,4 +1,4 @@ -use crate::config::GlobalConfig; +use crate::config::ClientConfig; use crate::models::V1StreamData; use crate::resources::v1::containers::models::{ V1Container, V1ContainerRequest, V1ContainerSearch, V1Containers, V1UpdateContainer, @@ -49,7 +49,7 @@ impl NebulousClient { /// Creates a new NebulousClient by reading from the global config. /// You could also pass server and api key directly if preferred. pub fn new_from_config() -> Result> { - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; let current_server = config .get_current_server_config() .ok_or("No current server config found")?; diff --git a/src/commands/delete_cmd.rs b/src/commands/delete_cmd.rs index 167ed31..cb722a6 100644 --- a/src/commands/delete_cmd.rs +++ b/src/commands/delete_cmd.rs @@ -1,6 +1,6 @@ use crate::commands::request::server_request; use futures::future::join_all; -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use nebulous::resources::v1::containers::models::V1Containers; use reqwest::Client; use std::error::Error; @@ -138,7 +138,7 @@ pub async fn delete_processor( namespace: Option, ) -> Result<(), Box> { let client = Client::new(); - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; let current_server = config.get_current_server_config().unwrap(); let server = current_server.server.as_ref().unwrap(); let api_key = current_server.api_key.as_ref().unwrap(); diff --git a/src/commands/get_cmd.rs b/src/commands/get_cmd.rs index 3821d55..4a18f90 100644 --- a/src/commands/get_cmd.rs +++ b/src/commands/get_cmd.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use serde::Serialize; use crate::commands::request::server_request; -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use nebulous::resources::v1::containers::models::{V1Container, V1Containers}; use serde_json::Value; use std::error::Error; @@ -281,7 +281,7 @@ pub async fn get_processors( name: Option, namespace: Option, ) -> Result<(), Box> { - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; debug!("Config: {:?}", config); let current_server = config.get_current_server_config().unwrap(); let server = current_server.server.as_ref().unwrap(); diff --git a/src/commands/log_cmd.rs b/src/commands/log_cmd.rs index 2104886..6a602da 100644 --- a/src/commands/log_cmd.rs +++ b/src/commands/log_cmd.rs @@ -1,4 +1,4 @@ -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use std::error::Error as StdError; use std::io::Write; @@ -15,7 +15,7 @@ pub async fn fetch_container_logs( follow: bool, ) -> Result> { // Load config - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; let current_server = config.get_current_server_config().unwrap(); let server = current_server.server.as_ref().unwrap(); let api_key = current_server.api_key.as_ref().unwrap(); @@ -143,7 +143,7 @@ async fn fetch_container_id_from_api( namespace: &str, name: &str, ) -> Result> { - let config = nebulous::config::GlobalConfig::read()?; + let config = ClientConfig::read()?; let current_server = config.get_current_server_config().unwrap(); let server = current_server.server.as_ref().unwrap(); let api_key = current_server.api_key.as_ref().unwrap(); diff --git a/src/commands/login_cmd.rs b/src/commands/login_cmd.rs index 10c3793..d5fb6c0 100644 --- a/src/commands/login_cmd.rs +++ b/src/commands/login_cmd.rs @@ -1,7 +1,7 @@ use std::error::Error; use std::io::{self, Write}; -use nebulous::config::{GlobalConfig, ServerConfig}; +use nebulous::config::{ClientConfig, ClientServerConfig}; use open; use rpassword; @@ -17,7 +17,7 @@ pub async fn execute( let nebu_url = nebu_url.trim().trim_end_matches("/").to_string(); - let mut config = GlobalConfig::read()?; + let mut client_config = ClientConfig::read()?; if auth.is_some() && hub.is_some() { let auth_url = auth.unwrap().trim().trim_end_matches("/").to_string(); @@ -35,13 +35,13 @@ pub async fn execute( io::stdout().flush()?; let api_key = rpassword::read_password()?; - config.servers.push(ServerConfig { - name: Some("cloud".to_string()), + client_config.servers.push(ClientServerConfig { + name: "cloud".to_string(), server: Some(nebu_url), api_key: Some(api_key), auth_server: Some(auth_url), }); - config.current_server = Some("cloud".to_string()); + client_config.current_server = Some("cloud".to_string()); } else { println!( r#"Configuring the Nebulous CLI to use the integrated auth server. @@ -63,15 +63,15 @@ When you're running nebulous on Kubernetes, use: io::stdout().flush()?; let api_key = rpassword::read_password()?; - config.servers.push(ServerConfig { - name: Some("nebu".to_string()), + client_config.servers.push(ClientServerConfig { + name: "nebu".to_string(), server: Some(nebu_url), api_key: Some(api_key), auth_server: None, }); - config.current_server = Some("nebu".to_string()); + client_config.current_server = Some("nebu".to_string()); } - config.write()?; + client_config.write()?; // TODO: Check that we can actually reach and authenticate with the server diff --git a/src/commands/request.rs b/src/commands/request.rs index f7999f7..ac89036 100644 --- a/src/commands/request.rs +++ b/src/commands/request.rs @@ -1,10 +1,11 @@ use serde_json::Value; +use nebulous::config::ClientConfig; fn prepare_request( path: &str, method: reqwest::Method, ) -> Result> { - let config = nebulous::config::GlobalConfig::read()?; + let config = ClientConfig::read()?; let current_server = config .get_current_server_config() .ok_or("Failed to get current server configuration")?; diff --git a/src/commands/send_cmd.rs b/src/commands/send_cmd.rs index 7e5289f..314c034 100644 --- a/src/commands/send_cmd.rs +++ b/src/commands/send_cmd.rs @@ -1,4 +1,4 @@ -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use nebulous::models::V1StreamData; use serde_json::Value; use std::error::Error; @@ -7,7 +7,7 @@ use std::io::{self, Read}; use tracing::debug; pub async fn send_messages(args: &crate::cli::SendMessageCommands) -> Result<(), Box> { - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; debug!("Config: {:?}", config); let current_server = config.get_current_server_config().unwrap(); // Handle error more gracefully let server = current_server.server.as_ref().unwrap(); // Handle error diff --git a/src/commands/set_cmd.rs b/src/commands/set_cmd.rs index 7a9d9bc..9747ff9 100644 --- a/src/commands/set_cmd.rs +++ b/src/commands/set_cmd.rs @@ -1,16 +1,16 @@ use colored::Colorize; -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use std::error::Error; pub async fn set_context(server_name: &str) -> Result<(), Box> { // Read the current config - let mut config = GlobalConfig::read()?; + let mut config = ClientConfig::read()?; // Check if the server exists let server_exists = config .servers .iter() - .any(|s| s.name.as_deref() == Some(server_name)); + .any(|s| s.name == server_name); if !server_exists { return Err(format!("Server '{}' not found in configuration", server_name).into()); diff --git a/src/commands/show_cmd.rs b/src/commands/show_cmd.rs index 96204a7..a57ce6d 100644 --- a/src/commands/show_cmd.rs +++ b/src/commands/show_cmd.rs @@ -1,10 +1,10 @@ use anyhow::{Context, Result}; use colored::Colorize; -use nebulous::config::GlobalConfig; +use nebulous::config::ClientConfig; use std::error::Error; pub async fn show_config() -> Result<(), Box> { - let config = GlobalConfig::read()?; + let config = ClientConfig::read()?; println!("{}", "Global Configuration:".bold().underline()); @@ -23,7 +23,7 @@ pub async fn show_config() -> Result<(), Box> { let is_current = config .current_server .as_ref() - .and_then(|current| server.name.as_ref().map(|name| current == name)) + .map(|current| current == &server.name) .unwrap_or(false); let prefix = if is_current { @@ -32,11 +32,7 @@ pub async fn show_config() -> Result<(), Box> { " ".normal() }; - if let Some(name) = &server.name { - println!("{}{}", prefix, name.bold()); - } else { - println!("{}{}", prefix, format!("Server #{}", idx + 1).bold()); - } + println!("{}{}", prefix, server.name.bold()); if let Some(api_key) = &server.api_key { let hidden_key = format!( diff --git a/src/config.rs b/src/config.rs index e3e2c7f..66d6b3c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,21 +7,20 @@ use std::fs; use std::path::PathBuf; #[derive(Serialize, Deserialize, Default, Debug)] -pub struct GlobalConfig { - pub servers: Vec, +pub struct ClientConfig { + pub servers: Vec, pub current_server: Option, } #[derive(Serialize, Deserialize, Default, Clone, Debug)] -pub struct ServerConfig { - /// Optional identifier for your server config. - pub name: Option, +pub struct ClientServerConfig { + pub name: String, pub api_key: Option, pub server: Option, pub auth_server: Option, } -impl GlobalConfig { +impl ClientConfig { /// Read the config from disk, or create a default one. /// Then ensure that we either find or create a matching server in `self.servers` /// based on environment variables, and set that as the `default_server`. @@ -32,66 +31,55 @@ impl GlobalConfig { // Load or create default let mut config = if path_exists { let yaml = fs::read_to_string(&config_path)?; - serde_yaml::from_str::(&yaml)? + serde_yaml::from_str::(&yaml)? } else { - GlobalConfig::default() + ClientConfig::default() }; - // Collect environment variables (NO fallback defaults here) + // Only write if the file didn't already exist + if !path_exists { + config.write()?; + } + + config.create_config_from_environment(); + + Ok(config) + } + + fn create_config_from_environment(&mut self) { let env_api_key = env::var("NEBU_API_KEY") - .or_else(|_| env::var("NEBULOUS_API_KEY")) .or_else(|_| env::var("AGENTSEA_API_KEY")) .ok(); let env_server = env::var("NEBU_SERVER") - .or_else(|_| env::var("NEBULOUS_SERVER")) .or_else(|_| env::var("AGENTSEA_SERVER")) .ok(); let env_auth_server = env::var("NEBU_AUTH_SERVER") - .or_else(|_| env::var("NEBULOUS_AUTH_SERVER")) .or_else(|_| env::var("AGENTSEA_AUTH_SERVER")) .ok(); - // Only proceed if all three environment variables are present. if let (Some(env_api_key), Some(env_server), Some(env_auth_server)) = (env_api_key, env_server, env_auth_server) { // Find a matching server (all three fields match). - let found_server = config.servers.iter_mut().find(|srv| { + let found_server = self.servers.iter_mut().find(|srv| { srv.api_key.as_deref() == Some(&env_api_key) && srv.server.as_deref() == Some(&env_server) && srv.auth_server.as_deref() == Some(&env_auth_server) }); // If found, use that. If not, create a new entry. - let server_name = "env-based-server".to_string(); - let chosen_name = if let Some(srv) = found_server { - // Make sure it has a name, so we can set default_server to it - if srv.name.is_none() { - srv.name = Some(server_name.clone()); - } - srv.name.clone().unwrap() + if let Some(srv) = found_server { + self.current_server = Some(srv.name.clone()); } else { - // Need to create a new server entry - let new_server = ServerConfig { - name: Some(server_name.clone()), + let new_server = ClientServerConfig { + name: "env-based-server".to_string(), api_key: Some(env_api_key), server: Some(env_server), auth_server: Some(env_auth_server), }; - config.servers.push(new_server); - server_name + self.update_server(new_server, true); }; - - // Set that server as the "current" or default - config.current_server = Some(chosen_name); } - - // Only write if the file didn't already exist - if !path_exists { - config.write()?; - } - - Ok(config) } /// Write the current GlobalConfig to disk (YAML). @@ -109,21 +97,61 @@ impl GlobalConfig { Ok(()) } - /// Get the server config for the current `default_server`. - /// Returns `None` if `default_server` is unset or if no server - /// with that name is found. - pub fn get_current_server_config(&self) -> Option<&ServerConfig> { - self.current_server.as_deref().and_then(|name| { - self.servers - .iter() - .find(|srv| srv.name.as_deref() == Some(name)) - }) + /// Get the server config for the current server. + pub fn get_current_server_config(&self) -> Option<&ClientServerConfig> { + self.current_server + .as_deref() + .and_then(|name| self.servers.iter().find(|srv| srv.name == name)) } - pub fn get_auth_server(&self) -> Option<&str> { - self.get_current_server_config() - .and_then(|cfg| cfg.auth_server.as_deref()) - .or_else(|| Some(CONFIG.auth_server.as_str())) + /// Get the server config for a specific server. + pub fn get_server(&self, name: &str) -> Option<&ClientServerConfig> { + self.servers.iter().find(|srv| srv.name == name) + } + + /// Remove a server from the config. + pub fn drop_server(&mut self, name: &str) { + if let Some(pos) = self.servers.iter().position(|srv| srv.name == name) { + self.servers.remove(pos); + + // If the removed server was the current one, clear it. + if self.current_server == Some(name.to_string()) { + self.current_server = None; + } + } + } + + /// Update or add a server config. + pub fn update_server(&mut self, new_config: ClientServerConfig, make_current: bool) { + if let Some(pos) = self + .servers + .iter() + .position(|srv| srv.name == new_config.name) + { + self.servers[pos] = new_config; + } else { + if make_current { + self.current_server = Some(new_config.name.clone()); + } + self.servers.push(new_config); + } + } + + /// Add a server. + pub fn add_server(&mut self, config: ClientServerConfig, make_current: bool) { + if self.contains_server(&config.name) { + eprintln!( + "Server with name '{}' already exists. Please choose a different name.", + config.name + ); + return; + } + self.update_server(config, make_current); + } + + /// Check if a server with the given name exists. + pub fn contains_server(&self, name: &str) -> bool { + self.servers.iter().any(|srv| srv.name == name) } } @@ -135,58 +163,191 @@ fn get_config_file_path() -> Result> { } #[derive(Debug, Clone)] -pub struct Config { - pub message_queue_type: String, - pub kafka_bootstrap_servers: String, - pub kafka_timeout_ms: String, - pub redis_host: String, - pub redis_port: String, - pub redis_password: Option, - pub redis_url: Option, - pub redis_publish_url: Option, +pub struct ServerConfig { pub database_url: String, - pub tailscale_api_key: Option, - pub tailscale_tailnet: Option, + pub message_queue_type: String, + pub redis: RedisConfig, + pub kafka: KafkaConfig, + + pub tailscale: Option, + + pub auth: ServerAuthConfig, + pub bucket_name: String, pub bucket_region: String, pub root_owner: String, - pub auth_server: String, + + pub publish_url: Option, +} + +#[derive(Debug, Clone)] +pub struct DatabaseConfig { + pub host: String, + pub port: u16, + pub user: String, + pub password: Option, + pub name: String, +} + +impl DatabaseConfig { + pub fn new() -> Self { + dotenv().ok(); + + Self { + host: env::var("DATABASE_HOST").unwrap_or_else(|_| "localhost".to_string()), + port: env::var("DATABASE_PORT") + .unwrap_or_else(|_| "5432".to_string()) + .parse() + .expect("Invalid value for DATABASE_PORT."), + user: env::var("DATABASE_USER").unwrap_or_else(|_| "postgres".to_string()), + password: env::var("DATABASE_PASSWORD").ok(), + name: env::var("DATABASE_NAME").unwrap_or_else(|_| "postgres".to_string()), + } + } +} + +#[derive(Debug, Clone)] +pub struct RedisConfig { + pub host: String, + pub port: u16, + pub user: Option, + pub password: Option, + pub database: u16, pub publish_url: Option, } -impl Config { +impl RedisConfig { + pub fn new() -> Self { + dotenv().ok(); + + Self { + host: env::var("REDIS_HOST").unwrap_or_else(|_| "localhost".to_string()), + port: env::var("REDIS_PORT") + .unwrap_or_else(|_| "6379".to_string()) + .parse() + .expect("Invalid value for REDIS_PORT."), + user: env::var("REDIS_USER").ok(), + password: env::var("REDIS_PASSWORD").ok(), + database: env::var("REDIS_DATABASE") + .unwrap_or_else(|_| "0".to_string()) + .parse() + .expect("Invalid value for REDIS_DATABASE."), + publish_url: env::var("REDIS_PUBLISH_URL").ok(), + } + } + + pub fn get_url(&self) -> String { + format!( + "redis://{}:{}@{}:{}/{}", + self.user.as_deref().unwrap_or(""), + self.password.as_deref().unwrap_or(""), + self.host, + self.port, + self.database + ) + } +} + +#[derive(Debug, Clone)] +pub struct KafkaConfig { + pub bootstrap_servers: String, + pub timeout_ms: u32, +} + +impl KafkaConfig { pub fn new() -> Self { dotenv().ok(); + Self { + bootstrap_servers: env::var("KAFKA_BOOTSTRAP_SERVERS").unwrap_or_else(|_| "localhost:9092".to_string()), + timeout_ms: env::var("KAFKA_TIMEOUT_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(5000), + } + } +} + +#[derive(Debug, Clone)] +pub struct TailscaleConfig { + pub api_key: String, + pub tailnet: String, +} + +#[derive(Debug, Clone)] +pub struct ServerAuthConfig { + pub internal: bool, + pub url: String, +} + +impl ServerAuthConfig { + pub fn new() -> Self { + dotenv().ok(); + + let url = env::var("NEBU_AUTH_URL") + .or_else(|_| env::var("NEBULOUS_AUTH_SERVER")) + .or_else(|_| env::var("AGENTSEA_AUTH_SERVER")) + .or_else(|_| env::var("AGENTSEA_AUTH_URL")) + .unwrap_or_else(|_| "https://auth.hub.agentlabs.xyz".to_string()); + + Self { + internal: true, + url, + } + } +} + +impl ServerConfig { + pub fn new() -> Self { + dotenv().ok(); + + let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| { + let db_config = DatabaseConfig::new(); + format!( + "postgres://{}:{}@{}:{}/{}", + db_config.user, + db_config.password.as_deref().unwrap_or(""), + db_config.host, + db_config.port, + db_config.name + ) + }); + + let message_queue_type = match env::var("MESSAGE_QUEUE_TYPE") { + Ok(queue_type) => { + if queue_type == "redis" { + queue_type + } else { + panic!("Invalid MESSAGE_QUEUE_TYPE. Only 'redis' is supported. (You can safely omit this value.)") + } + } + Err(_) => "redis".to_string(), + }; + + + let redis = RedisConfig::new(); + let kafka = KafkaConfig::new(); + + let tailscale = match (env::var("TS_API_KEY"), env::var("TS_TAILNET")) { + (Ok(api_key), Ok(tailnet)) => Some(TailscaleConfig { api_key, tailnet }), + _ => None, + }; + + let auth = ServerAuthConfig::new(); Self { - message_queue_type: env::var("MESSAGE_QUEUE_TYPE") - .unwrap_or_else(|_| "redis".to_string()), - kafka_bootstrap_servers: env::var("KAFKA_BOOTSTRAP_SERVERS") - .unwrap_or_else(|_| "localhost:9092".to_string()), - kafka_timeout_ms: env::var("KAFKA_TIMEOUT_MS").unwrap_or_else(|_| "5000".to_string()), - redis_host: env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()), - redis_port: env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string()), - redis_password: env::var("REDIS_PASSWORD").ok(), - redis_url: env::var("REDIS_URL").ok(), - redis_publish_url: env::var("REDIS_PUBLISH_URL").ok(), - database_url: env::var("DATABASE_URL") - .unwrap_or_else(|_| "sqlite://.data/data.db".to_string()), - tailscale_api_key: env::var("TS_API_KEY").or_else(|_| env::var("TAILSCALE_API_KEY")).ok(), - tailscale_tailnet: env::var("TAILSCALE_TAILNET").ok(), + database_url, + message_queue_type, + redis, + kafka, + tailscale, + auth, + // TODO: Move this to dedicated config bucket_name: env::var("NEBU_BUCKET_NAME") - .or_else(|_| env::var("NEBULOUS_BUCKET_NAME")) - .unwrap_or_else(|_| panic!("NEBU_BUCKET_NAME or NEBULOUS_BUCKET_NAME environment variable must be set")), + .unwrap_or_else(|_| panic!("NEBU_BUCKET_NAME environment variable must be set")), bucket_region: env::var("NEBU_BUCKET_REGION") - .or_else(|_| env::var("NEBULOUS_BUCKET_REGION")) - .unwrap_or_else(|_| panic!("NEBU_BUCKET_REGION or NEBULOUS_BUCKET_REGION environment variable must be set")), + .unwrap_or_else(|_| panic!("NEBU_BUCKET_REGION environment variable must be set")), root_owner: env::var("NEBU_ROOT_OWNER") - .or_else(|_| env::var("NEBULOUS_ROOT_OWNER")) - .unwrap_or_else(|_| panic!("NEBU_ROOT_OWNER or NEBULOUS_ROOT_OWNER environment variable must be set")), - auth_server: env::var("NEBU_AUTH_SERVER") - .or_else(|_| env::var("NEBULOUS_AUTH_SERVER")) - .or_else(|_| env::var("AGENTSEA_AUTH_SERVER")) - .or_else(|_| env::var("AGENTSEA_AUTH_URL")) - .unwrap_or_else(|_| "https://auth.hub.agentlabs.xyz".to_string()), + .unwrap_or_else(|_| panic!("NEBU_ROOT_OWNER environment variable must be set")), publish_url: env::var("NEBU_PUBLISH_URL") .or_else(|_| env::var("NEBULOUS_PUBLISH_URL")) .ok(), @@ -194,4 +355,4 @@ impl Config { } } // Global static CONFIG instance -pub static CONFIG: Lazy = Lazy::new(Config::new); +pub static SERVER_CONFIG: Lazy = Lazy::new(ServerConfig::new); \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 7028300..81cdb5b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,5 +1,4 @@ -// src/db.rs -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use sea_orm::{ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbErr, Schema}; use std::time::Duration; @@ -21,7 +20,7 @@ fn create_connect_options(url: String) -> ConnectOptions { } pub async fn init_db() -> Result { - let database_url = &CONFIG.database_url; + let database_url = &SERVER_CONFIG.database_url; println!("Connecting to database at: {}", database_url); // Create the data directory if it doesn't exist diff --git a/src/handlers/v1/iam.rs b/src/handlers/v1/iam.rs index 25daa31..d12c9a7 100644 --- a/src/handlers/v1/iam.rs +++ b/src/handlers/v1/iam.rs @@ -3,7 +3,7 @@ use crate::agent::aws::{ IamCredentials, StsCredentials, }; use crate::agent::ns::auth_ns; -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use crate::models::{V1ResourceMeta, V1UserProfile}; use crate::state::AppState; use aws_config::{self, BehaviorVersion, Region}; @@ -98,8 +98,8 @@ pub async fn create_scoped_s3_token( }; // --- Get Bucket Name from Config --- - // Bucket name is read from global CONFIG at startup - let bucket_name = CONFIG.bucket_name.clone(); + // Bucket name is read from global SERVER_CONFIG at startup + let bucket_name = SERVER_CONFIG.bucket_name.clone(); debug!(?bucket_name, "Retrieved bucket name from config"); // --- Call AWS Agent --- @@ -269,7 +269,7 @@ pub async fn generate_temp_s3_credentials( }; // --- Get Bucket Name from Config --- - let bucket_name = CONFIG.bucket_name.clone(); + let bucket_name = SERVER_CONFIG.bucket_name.clone(); // Default duration: 1 hour (3600 seconds) let duration_seconds = 3600; diff --git a/src/handlers/v1/namespaces.rs b/src/handlers/v1/namespaces.rs index e436d94..d6f92c1 100644 --- a/src/handlers/v1/namespaces.rs +++ b/src/handlers/v1/namespaces.rs @@ -1,6 +1,6 @@ // src/handlers/containers.rs -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use crate::entities::namespaces::{self, ActiveModel as NamespaceActiveModel}; use crate::handlers::v1::volumes::ensure_volume; use crate::models::V1UserProfile; @@ -160,7 +160,7 @@ pub async fn create_namespace( &namespace_entity.owner.clone(), &format!( "s3://{}/data/{}", - &CONFIG.bucket_name, + &SERVER_CONFIG.bucket_name, &namespace_entity.name.clone() ), &namespace_entity.created_by.clone(), @@ -332,7 +332,7 @@ pub async fn ensure_ns_and_resources( name, owner, owner, - format!("s3://{}", &CONFIG.bucket_name).as_str(), + format!("s3://{}", &SERVER_CONFIG.bucket_name).as_str(), created_by, labels, ) diff --git a/src/handlers/v1/processors.rs b/src/handlers/v1/processors.rs index f13da29..5d1ed24 100644 --- a/src/handlers/v1/processors.rs +++ b/src/handlers/v1/processors.rs @@ -1,5 +1,5 @@ use crate::agent::ns::auth_ns; -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use crate::entities::processors; use crate::middleware::get_user_profile_from_token; use crate::models::{V1ResourceMetaRequest, V1StreamData, V1StreamMessage, V1UserProfile}; @@ -853,12 +853,12 @@ pub async fn send_processor( } debug!("User token: {}", user_token); - let auth_server = CONFIG.auth_server.clone(); + let auth_server = SERVER_CONFIG.auth.url.clone(); if auth_server.is_empty() { - error!("Auth server URL is not configured or empty."); + error!("Auth server URL is empty."); return Err(( StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({"error": "Auth server configuration missing"})), + Json(json!({"error": "Auth server configuration is empty"})), )); } @@ -2944,7 +2944,7 @@ async fn generate_agent_key_for_processor( processor: &crate::entities::processors::Model, user_token: &str, ) -> Result { - let auth_server = CONFIG.auth_server.clone(); + let auth_server = SERVER_CONFIG.auth.url.clone(); if auth_server.is_empty() { return Err("Auth server configuration missing".to_string()); } diff --git a/src/lib.rs b/src/lib.rs index f309325..629ee80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,14 +30,14 @@ pub mod utils; pub mod validate; pub mod volumes; -use crate::config::CONFIG; +use crate::config::SERVER_CONFIG; use crate::handlers::v1::namespaces::ensure_namespace; use crate::handlers::v1::volumes::ensure_volume; use axum::Router; use db::init_db; use rdkafka::admin::AdminClient; use rdkafka::producer::FutureProducer; -use rdkafka::ClientConfig; +use rdkafka::ClientConfig as KafkaClientConfig; use routes::create_routes; use sea_orm::DatabaseConnection; use state::AppState; @@ -55,48 +55,25 @@ pub async fn create_app_state() -> Result> println!("Database pool created"); // Initialize the appropriate message queue based on configuration - let message_queue = match CONFIG.message_queue_type.to_lowercase().as_str() { + let message_queue = match SERVER_CONFIG.message_queue_type.to_lowercase().as_str() { "redis" => { - let redis_url = match &CONFIG.redis_url { - Some(url) if !url.is_empty() => { - // Redis URL exists, so use it directly but also parse it to set env vars - if let Ok(parsed_url) = Url::parse(url) { - // Extract and set host - if let Some(host) = parsed_url.host_str() { - env::set_var("REDIS_HOST", host); - } - - // Extract and set port - if let Some(port) = parsed_url.port() { - env::set_var("REDIS_PORT", port.to_string()); - } else { - // Default redis port if not specified in URL - env::set_var("REDIS_PORT", "6379"); - } - - // Extract and set password if present - if let Some(password) = parsed_url.password() { - env::set_var("REDIS_PASSWORD", password); - } - } - - url.clone() + let redis_url = SERVER_CONFIG.redis.get_url(); + + if let Ok(parsed_url) = Url::parse(redis_url.as_str()) { + + if let Some(host) = parsed_url.host_str() { + env::set_var("REDIS_HOST", host); } - _ => { - // Redis URL not present or empty, build from components - let host = &CONFIG.redis_host; - let port = &CONFIG.redis_port; - - match &CONFIG.redis_password { - Some(password) if !password.is_empty() => { - format!("redis://:{}@{}:{}", password, host, port) - } - _ => format!("redis://{}:{}", host, port), - } + + if let Some(port) = parsed_url.port() { + env::set_var("REDIS_PORT", port.to_string()); } - }; - // Create the Redis client using the constructed URL + if let Some(password) = parsed_url.password() { + env::set_var("REDIS_PASSWORD", password); + } + + }; let redis_client = Arc::new(redis::Client::open(redis_url.as_str())?); MessageQueue::Redis { @@ -104,10 +81,10 @@ pub async fn create_app_state() -> Result> } } "kafka" => { - let mut client_config = ClientConfig::new(); - let kafka_config = client_config - .set("bootstrap.servers", &CONFIG.kafka_bootstrap_servers) - .set("message.timeout.ms", &CONFIG.kafka_timeout_ms); + let mut kafka_client_config = KafkaClientConfig::new(); + let kafka_config = kafka_client_config + .set("bootstrap.servers", &SERVER_CONFIG.kafka.bootstrap_servers) + .set("message.timeout.ms", &SERVER_CONFIG.kafka.timeout_ms.to_string()); let producer = Arc::new(kafka_config.clone().create::()?); let admin = Arc::new(kafka_config.create::>()?); @@ -153,8 +130,8 @@ pub async fn ensure_base_resources( match ensure_namespace( db_pool, "root", - &CONFIG.root_owner, - &CONFIG.root_owner, + &SERVER_CONFIG.root_owner, + &SERVER_CONFIG.root_owner, None, ) .await @@ -167,8 +144,8 @@ pub async fn ensure_base_resources( db_pool, "root", "root", - &CONFIG.root_owner, - format!("s3://{}", &CONFIG.bucket_name).as_str(), + &SERVER_CONFIG.root_owner, + format!("s3://{}", &SERVER_CONFIG.bucket_name).as_str(), "root", None, ) diff --git a/src/middleware.rs b/src/middleware.rs index 1fd06cc..a42d37d 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,5 +1,5 @@ use crate::auth; -use crate::config::CONFIG; +use crate::config::{ClientConfig, ServerConfig, SERVER_CONFIG}; use crate::models::V1UserProfile; use crate::AppState; use axum::{ @@ -131,15 +131,15 @@ pub async fn get_user_profile_from_internal_token( pub async fn get_user_profile_from_external_token( token: &str, ) -> Result { - let config = crate::config::GlobalConfig::read().unwrap(); + let client_config = ClientConfig::read().unwrap(); - let auth_server = config.get_current_server_config().map_or_else( - || CONFIG.auth_server.clone(), + let auth_server = client_config.get_current_server_config().map_or_else( + || SERVER_CONFIG.auth.url.clone(), |server_config| { server_config .auth_server .clone() - .unwrap_or_else(|| CONFIG.auth_server.clone()) + .unwrap_or_else(|| SERVER_CONFIG.auth.url.clone()) }, ); diff --git a/src/resources/v1/containers/base.rs b/src/resources/v1/containers/base.rs index fefe038..5a3ccba 100644 --- a/src/resources/v1/containers/base.rs +++ b/src/resources/v1/containers/base.rs @@ -1,7 +1,6 @@ use crate::agent::agent::create_agent_key; use crate::agent::aws::create_s3_scoped_user; -use crate::config::GlobalConfig; -use crate::config::CONFIG; +use crate::config::{ClientConfig, SERVER_CONFIG}; use crate::entities::containers; use crate::handlers::v1::volumes::ensure_volume; use crate::models::{V1CreateAgentKeyRequest, V1UserProfile}; @@ -184,7 +183,7 @@ pub trait ContainerPlatform { model: &containers::Model, db: &DatabaseConnection, ) -> Result, Box> { - let config = GlobalConfig::read().unwrap(); + let config = ClientConfig::read().unwrap(); let mut env = HashMap::new(); debug!("Getting agent key"); @@ -196,7 +195,7 @@ pub trait ContainerPlatform { } }; - let root_volume_uri = format!("s3://{}/data", CONFIG.bucket_name); + let root_volume_uri = format!("s3://{}/data", SERVER_CONFIG.bucket_name); let source = format!("{}/{}", root_volume_uri, model.namespace); debug!("Ensuring volume: {:?}", source.clone()); @@ -220,7 +219,7 @@ pub trait ContainerPlatform { debug!("Creating s3 token"); let s3_token = - match create_s3_scoped_user(&CONFIG.bucket_name, &model.namespace, &model.id).await { + match create_s3_scoped_user(&SERVER_CONFIG.bucket_name, &model.namespace, &model.id).await { Ok(token) => token, Err(e) => { error!("Error creating s3 token: {:?}", e); @@ -249,7 +248,7 @@ pub trait ContainerPlatform { ); env.insert( "RCLONE_CONFIG_S3REMOTE_REGION".to_string(), - CONFIG.bucket_region.clone(), + SERVER_CONFIG.bucket_region.clone(), ); env.insert("RCLONE_S3_NO_CHECK_BUCKET".to_string(), "true".to_string()); env.insert("NEBU_API_KEY".to_string(), agent_key.clone().unwrap()); @@ -261,11 +260,11 @@ pub trait ContainerPlatform { } env.insert( "AGENTSEA_AUTH_SERVER".to_string(), - CONFIG.auth_server.clone(), + SERVER_CONFIG.auth.url.clone(), ); env.insert( "NEBULOUS_SERVER".to_string(), - CONFIG.publish_url.clone().unwrap(), + SERVER_CONFIG.publish_url.clone().unwrap(), ); env.insert("NEBU_NAMESPACE".to_string(), model.namespace.clone()); @@ -332,9 +331,9 @@ pub trait ContainerPlatform { } async fn get_tailscale_client(&self) -> TailscaleClient { - let tailscale_api_key = CONFIG - .tailscale_api_key - .clone() + let tailscale_api_key = SERVER_CONFIG + .tailscale.as_ref() + .map(|ts| ts.api_key.clone()) .expect("Tailscale API key not found in config"); debug!("Tailscale key: {}", tailscale_api_key); TailscaleClient::new(tailscale_api_key) @@ -344,10 +343,10 @@ pub trait ContainerPlatform { &self, model: &containers::Model, ) -> Result> { - let tailnet = CONFIG - .tailscale_tailnet - .clone() - .ok_or_else(|| "tailscale_tailnet not found in config".to_string())?; + let tailnet = SERVER_CONFIG + .tailscale.as_ref() + .map(|ts| ts.tailnet.clone()) + .expect("Tailscale tailnet not found in config"); debug!("Tailnet: {}", tailnet); @@ -412,7 +411,7 @@ pub trait ContainerPlatform { &self, user_profile: &V1UserProfile, ) -> Result> { - let config = crate::config::GlobalConfig::read().unwrap(); + let config = ClientConfig::read().unwrap(); debug!("[DEBUG] get_agent_key: Entering function"); debug!("[DEBUG] get_agent_key: user_profile = {:?}", user_profile); @@ -606,10 +605,10 @@ pub async fn get_ip_for_tailscale_device_hostname( "[Tailscale] Attempting to fetch IP for hostname: {}", device_hostname ); - let tailscale_api_key = CONFIG - .tailscale_api_key - .clone() - .ok_or_else(|| "TAILSCALE_API_KEY not found in config".to_string())?; + let tailscale_api_key = SERVER_CONFIG + .tailscale.as_ref() + .map(|ts| ts.api_key.clone()) + .expect("Tailscale API key not found in config"); let client = TailscaleClient::new(tailscale_api_key); // Use "-" for the default tailnet. diff --git a/src/resources/v1/processors/standard.rs b/src/resources/v1/processors/standard.rs index b0863f3..9796941 100644 --- a/src/resources/v1/processors/standard.rs +++ b/src/resources/v1/processors/standard.rs @@ -1,5 +1,5 @@ use crate::agent::agent::create_agent_key; -use crate::config::CONFIG; +use crate::config::{ClientConfig, SERVER_CONFIG}; use crate::entities::containers; use crate::entities::processors; use crate::models::V1CreateAgentKeyRequest; @@ -154,17 +154,12 @@ impl StandardProcessor { ); // Fallback to existing config if Tailscale lookup fails // This maintains previous behavior in case of Tailscale issues. - match CONFIG.redis_publish_url.clone() { + match SERVER_CONFIG.redis.publish_url.clone() { Some(url) => { debug!("Using REDIS_PUBLISH_URL: {}", url); url } - None => CONFIG.redis_url.clone().ok_or_else(|| { - Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "REDIS_URL or REDIS_PUBLISH_URL must be set if Tailscale lookup fails", - )) - })?, + None => SERVER_CONFIG.redis.get_url() } } }; @@ -1087,11 +1082,13 @@ impl ProcessorPlatform for StandardProcessor { // Assume a function exists to create the key using user profile // We need the auth server URL, user token, desired agent ID, name, and duration. - let config = crate::config::GlobalConfig::read() + let client_config = ClientConfig::read() .map_err(|e| format!("Failed to read global config: {}", e))?; - let auth_server = config - .get_auth_server() - .ok_or_else(|| "Auth server URL not configured".to_string())?; + let auth_server = client_config + .get_current_server_config() + .and_then(|s| s.auth_server.clone()) + .expect("Auth server URL not configured"); + let user_token = user_profile .token .as_ref() @@ -1240,11 +1237,12 @@ impl ProcessorPlatform for StandardProcessor { .decrypt_value() .map_err(|e| format!("Failed to decrypt agent key: {}", e))?; - let config = crate::config::GlobalConfig::read() + let client_config = ClientConfig::read() .map_err(|e| format!("Failed to read global config: {}", e))?; - let auth_server = config - .get_auth_server() - .ok_or_else(|| "Auth server URL not configured".to_string())?; + let auth_server = client_config + .get_current_server_config() + .and_then(|s| s.auth_server.clone()) + .expect("Auth server URL not configured"); debug!( "Fetching user profile using processor agent key from {}", From 5ae27e69bbcb5453957d696bfbc3322a2994bcab Mon Sep 17 00:00:00 2001 From: Philipp Schroeppel Date: Tue, 1 Jul 2025 13:16:29 +0200 Subject: [PATCH 3/3] feat: use hashmap to store client-side server configs and enforce interaction via api --- src/commands/login_cmd.rs | 10 ++++----- src/commands/set_cmd.rs | 5 +---- src/commands/show_cmd.rs | 8 ++----- src/config.rs | 44 +++++++++++++-------------------------- 4 files changed, 21 insertions(+), 46 deletions(-) diff --git a/src/commands/login_cmd.rs b/src/commands/login_cmd.rs index d5fb6c0..51b1b26 100644 --- a/src/commands/login_cmd.rs +++ b/src/commands/login_cmd.rs @@ -35,13 +35,12 @@ pub async fn execute( io::stdout().flush()?; let api_key = rpassword::read_password()?; - client_config.servers.push(ClientServerConfig { + client_config.add_server(ClientServerConfig { name: "cloud".to_string(), server: Some(nebu_url), api_key: Some(api_key), auth_server: Some(auth_url), - }); - client_config.current_server = Some("cloud".to_string()); + }, true); } else { println!( r#"Configuring the Nebulous CLI to use the integrated auth server. @@ -63,13 +62,12 @@ When you're running nebulous on Kubernetes, use: io::stdout().flush()?; let api_key = rpassword::read_password()?; - client_config.servers.push(ClientServerConfig { + client_config.add_server(ClientServerConfig { name: "nebu".to_string(), server: Some(nebu_url), api_key: Some(api_key), auth_server: None, - }); - client_config.current_server = Some("nebu".to_string()); + }, true); } client_config.write()?; diff --git a/src/commands/set_cmd.rs b/src/commands/set_cmd.rs index 9747ff9..e0e125f 100644 --- a/src/commands/set_cmd.rs +++ b/src/commands/set_cmd.rs @@ -7,10 +7,7 @@ pub async fn set_context(server_name: &str) -> Result<(), Box> { let mut config = ClientConfig::read()?; // Check if the server exists - let server_exists = config - .servers - .iter() - .any(|s| s.name == server_name); + let server_exists = config.contains_server(server_name); if !server_exists { return Err(format!("Server '{}' not found in configuration", server_name).into()); diff --git a/src/commands/show_cmd.rs b/src/commands/show_cmd.rs index a57ce6d..8b3bb7b 100644 --- a/src/commands/show_cmd.rs +++ b/src/commands/show_cmd.rs @@ -19,12 +19,8 @@ pub async fn show_config() -> Result<(), Box> { if config.servers.is_empty() { println!(" {}", "No servers configured".yellow()); } else { - for (idx, server) in config.servers.iter().enumerate() { - let is_current = config - .current_server - .as_ref() - .map(|current| current == &server.name) - .unwrap_or(false); + for (idx, (_, server)) in config.servers.iter().enumerate() { + let is_current = config.current_server == Some(server.name.clone()); let prefix = if is_current { "→ ".green() diff --git a/src/config.rs b/src/config.rs index 66d6b3c..32eb585 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,13 +2,14 @@ use dirs; use dotenv::dotenv; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::env; use std::fs; use std::path::PathBuf; #[derive(Serialize, Deserialize, Default, Debug)] pub struct ClientConfig { - pub servers: Vec, + pub servers: HashMap, pub current_server: Option, } @@ -61,15 +62,15 @@ impl ClientConfig { (env_api_key, env_server, env_auth_server) { // Find a matching server (all three fields match). - let found_server = self.servers.iter_mut().find(|srv| { + let found_server = self.servers.iter_mut().find(|(_, srv)| { srv.api_key.as_deref() == Some(&env_api_key) && srv.server.as_deref() == Some(&env_server) && srv.auth_server.as_deref() == Some(&env_auth_server) }); // If found, use that. If not, create a new entry. - if let Some(srv) = found_server { - self.current_server = Some(srv.name.clone()); + if let Some((name, _)) = found_server { + self.current_server = Some(name.clone()); } else { let new_server = ClientServerConfig { name: "env-based-server".to_string(), @@ -97,47 +98,31 @@ impl ClientConfig { Ok(()) } - /// Get the server config for the current server. pub fn get_current_server_config(&self) -> Option<&ClientServerConfig> { self.current_server .as_deref() - .and_then(|name| self.servers.iter().find(|srv| srv.name == name)) + .and_then(|name| self.servers.get(name)) } - /// Get the server config for a specific server. pub fn get_server(&self, name: &str) -> Option<&ClientServerConfig> { - self.servers.iter().find(|srv| srv.name == name) + self.servers.get(name) } - /// Remove a server from the config. pub fn drop_server(&mut self, name: &str) { - if let Some(pos) = self.servers.iter().position(|srv| srv.name == name) { - self.servers.remove(pos); + self.servers.remove(name); - // If the removed server was the current one, clear it. - if self.current_server == Some(name.to_string()) { - self.current_server = None; - } + if self.current_server == Some(name.to_string()) { + self.current_server = None; } } - /// Update or add a server config. pub fn update_server(&mut self, new_config: ClientServerConfig, make_current: bool) { - if let Some(pos) = self - .servers - .iter() - .position(|srv| srv.name == new_config.name) - { - self.servers[pos] = new_config; - } else { - if make_current { - self.current_server = Some(new_config.name.clone()); - } - self.servers.push(new_config); + if make_current { + self.current_server = Some(new_config.name.clone()); } + self.servers.insert(new_config.name.clone(), new_config); } - /// Add a server. pub fn add_server(&mut self, config: ClientServerConfig, make_current: bool) { if self.contains_server(&config.name) { eprintln!( @@ -149,9 +134,8 @@ impl ClientConfig { self.update_server(config, make_current); } - /// Check if a server with the given name exists. pub fn contains_server(&self, name: &str) -> bool { - self.servers.iter().any(|srv| srv.name == name) + self.servers.contains_key(name) } }