diff --git a/Cargo.lock b/Cargo.lock index d0839fb1ea..3856d4d112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3476,17 +3476,21 @@ dependencies = [ "axum", "axum-extra", "http", + "tonic", ] [[package]] name = "saluki-app" version = "0.1.0" dependencies = [ + "arc-swap", + "async-trait", "axum", "bytesize", "chrono", "chrono-tz", "http", + "hyper", "iana-time-zone", "itoa", "memory-accounting", diff --git a/lib/saluki-api/Cargo.toml b/lib/saluki-api/Cargo.toml index 79361606cc..265028cf9d 100644 --- a/lib/saluki-api/Cargo.toml +++ b/lib/saluki-api/Cargo.toml @@ -12,3 +12,4 @@ workspace = true axum = { workspace = true, features = ["http1", "tokio"] } axum-extra = { workspace = true, features = ["query"] } http = { workspace = true } +tonic = { workspace = true } diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index e4524639e5..fd3e3a1417 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -2,6 +2,7 @@ pub use axum::response; pub use axum::routing; use axum::Router; pub use http::StatusCode; +use tonic::service::Routes; pub mod extract { pub use axum::extract::*; @@ -17,3 +18,100 @@ pub trait APIHandler { fn generate_initial_state(&self) -> Self::State; fn generate_routes(&self) -> Router; } + +/// API endpoint type. +/// +/// Identifies whether or not a route should be exposed on the unprivileged or privileged API endpoint. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum EndpointType { + /// The unprivileged (plain HTTP) API endpoint. + Unprivileged, + + /// The privileged (TLS-protected) API endpoint. + Privileged, +} + +impl EndpointType { + /// Returns a human-readable name for this endpoint type. + pub fn name(&self) -> &'static str { + match self { + Self::Unprivileged => "unprivileged", + Self::Privileged => "privileged", + } + } +} + +/// API endpoint protocol. +/// +/// Identifies which application protocol the route should be exposed to. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum EndpointProtocol { + /// HTTP. + Http, + + /// gRPC. + Grpc, +} + +impl EndpointProtocol { + /// Returns a human-readable name for this endpoint protocol. + pub fn name(&self) -> &'static str { + match self { + Self::Http => "HTTP", + Self::Grpc => "gRPC", + } + } +} + +/// A set of dynamic API routes. +/// +/// Dynamic routes allow for processes to dynamically register/unregister themselves from running API endpoints, +/// adapting to changes in the process state and without requiring all routes to be known and declared upfront. +#[derive(Clone, Debug)] +pub struct DynamicRoute { + /// Which API endpoint these routes target. + endpoint_type: EndpointType, + + /// Which API protocol these routes target. + endpoint_protocol: EndpointProtocol, + + /// The routes to serve. + router: Router<()>, +} + +impl DynamicRoute { + /// Creates a dynamic HTTP route from the given API handler. + pub fn http(endpoint_type: EndpointType, handler: T) -> Self { + let router = handler.generate_routes().with_state(handler.generate_initial_state()); + Self::new(endpoint_type, EndpointProtocol::Http, router) + } + + /// Creates a dynamic gRPC route from the given Tonic routes. + pub fn grpc(endpoint_type: EndpointType, routes: Routes) -> Self { + let router = routes.prepare().into_axum_router(); + Self::new(endpoint_type, EndpointProtocol::Grpc, router) + } + + fn new(endpoint_type: EndpointType, endpoint_protocol: EndpointProtocol, router: Router<()>) -> Self { + Self { + endpoint_type, + endpoint_protocol, + router, + } + } + + /// Returns the type of endpoint these routes target. + pub fn endpoint_type(&self) -> EndpointType { + self.endpoint_type + } + + /// Returns the protocol of endpoint these routes target. + pub fn endpoint_protocol(&self) -> EndpointProtocol { + self.endpoint_protocol + } + + /// Consumes this route and returns the underlying router. + pub fn into_router(self) -> Router<()> { + self.router + } +} diff --git a/lib/saluki-app/Cargo.toml b/lib/saluki-app/Cargo.toml index 2a476c8d61..2fb89960a3 100644 --- a/lib/saluki-app/Cargo.toml +++ b/lib/saluki-app/Cargo.toml @@ -12,11 +12,14 @@ workspace = true tls-fips = ["saluki-tls/fips"] [dependencies] +arc-swap = { workspace = true } +async-trait = { workspace = true } axum = { workspace = true } bytesize = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } http = { workspace = true } +hyper = { workspace = true } iana-time-zone = { workspace = true } itoa = { workspace = true } memory-accounting = { workspace = true } diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs new file mode 100644 index 0000000000..2764b8c987 --- /dev/null +++ b/lib/saluki-app/src/dynamic_api.rs @@ -0,0 +1,281 @@ +//! Dynamic API server. +//! +//! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time, +//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and hot-swaps the inner HTTP and +//! gRPC routers behind [`ArcSwap`]s as routes are asserted or retracted. + +use std::{ + convert::Infallible, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use axum::{body::Body as AxumBody, Router}; +use http::Response; +use rcgen::{generate_simple_self_signed, CertifiedKey}; +use rustls::{pki_types::PrivateKeyDer, ServerConfig}; +use rustls_pki_types::PrivatePkcs8KeyDer; +use saluki_api::{DynamicRoute, EndpointProtocol, EndpointType}; +use saluki_common::collections::FastIndexMap; +use saluki_core::runtime::{ + state::{AssertionUpdate, DataspaceRegistry, Handle, Subscription}, + InitializationError, ProcessShutdown, Supervisable, SupervisorFuture, +}; +use saluki_error::GenericError; +use saluki_io::net::{ + listener::ConnectionOrientedListener, + server::{ + http::{ErrorHandle, HttpServer, ShutdownHandle}, + multiplex_service::MultiplexService, + }, + util::hyper::TowerToHyperService, + ListenAddress, +}; +use tokio::{pin, select}; +use tower::Service; +use tracing::{debug, info, warn}; + +/// A dynamic API server that can add and remove routes at runtime. +/// +/// `DynamicAPIBuilder` serves HTTP and gRPC on a given address, multiplexing both protocols on a single port. A +/// background event loop subscribes to the [`DataspaceRegistry`] for [`DynamicHttpRoute`] and [`DynamicGrpcRoute`] +/// assertions and retractions, and atomically swaps the inner routers as handlers are added or removed. +/// +/// ## Publisher protocol +/// +/// Any process that wants to dynamically register API routes must: +/// +/// 1. Build a `Router<()>` (for HTTP, or via `tonic::Routes::into_axum_router()` for gRPC). +/// 2. Assert a [`DynamicHttpRoute`] or [`DynamicGrpcRoute`] in the [`DataspaceRegistry`] under a [`Handle`]. +/// +/// To withdraw routes, retract the assertion from the [`DataspaceRegistry`]. +pub struct DynamicAPIBuilder { + endpoint_type: EndpointType, + listen_address: ListenAddress, + tls_config: Option, + dataspace_registry: DataspaceRegistry, +} + +impl DynamicAPIBuilder { + /// Creates a new `DynamicAPIBuilder` for the given endpoint type. + pub fn new( + endpoint_type: EndpointType, listen_address: ListenAddress, dataspace_registry: DataspaceRegistry, + ) -> Self { + Self { + endpoint_type, + listen_address, + tls_config: None, + dataspace_registry, + } + } + + /// Sets the TLS configuration for the server. + pub fn with_tls_config(mut self, config: ServerConfig) -> Self { + self.tls_config = Some(config); + self + } + + /// Sets the TLS configuration for the server based on a dynamically generated self-signed certificate. + pub fn with_self_signed_tls(self) -> Self { + let CertifiedKey { cert, key_pair } = generate_simple_self_signed(["localhost".to_owned()]).unwrap(); + let cert_chain = vec![cert.der().clone()]; + let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der())); + + let config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(cert_chain, key) + .unwrap(); + + self.with_tls_config(config) + } +} + +#[async_trait] +impl Supervisable for DynamicAPIBuilder { + fn name(&self) -> &str { + match self.endpoint_type { + EndpointType::Unprivileged => "dynamic-unprivileged-api", + EndpointType::Privileged => "dynamic-privileged-api", + } + } + + async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { + // Create dynamic inner routers for both HTTP and gRPC sides. + let (inner_http, outer_http) = create_dynamic_router(); + let (inner_grpc, outer_grpc) = create_dynamic_router(); + + // Subscribe to all dynamic route assertions. + let route_assertions = self.dataspace_registry.subscribe_all::(); + + // Bind the HTTP listener immediately so we fail fast on bind errors. + let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) + .await + .map_err(|e| InitializationError::Failed { source: e.into() })?; + + let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc)); + + let mut http_server = HttpServer::from_listener(listener, multiplexed_service); + if let Some(tls_config) = self.tls_config.clone() { + http_server = http_server.with_tls_config(tls_config); + } + let (shutdown_handle, error_handle) = http_server.listen(); + + let endpoint_type = self.endpoint_type; + let listen_address = self.listen_address.clone(); + + Ok(Box::pin(async move { + info!("Serving {} API on {}.", endpoint_type.name(), listen_address); + + run_event_loop( + inner_http, + inner_grpc, + route_assertions, + endpoint_type, + process_shutdown, + shutdown_handle, + error_handle, + ) + .await + })) + } +} + +/// A [`tower::Service`] that routes a request based on a dynamically-updated [`Router`]. +/// +/// When installed as the fallback service for a top-level [`Router`], `DynamicRouterService` dynamically routing +/// requests based on the current defined "inner" router, which itself can be hot-swapped at runtime. This allows for +/// seamless updates to the API endpoint routing without requiring a restart of the HTTP listener or complicated +/// configuration changes. +#[derive(Clone)] +struct DynamicRouterService { + inner_router: Arc>, +} + +impl DynamicRouterService { + fn from_inner(inner_router: &Arc>) -> Self { + Self { + inner_router: Arc::clone(inner_router), + } + } +} + +impl Service> for DynamicRouterService { + type Response = Response; + type Error = Infallible; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let mut router = Arc::unwrap_or_clone(self.inner_router.load_full()); + Box::pin(async move { router.call(request).await }) + } +} + +/// Runs the event loop that listens for route assertions/retractions and hot-swaps the inner routers. +async fn run_event_loop( + inner_http: Arc>, inner_grpc: Arc>, + mut route_assertions: Subscription, endpoint_type: EndpointType, + mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, +) -> Result<(), GenericError> { + let mut http_handlers = FastIndexMap::default(); + let mut grpc_handlers = FastIndexMap::default(); + + let shutdown = process_shutdown.wait_for_shutdown(); + pin!(shutdown); + pin!(error_handle); + + loop { + select! { + _ = &mut shutdown => { + debug!("Dynamic API shutting down."); + shutdown_handle.shutdown(); + break; + } + + maybe_err = &mut error_handle => { + if let Some(e) = maybe_err { + return Err(GenericError::from(e)); + } + break; + } + + maybe_update = route_assertions.recv() => { + let Some(update) = maybe_update else { + warn!("Route subscription channel closed."); + break; + }; + + let mut rebuild_http = false; + let mut rebuild_grpc = false; + + match update { + AssertionUpdate::Asserted(handle, route) => { + if route.endpoint_type() != endpoint_type { + continue; + } + + match route.endpoint_protocol() { + EndpointProtocol::Http => { + debug!(?handle, "Registering dynamic HTTP handler."); + http_handlers.insert(handle, route.into_router()); + + rebuild_http = true; + }, + EndpointProtocol::Grpc => { + debug!(?handle, "Registering dynamic gRPC handler."); + grpc_handlers.insert(handle, route.into_router()); + + rebuild_grpc = true; + }, + } + } + AssertionUpdate::Retracted(handle) => { + if http_handlers.swap_remove(&handle).is_some() { + debug!(?handle, "Withdrawing dynamic HTTP handler."); + rebuild_http = true; + } + + if grpc_handlers.swap_remove(&handle).is_some() { + debug!(?handle, "Withdrawing dynamic gRPC handler."); + rebuild_grpc = true; + } + } + } + + if rebuild_http { + rebuild_router(&inner_http, &http_handlers); + } + + if rebuild_grpc { + rebuild_router(&inner_grpc, &grpc_handlers); + } + } + } + } + + Ok(()) +} + +/// Creates a dynamic router pair: a swappable inner router and an outer router that delegates to it. +fn create_dynamic_router() -> (Arc>, Router) { + let inner = Arc::new(ArcSwap::from_pointee(Router::new())); + let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner)); + (inner, outer) +} + +/// Rebuilds the merged inner router from all currently-registered handlers and stores it in the [`ArcSwap`]. +fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap) { + let mut merged = Router::new(); + for router in handlers.values() { + merged = merged.merge(router.clone()); + } + inner_router.store(Arc::new(merged)); + debug!(handler_count = handlers.len(), "Rebuilt inner router."); +} diff --git a/lib/saluki-app/src/lib.rs b/lib/saluki-app/src/lib.rs index 71cd80d34f..f9dfcf75b4 100644 --- a/lib/saluki-app/src/lib.rs +++ b/lib/saluki-app/src/lib.rs @@ -8,6 +8,7 @@ pub mod api; pub mod bootstrap; pub mod config; +pub mod dynamic_api; pub mod logging; pub mod memory; pub mod metrics;