From c15f4980b2ed599a84c2f05db76dee49a09b073f Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Fri, 6 Feb 2026 08:37:38 -0500 Subject: [PATCH 1/6] enhancement(api): add dynamic API router based on runtime state --- Cargo.lock | 3 + lib/saluki-api/src/lib.rs | 32 ++++ lib/saluki-app/Cargo.toml | 3 + lib/saluki-app/src/dynamic_api.rs | 302 ++++++++++++++++++++++++++++++ lib/saluki-app/src/lib.rs | 1 + 5 files changed, 341 insertions(+) create mode 100644 lib/saluki-app/src/dynamic_api.rs diff --git a/Cargo.lock b/Cargo.lock index d0839fb1ea..110f0cb57d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3482,11 +3482,14 @@ dependencies = [ name = "saluki-app" version = "0.1.0" dependencies = [ + "arc-swap", + "async-trait", "axum", "bytesize", "chrono", "chrono-tz", "http", + "hyper", "iana-time-zone", "itoa", "memory-accounting", diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index e4524639e5..75cc842bf0 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -17,3 +17,35 @@ pub trait APIHandler { fn generate_initial_state(&self) -> Self::State; fn generate_routes(&self) -> Router; } + +/// Identifies which API endpoint a handler should be bound to. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum EndpointType { + /// The unprivileged (plain HTTP) API endpoint. + Unprivileged, + /// The privileged (TLS-protected) API endpoint. + Privileged, +} + +/// Describes whether a handler is being registered or withdrawn. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum InterestType { + /// Register a new set of routes. + Register, + /// Withdraw a previously-registered set of routes. + Withdraw, +} + +/// A notification that an API handler wants to register or withdraw routes for a specific endpoint. +/// +/// The `identifier` must match the resource registry identifier under which the corresponding `Router<()>` (with state +/// already applied) has been published. +#[derive(Clone, Debug)] +pub struct APIEndpointInterest { + /// Which API endpoint this interest targets. + pub endpoint: EndpointType, + /// Whether routes are being added or removed. + pub interest: InterestType, + /// The resource registry identifier for the `Router<()>` resource. + pub identifier: String, +} diff --git a/lib/saluki-app/Cargo.toml b/lib/saluki-app/Cargo.toml index 2a476c8d61..2fb89960a3 100644 --- a/lib/saluki-app/Cargo.toml +++ b/lib/saluki-app/Cargo.toml @@ -12,11 +12,14 @@ workspace = true tls-fips = ["saluki-tls/fips"] [dependencies] +arc-swap = { workspace = true } +async-trait = { workspace = true } axum = { workspace = true } bytesize = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } http = { workspace = true } +hyper = { workspace = true } iana-time-zone = { workspace = true } itoa = { workspace = true } memory-accounting = { workspace = true } diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs new file mode 100644 index 0000000000..db44349049 --- /dev/null +++ b/lib/saluki-app/src/dynamic_api.rs @@ -0,0 +1,302 @@ +//! Dynamic API server. +//! +//! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time, +//! `DynamicAPIBuilder` subscribes to runtime notifications via the pub/sub registry and acquires route resources from +//! the resource registry, hot-swapping the inner router behind an [`ArcSwap`] as handlers are registered or withdrawn. + +use std::{ + convert::Infallible, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use axum::{body::Body as AxumBody, Router}; +use http::Response; +use rcgen::{generate_simple_self_signed, CertifiedKey}; +use rustls::{pki_types::PrivateKeyDer, ServerConfig}; +use rustls_pki_types::PrivatePkcs8KeyDer; +use saluki_api::{APIEndpointInterest, EndpointType, InterestType}; +use saluki_common::collections::FastIndexMap; +use saluki_core::runtime::{ + state::{PubSubRegistry, ResourceRegistry, Subscription}, + InitializationError, ProcessShutdown, Supervisable, SupervisorFuture, +}; +use saluki_error::GenericError; +use saluki_io::net::{ + listener::ConnectionOrientedListener, + server::{ + http::{ErrorHandle, HttpServer, ShutdownHandle}, + multiplex_service::MultiplexService, + }, + util::hyper::TowerToHyperService, + ListenAddress, +}; +use tokio::{pin, select}; +use tonic::service::RoutesBuilder; +use tower::Service; +use tracing::{debug, info, warn}; + +/// The well-known pub/sub topic for API endpoint interest notifications. +const API_ENDPOINT_INTEREST_TOPIC: &str = "api-endpoint-interest"; + +/// A dynamic API server that can add and remove route sets at runtime. +/// +/// `DynamicAPIBuilder` serves HTTP on a given address using an outer [`Router`] whose fallback delegates every request +/// to an inner [`Router`] stored behind an [`ArcSwap`]. A background event loop subscribes to the [`PubSubRegistry`] +/// for [`APIEndpointInterest`] notifications, acquires `Router<()>` resources from the [`ResourceRegistry`], and +/// atomically swaps the inner router as handlers register or withdraw. +/// +/// ## Publisher protocol +/// +/// Any worker that wants to dynamically register API routes must: +/// +/// 1. Build a `Router<()>` with state applied (i.e. call `handler.generate_routes().with_state(handler.generate_initial_state())`) +/// 2. Publish the `Router<()>` to the [`ResourceRegistry`] under a string identifier +/// 3. Publish an [`APIEndpointInterest`] to the [`PubSubRegistry`] on the `"api-endpoint-interest"` topic +/// +/// The resource **must** be published before the interest. If the interest arrives before the resource, the +/// registration will be silently skipped with a warning. +pub struct DynamicAPIBuilder { + endpoint_type: EndpointType, + listen_address: ListenAddress, + tls_config: Option, + pubsub_registry: PubSubRegistry, + resource_registry: ResourceRegistry, +} + +impl DynamicAPIBuilder { + /// Creates a new `DynamicAPIBuilder` for the given endpoint type. + pub fn new( + endpoint_type: EndpointType, listen_address: ListenAddress, pubsub_registry: PubSubRegistry, + resource_registry: ResourceRegistry, + ) -> Self { + Self { + endpoint_type, + listen_address, + tls_config: None, + pubsub_registry, + resource_registry, + } + } + + /// Sets the TLS configuration for the server. + pub fn with_tls_config(mut self, config: ServerConfig) -> Self { + self.tls_config = Some(config); + self + } + + /// Sets the TLS configuration for the server based on a dynamically generated self-signed certificate. + pub fn with_self_signed_tls(self) -> Self { + let CertifiedKey { cert, key_pair } = generate_simple_self_signed(["localhost".to_owned()]).unwrap(); + let cert_chain = vec![cert.der().clone()]; + let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der())); + + let config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(cert_chain, key) + .unwrap(); + + self.with_tls_config(config) + } +} + +#[async_trait] +impl Supervisable for DynamicAPIBuilder { + fn name(&self) -> &str { + match self.endpoint_type { + EndpointType::Unprivileged => "dynamic-unprivileged-api", + EndpointType::Privileged => "dynamic-privileged-api", + } + } + + async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { + // Create the ArcSwap holding the initial (empty) inner router. + let inner_router: Arc> = Arc::new(ArcSwap::from_pointee(Router::new())); + + // Build the outer "shell" router whose fallback delegates to whatever is currently in the ArcSwap. + let outer_router = build_outer_router(Arc::clone(&inner_router)); + + // Subscribe to the pub/sub topic for APIEndpointInterest notifications. + let subscription: Subscription = + self.pubsub_registry.subscribe(API_ENDPOINT_INTEREST_TOPIC); + + // Bind the HTTP listener immediately so we fail fast on bind errors. + let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) + .await + .map_err(|e| InitializationError::Failed { source: e.into() })?; + + // Wrap the outer router in the same MultiplexService + TowerToHyperService pattern used by APIBuilder::serve. + let multiplexed_service = TowerToHyperService::new(MultiplexService::new( + outer_router, + RoutesBuilder::default().routes().into_axum_router(), + )); + + let mut http_server = HttpServer::from_listener(listener, multiplexed_service); + if let Some(tls_config) = self.tls_config.clone() { + http_server = http_server.with_tls_config(tls_config); + } + let (shutdown_handle, error_handle) = http_server.listen(); + + let resource_registry = self.resource_registry.clone(); + let endpoint_type = self.endpoint_type.clone(); + let listen_address = self.listen_address.clone(); + + Ok(Box::pin(async move { + info!( + "Serving dynamic {} API on {}.", + endpoint_name(&endpoint_type), + listen_address + ); + + run_event_loop( + inner_router, + subscription, + resource_registry, + endpoint_type, + process_shutdown, + shutdown_handle, + error_handle, + ) + .await + })) + } +} + +/// Builds the outer shell router with a fallback service that delegates to the current inner router. +fn build_outer_router(inner: Arc>) -> Router { + Router::new().fallback_service(ArcSwapRouterService { inner }) +} + +/// A [`tower::Service`] that reads the current [`Router`] from an [`ArcSwap`] on every request. +/// +/// This is used as the fallback service for the outer shell router, allowing the inner route set to be hot-swapped at +/// runtime without restarting the HTTP listener. +#[derive(Clone)] +struct ArcSwapRouterService { + inner: Arc>, +} + +impl Service> for ArcSwapRouterService { + type Response = Response; + type Error = Infallible; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + // ArcSwap::load_full() is wait-free; Router::clone() is cheap (Arc internals). + let router = self.inner.load_full(); + Box::pin(async move { + let mut svc = (*router).clone(); + // Router<()> implements Service, Response = Response, Error = Infallible>. + svc.call(request).await + }) + } +} + +/// Runs the event loop that listens for API endpoint interest notifications and hot-swaps the inner router. +async fn run_event_loop( + inner_router: Arc>, mut subscription: Subscription, + resource_registry: ResourceRegistry, endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, + shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, +) -> Result<(), GenericError> { + let mut registered_handlers: FastIndexMap = FastIndexMap::default(); + + let shutdown = process_shutdown.wait_for_shutdown(); + pin!(shutdown); + pin!(error_handle); + + loop { + select! { + _ = &mut shutdown => { + debug!("Dynamic API shutting down."); + shutdown_handle.shutdown(); + break; + } + + maybe_err = &mut error_handle => { + if let Some(e) = maybe_err { + return Err(GenericError::from(e)); + } + break; + } + + maybe_interest = subscription.recv() => { + let Some(interest) = maybe_interest else { + warn!("API endpoint interest channel closed."); + break; + }; + + // Only process interests targeting our endpoint type. + if interest.endpoint != endpoint_type { + continue; + } + + match interest.interest { + InterestType::Register => { + // Acquire the Router<()> from the resource registry (non-blocking). + // The publisher must have published the resource before sending the interest. + match resource_registry.try_acquire::(interest.identifier.as_str()) { + Some(guard) => { + let router: Router = (*guard).clone(); + drop(guard); + + info!( + identifier = %interest.identifier, + "Registering dynamic API handler.", + ); + registered_handlers.insert(interest.identifier, router); + rebuild_router(&inner_router, ®istered_handlers); + } + None => { + warn!( + identifier = %interest.identifier, + "Received Register interest but router resource not found in registry. Ignoring.", + ); + } + } + } + InterestType::Withdraw => { + if registered_handlers.swap_remove(&interest.identifier).is_some() { + info!( + identifier = %interest.identifier, + "Withdrawing dynamic API handler.", + ); + rebuild_router(&inner_router, ®istered_handlers); + } else { + warn!( + identifier = %interest.identifier, + "Received Withdraw interest but no handler was registered with this identifier.", + ); + } + } + } + } + } + } + + Ok(()) +} + +/// Rebuilds the merged inner router from all currently-registered handlers and stores it in the [`ArcSwap`]. +fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap) { + let mut merged = Router::new(); + for router in handlers.values() { + merged = merged.merge(router.clone()); + } + inner_router.store(Arc::new(merged)); + debug!(handler_count = handlers.len(), "Rebuilt inner router."); +} + +fn endpoint_name(endpoint_type: &EndpointType) -> &'static str { + match endpoint_type { + EndpointType::Unprivileged => "unprivileged", + EndpointType::Privileged => "privileged", + } +} diff --git a/lib/saluki-app/src/lib.rs b/lib/saluki-app/src/lib.rs index 71cd80d34f..f9dfcf75b4 100644 --- a/lib/saluki-app/src/lib.rs +++ b/lib/saluki-app/src/lib.rs @@ -8,6 +8,7 @@ pub mod api; pub mod bootstrap; pub mod config; +pub mod dynamic_api; pub mod logging; pub mod memory; pub mod metrics; From df7d3ca5e69fd3d960a0d0fcaffedb2b884a6170 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Sun, 8 Feb 2026 15:29:27 -0500 Subject: [PATCH 2/6] update to dataspace stuff --- lib/saluki-api/src/lib.rs | 20 ++------ lib/saluki-app/src/dynamic_api.rs | 84 ++++++++++++++----------------- 2 files changed, 43 insertions(+), 61 deletions(-) diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index 75cc842bf0..2d3235b180 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -27,25 +27,13 @@ pub enum EndpointType { Privileged, } -/// Describes whether a handler is being registered or withdrawn. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum InterestType { - /// Register a new set of routes. - Register, - /// Withdraw a previously-registered set of routes. - Withdraw, -} - -/// A notification that an API handler wants to register or withdraw routes for a specific endpoint. +/// Describes which API endpoint a dynamic handler targets. /// -/// The `identifier` must match the resource registry identifier under which the corresponding `Router<()>` (with state -/// already applied) has been published. +/// Publishers assert this value in the [`DataspaceRegistry`] alongside a `Router<()>` resource published to the +/// [`ResourceRegistry`] under the same [`Handle`]. The dynamic API server observes assertions and retractions via a +/// wildcard subscription, using the handle to look up the corresponding router resource. #[derive(Clone, Debug)] pub struct APIEndpointInterest { /// Which API endpoint this interest targets. pub endpoint: EndpointType, - /// Whether routes are being added or removed. - pub interest: InterestType, - /// The resource registry identifier for the `Router<()>` resource. - pub identifier: String, } diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs index db44349049..3ca19ae43a 100644 --- a/lib/saluki-app/src/dynamic_api.rs +++ b/lib/saluki-app/src/dynamic_api.rs @@ -1,8 +1,8 @@ //! Dynamic API server. //! //! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time, -//! `DynamicAPIBuilder` subscribes to runtime notifications via the pub/sub registry and acquires route resources from -//! the resource registry, hot-swapping the inner router behind an [`ArcSwap`] as handlers are registered or withdrawn. +//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and acquires route resources from +//! the resource registry, hot-swapping the inner router behind an [`ArcSwap`] as handlers are asserted or retracted. use std::{ convert::Infallible, @@ -19,10 +19,10 @@ use http::Response; use rcgen::{generate_simple_self_signed, CertifiedKey}; use rustls::{pki_types::PrivateKeyDer, ServerConfig}; use rustls_pki_types::PrivatePkcs8KeyDer; -use saluki_api::{APIEndpointInterest, EndpointType, InterestType}; +use saluki_api::{APIEndpointInterest, EndpointType}; use saluki_common::collections::FastIndexMap; use saluki_core::runtime::{ - state::{PubSubRegistry, ResourceRegistry, Subscription}, + state::{AssertionUpdate, DataspaceRegistry, Handle, ResourceRegistry, WildcardSubscription}, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture, }; use saluki_error::GenericError; @@ -40,45 +40,44 @@ use tonic::service::RoutesBuilder; use tower::Service; use tracing::{debug, info, warn}; -/// The well-known pub/sub topic for API endpoint interest notifications. -const API_ENDPOINT_INTEREST_TOPIC: &str = "api-endpoint-interest"; - /// A dynamic API server that can add and remove route sets at runtime. /// /// `DynamicAPIBuilder` serves HTTP on a given address using an outer [`Router`] whose fallback delegates every request -/// to an inner [`Router`] stored behind an [`ArcSwap`]. A background event loop subscribes to the [`PubSubRegistry`] -/// for [`APIEndpointInterest`] notifications, acquires `Router<()>` resources from the [`ResourceRegistry`], and -/// atomically swaps the inner router as handlers register or withdraw. +/// to an inner [`Router`] stored behind an [`ArcSwap`]. A background event loop subscribes to the [`DataspaceRegistry`] +/// for [`APIEndpointInterest`] assertions and retractions, acquires `Router<()>` resources from the +/// [`ResourceRegistry`], and atomically swaps the inner router as handlers are added or removed. /// /// ## Publisher protocol /// /// Any worker that wants to dynamically register API routes must: /// /// 1. Build a `Router<()>` with state applied (i.e. call `handler.generate_routes().with_state(handler.generate_initial_state())`) -/// 2. Publish the `Router<()>` to the [`ResourceRegistry`] under a string identifier -/// 3. Publish an [`APIEndpointInterest`] to the [`PubSubRegistry`] on the `"api-endpoint-interest"` topic +/// 2. Publish the `Router<()>` to the [`ResourceRegistry`] under a [`Handle`] +/// 3. Assert an [`APIEndpointInterest`] in the [`DataspaceRegistry`] under the same [`Handle`] /// -/// The resource **must** be published before the interest. If the interest arrives before the resource, the +/// The resource **must** be published before the assertion. If the assertion arrives before the resource, the /// registration will be silently skipped with a warning. +/// +/// To withdraw routes, retract the [`APIEndpointInterest`] from the [`DataspaceRegistry`]. pub struct DynamicAPIBuilder { endpoint_type: EndpointType, listen_address: ListenAddress, tls_config: Option, - pubsub_registry: PubSubRegistry, + dataspace_registry: DataspaceRegistry, resource_registry: ResourceRegistry, } impl DynamicAPIBuilder { /// Creates a new `DynamicAPIBuilder` for the given endpoint type. pub fn new( - endpoint_type: EndpointType, listen_address: ListenAddress, pubsub_registry: PubSubRegistry, + endpoint_type: EndpointType, listen_address: ListenAddress, dataspace_registry: DataspaceRegistry, resource_registry: ResourceRegistry, ) -> Self { Self { endpoint_type, listen_address, tls_config: None, - pubsub_registry, + dataspace_registry, resource_registry, } } @@ -120,9 +119,9 @@ impl Supervisable for DynamicAPIBuilder { // Build the outer "shell" router whose fallback delegates to whatever is currently in the ArcSwap. let outer_router = build_outer_router(Arc::clone(&inner_router)); - // Subscribe to the pub/sub topic for APIEndpointInterest notifications. - let subscription: Subscription = - self.pubsub_registry.subscribe(API_ENDPOINT_INTEREST_TOPIC); + // Subscribe to all APIEndpointInterest assertions and retractions in the dataspace. + let subscription: WildcardSubscription = + self.dataspace_registry.subscribe_all::(); // Bind the HTTP listener immediately so we fail fast on bind errors. let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) @@ -200,13 +199,13 @@ impl Service> for ArcSwapRouterService { } } -/// Runs the event loop that listens for API endpoint interest notifications and hot-swaps the inner router. +/// Runs the event loop that listens for API endpoint interest assertions/retractions and hot-swaps the inner router. async fn run_event_loop( - inner_router: Arc>, mut subscription: Subscription, + inner_router: Arc>, mut subscription: WildcardSubscription, resource_registry: ResourceRegistry, endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, ) -> Result<(), GenericError> { - let mut registered_handlers: FastIndexMap = FastIndexMap::default(); + let mut registered_handlers: FastIndexMap = FastIndexMap::default(); let shutdown = process_shutdown.wait_for_shutdown(); pin!(shutdown); @@ -227,53 +226,48 @@ async fn run_event_loop( break; } - maybe_interest = subscription.recv() => { - let Some(interest) = maybe_interest else { + maybe_update = subscription.recv() => { + let Some((handle, update)) = maybe_update else { warn!("API endpoint interest channel closed."); break; }; - // Only process interests targeting our endpoint type. - if interest.endpoint != endpoint_type { - continue; - } + match update { + AssertionUpdate::Asserted(interest) => { + // Only process interests targeting our endpoint type. + if interest.endpoint != endpoint_type { + continue; + } - match interest.interest { - InterestType::Register => { // Acquire the Router<()> from the resource registry (non-blocking). - // The publisher must have published the resource before sending the interest. - match resource_registry.try_acquire::(interest.identifier.as_str()) { + // The publisher must have published the resource before asserting the interest. + match resource_registry.try_acquire::(handle) { Some(guard) => { let router: Router = (*guard).clone(); drop(guard); info!( - identifier = %interest.identifier, + handle = ?handle, "Registering dynamic API handler.", ); - registered_handlers.insert(interest.identifier, router); + registered_handlers.insert(handle, router); rebuild_router(&inner_router, ®istered_handlers); } None => { warn!( - identifier = %interest.identifier, - "Received Register interest but router resource not found in registry. Ignoring.", + handle = ?handle, + "Received assertion but router resource not found in registry. Ignoring.", ); } } } - InterestType::Withdraw => { - if registered_handlers.swap_remove(&interest.identifier).is_some() { + AssertionUpdate::Retracted => { + if registered_handlers.swap_remove(&handle).is_some() { info!( - identifier = %interest.identifier, + handle = ?handle, "Withdrawing dynamic API handler.", ); rebuild_router(&inner_router, ®istered_handlers); - } else { - warn!( - identifier = %interest.identifier, - "Received Withdraw interest but no handler was registered with this identifier.", - ); } } } @@ -285,7 +279,7 @@ async fn run_event_loop( } /// Rebuilds the merged inner router from all currently-registered handlers and stores it in the [`ArcSwap`]. -fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap) { +fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap) { let mut merged = Router::new(); for router in handlers.values() { merged = merged.merge(router.clone()); From 2205dbb6fb45c1551707d58f14e2d73e5da70351 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Sun, 8 Feb 2026 22:14:27 -0500 Subject: [PATCH 3/6] some manual cleanup --- lib/saluki-app/src/dynamic_api.rs | 61 +++++++++++++++---------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs index 3ca19ae43a..6d1fda52a3 100644 --- a/lib/saluki-app/src/dynamic_api.rs +++ b/lib/saluki-app/src/dynamic_api.rs @@ -22,7 +22,7 @@ use rustls_pki_types::PrivatePkcs8KeyDer; use saluki_api::{APIEndpointInterest, EndpointType}; use saluki_common::collections::FastIndexMap; use saluki_core::runtime::{ - state::{AssertionUpdate, DataspaceRegistry, Handle, ResourceRegistry, WildcardSubscription}, + state::{AssertionUpdate, DataspaceRegistry, Handle, ResourceRegistry, Subscription}, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture, }; use saluki_error::GenericError; @@ -113,15 +113,12 @@ impl Supervisable for DynamicAPIBuilder { } async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { - // Create the ArcSwap holding the initial (empty) inner router. - let inner_router: Arc> = Arc::new(ArcSwap::from_pointee(Router::new())); + // Create our default, empty router. + let inner_router = Arc::new(ArcSwap::from_pointee(Router::new())); + let outer_router = Router::new().fallback_service(DynamicRouterService::from_inner(&inner_router)); - // Build the outer "shell" router whose fallback delegates to whatever is currently in the ArcSwap. - let outer_router = build_outer_router(Arc::clone(&inner_router)); - - // Subscribe to all APIEndpointInterest assertions and retractions in the dataspace. - let subscription: WildcardSubscription = - self.dataspace_registry.subscribe_all::(); + // Register our interest in all asserted API endpoints. + let endpoint_subscriptions = self.dataspace_registry.subscribe_all::(); // Bind the HTTP listener immediately so we fail fast on bind errors. let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) @@ -153,7 +150,7 @@ impl Supervisable for DynamicAPIBuilder { run_event_loop( inner_router, - subscription, + endpoint_subscriptions, resource_registry, endpoint_type, process_shutdown, @@ -165,21 +162,26 @@ impl Supervisable for DynamicAPIBuilder { } } -/// Builds the outer shell router with a fallback service that delegates to the current inner router. -fn build_outer_router(inner: Arc>) -> Router { - Router::new().fallback_service(ArcSwapRouterService { inner }) -} - -/// A [`tower::Service`] that reads the current [`Router`] from an [`ArcSwap`] on every request. +/// A [`tower::Service`] that routes a request based on a dynamically-updated [`Router`]. /// -/// This is used as the fallback service for the outer shell router, allowing the inner route set to be hot-swapped at -/// runtime without restarting the HTTP listener. +/// When installed as the fallback service for a top-level [`Router`], `DynamicRouterService` dynamically routing +/// requests based on the current defined "inner" router, which itself can be hot-swapped at runtime. This allows for +/// seamless updates to the API endpoint routing without requiring a restart of the HTTP listener or complicated +/// configuration changes. #[derive(Clone)] -struct ArcSwapRouterService { - inner: Arc>, +struct DynamicRouterService { + inner_router: Arc>, +} + +impl DynamicRouterService { + fn from_inner(inner_router: &Arc>) -> Self { + Self { + inner_router: Arc::clone(inner_router), + } + } } -impl Service> for ArcSwapRouterService { +impl Service> for DynamicRouterService { type Response = Response; type Error = Infallible; type Future = Pin> + Send>>; @@ -189,19 +191,14 @@ impl Service> for ArcSwapRouterService { } fn call(&mut self, request: http::Request) -> Self::Future { - // ArcSwap::load_full() is wait-free; Router::clone() is cheap (Arc internals). - let router = self.inner.load_full(); - Box::pin(async move { - let mut svc = (*router).clone(); - // Router<()> implements Service, Response = Response, Error = Infallible>. - svc.call(request).await - }) + let mut router = Arc::unwrap_or_clone(self.inner_router.load_full()); + Box::pin(async move { router.call(request).await }) } } /// Runs the event loop that listens for API endpoint interest assertions/retractions and hot-swaps the inner router. async fn run_event_loop( - inner_router: Arc>, mut subscription: WildcardSubscription, + inner_router: Arc>, mut subscription: Subscription, resource_registry: ResourceRegistry, endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, ) -> Result<(), GenericError> { @@ -227,13 +224,13 @@ async fn run_event_loop( } maybe_update = subscription.recv() => { - let Some((handle, update)) = maybe_update else { + let Some(update) = maybe_update else { warn!("API endpoint interest channel closed."); break; }; match update { - AssertionUpdate::Asserted(interest) => { + AssertionUpdate::Asserted(handle, interest) => { // Only process interests targeting our endpoint type. if interest.endpoint != endpoint_type { continue; @@ -261,7 +258,7 @@ async fn run_event_loop( } } } - AssertionUpdate::Retracted => { + AssertionUpdate::Retracted(handle) => { if registered_handlers.swap_remove(&handle).is_some() { info!( handle = ?handle, From 3522083773d10c21429b707f7fd4bad0472e0620 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Sun, 8 Feb 2026 22:42:28 -0500 Subject: [PATCH 4/6] spit and polish --- lib/saluki-api/src/lib.rs | 35 +++++-- lib/saluki-app/src/dynamic_api.rs | 155 ++++++++++++++---------------- 2 files changed, 101 insertions(+), 89 deletions(-) diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index 2d3235b180..b9cc5deece 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -27,13 +27,36 @@ pub enum EndpointType { Privileged, } -/// Describes which API endpoint a dynamic handler targets. +impl EndpointType { + /// Returns a human-readable name for this endpoint type. + pub fn name(&self) -> &'static str { + match self { + Self::Unprivileged => "unprivileged", + Self::Privileged => "privileged", + } + } +} + +/// A dynamically-registered HTTP route set, asserted into the dataspace registry. +/// +/// Publishers assert a `DynamicHttpRoute` under a [`Handle`] to register HTTP routes with a dynamic API server. The +/// server observes assertions and retractions via a wildcard subscription and hot-swaps its inner HTTP router. +#[derive(Clone, Debug)] +pub struct DynamicHttpRoute { + /// Which API endpoint these routes target. + pub endpoint: EndpointType, + /// The HTTP routes to serve. + pub router: Router<()>, +} + +/// A dynamically-registered gRPC route set, asserted into the dataspace registry. /// -/// Publishers assert this value in the [`DataspaceRegistry`] alongside a `Router<()>` resource published to the -/// [`ResourceRegistry`] under the same [`Handle`]. The dynamic API server observes assertions and retractions via a -/// wildcard subscription, using the handle to look up the corresponding router resource. +/// Publishers assert a `DynamicGrpcRoute` under a [`Handle`] to register gRPC routes with a dynamic API server. The +/// router should be pre-converted from tonic routes (via [`tonic::Routes::into_axum_router`]). #[derive(Clone, Debug)] -pub struct APIEndpointInterest { - /// Which API endpoint this interest targets. +pub struct DynamicGrpcRoute { + /// Which API endpoint these routes target. pub endpoint: EndpointType, + /// The gRPC routes to serve, as an axum router. + pub router: Router<()>, } diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs index 6d1fda52a3..7fa0ae3232 100644 --- a/lib/saluki-app/src/dynamic_api.rs +++ b/lib/saluki-app/src/dynamic_api.rs @@ -1,8 +1,8 @@ //! Dynamic API server. //! //! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time, -//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and acquires route resources from -//! the resource registry, hot-swapping the inner router behind an [`ArcSwap`] as handlers are asserted or retracted. +//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and hot-swaps the inner HTTP and +//! gRPC routers behind [`ArcSwap`]s as routes are asserted or retracted. use std::{ convert::Infallible, @@ -19,10 +19,10 @@ use http::Response; use rcgen::{generate_simple_self_signed, CertifiedKey}; use rustls::{pki_types::PrivateKeyDer, ServerConfig}; use rustls_pki_types::PrivatePkcs8KeyDer; -use saluki_api::{APIEndpointInterest, EndpointType}; +use saluki_api::{DynamicGrpcRoute, DynamicHttpRoute, EndpointType}; use saluki_common::collections::FastIndexMap; use saluki_core::runtime::{ - state::{AssertionUpdate, DataspaceRegistry, Handle, ResourceRegistry, Subscription}, + state::{AssertionUpdate, DataspaceRegistry, Handle, Subscription}, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture, }; use saluki_error::GenericError; @@ -36,49 +36,40 @@ use saluki_io::net::{ ListenAddress, }; use tokio::{pin, select}; -use tonic::service::RoutesBuilder; use tower::Service; use tracing::{debug, info, warn}; -/// A dynamic API server that can add and remove route sets at runtime. +/// A dynamic API server that can add and remove routes at runtime. /// -/// `DynamicAPIBuilder` serves HTTP on a given address using an outer [`Router`] whose fallback delegates every request -/// to an inner [`Router`] stored behind an [`ArcSwap`]. A background event loop subscribes to the [`DataspaceRegistry`] -/// for [`APIEndpointInterest`] assertions and retractions, acquires `Router<()>` resources from the -/// [`ResourceRegistry`], and atomically swaps the inner router as handlers are added or removed. +/// `DynamicAPIBuilder` serves HTTP and gRPC on a given address, multiplexing both protocols on a single port. A +/// background event loop subscribes to the [`DataspaceRegistry`] for [`DynamicHttpRoute`] and [`DynamicGrpcRoute`] +/// assertions and retractions, and atomically swaps the inner routers as handlers are added or removed. /// /// ## Publisher protocol /// -/// Any worker that wants to dynamically register API routes must: +/// Any process that wants to dynamically register API routes must: /// -/// 1. Build a `Router<()>` with state applied (i.e. call `handler.generate_routes().with_state(handler.generate_initial_state())`) -/// 2. Publish the `Router<()>` to the [`ResourceRegistry`] under a [`Handle`] -/// 3. Assert an [`APIEndpointInterest`] in the [`DataspaceRegistry`] under the same [`Handle`] +/// 1. Build a `Router<()>` (for HTTP, or via `tonic::Routes::into_axum_router()` for gRPC). +/// 2. Assert a [`DynamicHttpRoute`] or [`DynamicGrpcRoute`] in the [`DataspaceRegistry`] under a [`Handle`]. /// -/// The resource **must** be published before the assertion. If the assertion arrives before the resource, the -/// registration will be silently skipped with a warning. -/// -/// To withdraw routes, retract the [`APIEndpointInterest`] from the [`DataspaceRegistry`]. +/// To withdraw routes, retract the assertion from the [`DataspaceRegistry`]. pub struct DynamicAPIBuilder { endpoint_type: EndpointType, listen_address: ListenAddress, tls_config: Option, dataspace_registry: DataspaceRegistry, - resource_registry: ResourceRegistry, } impl DynamicAPIBuilder { /// Creates a new `DynamicAPIBuilder` for the given endpoint type. pub fn new( endpoint_type: EndpointType, listen_address: ListenAddress, dataspace_registry: DataspaceRegistry, - resource_registry: ResourceRegistry, ) -> Self { Self { endpoint_type, listen_address, tls_config: None, dataspace_registry, - resource_registry, } } @@ -113,23 +104,20 @@ impl Supervisable for DynamicAPIBuilder { } async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result { - // Create our default, empty router. - let inner_router = Arc::new(ArcSwap::from_pointee(Router::new())); - let outer_router = Router::new().fallback_service(DynamicRouterService::from_inner(&inner_router)); + // Create dynamic inner routers for both HTTP and gRPC sides. + let (inner_http, outer_http) = create_dynamic_router(); + let (inner_grpc, outer_grpc) = create_dynamic_router(); - // Register our interest in all asserted API endpoints. - let endpoint_subscriptions = self.dataspace_registry.subscribe_all::(); + // Subscribe to both HTTP and gRPC route assertions. + let http_subscription = self.dataspace_registry.subscribe_all::(); + let grpc_subscription = self.dataspace_registry.subscribe_all::(); // Bind the HTTP listener immediately so we fail fast on bind errors. let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) .await .map_err(|e| InitializationError::Failed { source: e.into() })?; - // Wrap the outer router in the same MultiplexService + TowerToHyperService pattern used by APIBuilder::serve. - let multiplexed_service = TowerToHyperService::new(MultiplexService::new( - outer_router, - RoutesBuilder::default().routes().into_axum_router(), - )); + let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc)); let mut http_server = HttpServer::from_listener(listener, multiplexed_service); if let Some(tls_config) = self.tls_config.clone() { @@ -137,21 +125,17 @@ impl Supervisable for DynamicAPIBuilder { } let (shutdown_handle, error_handle) = http_server.listen(); - let resource_registry = self.resource_registry.clone(); let endpoint_type = self.endpoint_type.clone(); let listen_address = self.listen_address.clone(); Ok(Box::pin(async move { - info!( - "Serving dynamic {} API on {}.", - endpoint_name(&endpoint_type), - listen_address - ); + info!("Serving {} API on {}.", endpoint_type.name(), listen_address); run_event_loop( - inner_router, - endpoint_subscriptions, - resource_registry, + inner_http, + inner_grpc, + http_subscription, + grpc_subscription, endpoint_type, process_shutdown, shutdown_handle, @@ -196,13 +180,15 @@ impl Service> for DynamicRouterService { } } -/// Runs the event loop that listens for API endpoint interest assertions/retractions and hot-swaps the inner router. +/// Runs the event loop that listens for route assertions/retractions and hot-swaps the inner routers. async fn run_event_loop( - inner_router: Arc>, mut subscription: Subscription, - resource_registry: ResourceRegistry, endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, - shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, + inner_http: Arc>, inner_grpc: Arc>, + mut http_subscription: Subscription, mut grpc_subscription: Subscription, + endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, + error_handle: ErrorHandle, ) -> Result<(), GenericError> { - let mut registered_handlers: FastIndexMap = FastIndexMap::default(); + let mut http_handlers = FastIndexMap::default(); + let mut grpc_handlers = FastIndexMap::default(); let shutdown = process_shutdown.wait_for_shutdown(); pin!(shutdown); @@ -223,51 +209,54 @@ async fn run_event_loop( break; } - maybe_update = subscription.recv() => { + maybe_update = http_subscription.recv() => { let Some(update) = maybe_update else { - warn!("API endpoint interest channel closed."); + warn!("HTTP route subscription channel closed."); break; }; match update { - AssertionUpdate::Asserted(handle, interest) => { - // Only process interests targeting our endpoint type. - if interest.endpoint != endpoint_type { + AssertionUpdate::Asserted(handle, route) => { + if route.endpoint != endpoint_type { continue; } - // Acquire the Router<()> from the resource registry (non-blocking). - // The publisher must have published the resource before asserting the interest. - match resource_registry.try_acquire::(handle) { - Some(guard) => { - let router: Router = (*guard).clone(); - drop(guard); - - info!( - handle = ?handle, - "Registering dynamic API handler.", - ); - registered_handlers.insert(handle, router); - rebuild_router(&inner_router, ®istered_handlers); - } - None => { - warn!( - handle = ?handle, - "Received assertion but router resource not found in registry. Ignoring.", - ); - } + debug!(?handle, "Registering dynamic HTTP handler."); + http_handlers.insert(handle, route.router); + } + AssertionUpdate::Retracted(handle) => { + if http_handlers.swap_remove(&handle).is_some() { + debug!(?handle, "Withdrawing dynamic HTTP handler."); + } + } + } + + rebuild_router(&inner_http, &http_handlers); + } + + maybe_update = grpc_subscription.recv() => { + let Some(update) = maybe_update else { + warn!("gRPC route subscription channel closed."); + break; + }; + + match update { + AssertionUpdate::Asserted(handle, route) => { + if route.endpoint != endpoint_type { + continue; } + + info!(handle = ?handle, "Registering dynamic gRPC handler."); + grpc_handlers.insert(handle, route.router); } AssertionUpdate::Retracted(handle) => { - if registered_handlers.swap_remove(&handle).is_some() { - info!( - handle = ?handle, - "Withdrawing dynamic API handler.", - ); - rebuild_router(&inner_router, ®istered_handlers); + if grpc_handlers.swap_remove(&handle).is_some() { + info!(handle = ?handle, "Withdrawing dynamic gRPC handler."); } } } + + rebuild_router(&inner_grpc, &grpc_handlers); } } } @@ -275,6 +264,13 @@ async fn run_event_loop( Ok(()) } +/// Creates a dynamic router pair: a swappable inner router and an outer router that delegates to it. +fn create_dynamic_router() -> (Arc>, Router) { + let inner = Arc::new(ArcSwap::from_pointee(Router::new())); + let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner)); + (inner, outer) +} + /// Rebuilds the merged inner router from all currently-registered handlers and stores it in the [`ArcSwap`]. fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap) { let mut merged = Router::new(); @@ -284,10 +280,3 @@ fn rebuild_router(inner_router: &Arc>, handlers: &FastIndexMap &'static str { - match endpoint_type { - EndpointType::Unprivileged => "unprivileged", - EndpointType::Privileged => "privileged", - } -} From b47a60094d3284b13f58c17038b5efbbba4f94cf Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Sat, 21 Feb 2026 14:11:06 -0500 Subject: [PATCH 5/6] expose helper for creating dynamic HTTP route --- lib/saluki-api/src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index b9cc5deece..f469d10c30 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -49,6 +49,15 @@ pub struct DynamicHttpRoute { pub router: Router<()>, } +impl DynamicHttpRoute { + pub fn unprivilged(handler: T) -> Self { + Self { + endpoint: EndpointType::Unprivileged, + router: handler.generate_routes().with_state(handler.generate_initial_state()), + } + } +} + /// A dynamically-registered gRPC route set, asserted into the dataspace registry. /// /// Publishers assert a `DynamicGrpcRoute` under a [`Handle`] to register gRPC routes with a dynamic API server. The From 970d60230e2991ce8f7a38665a2060356fb084a1 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Sat, 21 Feb 2026 14:47:42 -0500 Subject: [PATCH 6/6] switch to single type for both HTTP and gRPC dynamic routes --- Cargo.lock | 1 + lib/saluki-api/Cargo.toml | 1 + lib/saluki-api/src/lib.rs | 92 +++++++++++++++++++++++-------- lib/saluki-app/src/dynamic_api.rs | 75 +++++++++++++------------ 4 files changed, 108 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 110f0cb57d..3856d4d112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3476,6 +3476,7 @@ dependencies = [ "axum", "axum-extra", "http", + "tonic", ] [[package]] diff --git a/lib/saluki-api/Cargo.toml b/lib/saluki-api/Cargo.toml index 79361606cc..265028cf9d 100644 --- a/lib/saluki-api/Cargo.toml +++ b/lib/saluki-api/Cargo.toml @@ -12,3 +12,4 @@ workspace = true axum = { workspace = true, features = ["http1", "tokio"] } axum-extra = { workspace = true, features = ["query"] } http = { workspace = true } +tonic = { workspace = true } diff --git a/lib/saluki-api/src/lib.rs b/lib/saluki-api/src/lib.rs index f469d10c30..fd3e3a1417 100644 --- a/lib/saluki-api/src/lib.rs +++ b/lib/saluki-api/src/lib.rs @@ -2,6 +2,7 @@ pub use axum::response; pub use axum::routing; use axum::Router; pub use http::StatusCode; +use tonic::service::Routes; pub mod extract { pub use axum::extract::*; @@ -18,11 +19,14 @@ pub trait APIHandler { fn generate_routes(&self) -> Router; } -/// Identifies which API endpoint a handler should be bound to. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +/// API endpoint type. +/// +/// Identifies whether or not a route should be exposed on the unprivileged or privileged API endpoint. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum EndpointType { /// The unprivileged (plain HTTP) API endpoint. Unprivileged, + /// The privileged (TLS-protected) API endpoint. Privileged, } @@ -37,35 +41,77 @@ impl EndpointType { } } -/// A dynamically-registered HTTP route set, asserted into the dataspace registry. +/// API endpoint protocol. /// -/// Publishers assert a `DynamicHttpRoute` under a [`Handle`] to register HTTP routes with a dynamic API server. The -/// server observes assertions and retractions via a wildcard subscription and hot-swaps its inner HTTP router. -#[derive(Clone, Debug)] -pub struct DynamicHttpRoute { - /// Which API endpoint these routes target. - pub endpoint: EndpointType, - /// The HTTP routes to serve. - pub router: Router<()>, +/// Identifies which application protocol the route should be exposed to. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum EndpointProtocol { + /// HTTP. + Http, + + /// gRPC. + Grpc, } -impl DynamicHttpRoute { - pub fn unprivilged(handler: T) -> Self { - Self { - endpoint: EndpointType::Unprivileged, - router: handler.generate_routes().with_state(handler.generate_initial_state()), +impl EndpointProtocol { + /// Returns a human-readable name for this endpoint protocol. + pub fn name(&self) -> &'static str { + match self { + Self::Http => "HTTP", + Self::Grpc => "gRPC", } } } -/// A dynamically-registered gRPC route set, asserted into the dataspace registry. +/// A set of dynamic API routes. /// -/// Publishers assert a `DynamicGrpcRoute` under a [`Handle`] to register gRPC routes with a dynamic API server. The -/// router should be pre-converted from tonic routes (via [`tonic::Routes::into_axum_router`]). +/// Dynamic routes allow for processes to dynamically register/unregister themselves from running API endpoints, +/// adapting to changes in the process state and without requiring all routes to be known and declared upfront. #[derive(Clone, Debug)] -pub struct DynamicGrpcRoute { +pub struct DynamicRoute { /// Which API endpoint these routes target. - pub endpoint: EndpointType, - /// The gRPC routes to serve, as an axum router. - pub router: Router<()>, + endpoint_type: EndpointType, + + /// Which API protocol these routes target. + endpoint_protocol: EndpointProtocol, + + /// The routes to serve. + router: Router<()>, +} + +impl DynamicRoute { + /// Creates a dynamic HTTP route from the given API handler. + pub fn http(endpoint_type: EndpointType, handler: T) -> Self { + let router = handler.generate_routes().with_state(handler.generate_initial_state()); + Self::new(endpoint_type, EndpointProtocol::Http, router) + } + + /// Creates a dynamic gRPC route from the given Tonic routes. + pub fn grpc(endpoint_type: EndpointType, routes: Routes) -> Self { + let router = routes.prepare().into_axum_router(); + Self::new(endpoint_type, EndpointProtocol::Grpc, router) + } + + fn new(endpoint_type: EndpointType, endpoint_protocol: EndpointProtocol, router: Router<()>) -> Self { + Self { + endpoint_type, + endpoint_protocol, + router, + } + } + + /// Returns the type of endpoint these routes target. + pub fn endpoint_type(&self) -> EndpointType { + self.endpoint_type + } + + /// Returns the protocol of endpoint these routes target. + pub fn endpoint_protocol(&self) -> EndpointProtocol { + self.endpoint_protocol + } + + /// Consumes this route and returns the underlying router. + pub fn into_router(self) -> Router<()> { + self.router + } } diff --git a/lib/saluki-app/src/dynamic_api.rs b/lib/saluki-app/src/dynamic_api.rs index 7fa0ae3232..2764b8c987 100644 --- a/lib/saluki-app/src/dynamic_api.rs +++ b/lib/saluki-app/src/dynamic_api.rs @@ -19,7 +19,7 @@ use http::Response; use rcgen::{generate_simple_self_signed, CertifiedKey}; use rustls::{pki_types::PrivateKeyDer, ServerConfig}; use rustls_pki_types::PrivatePkcs8KeyDer; -use saluki_api::{DynamicGrpcRoute, DynamicHttpRoute, EndpointType}; +use saluki_api::{DynamicRoute, EndpointProtocol, EndpointType}; use saluki_common::collections::FastIndexMap; use saluki_core::runtime::{ state::{AssertionUpdate, DataspaceRegistry, Handle, Subscription}, @@ -108,9 +108,8 @@ impl Supervisable for DynamicAPIBuilder { let (inner_http, outer_http) = create_dynamic_router(); let (inner_grpc, outer_grpc) = create_dynamic_router(); - // Subscribe to both HTTP and gRPC route assertions. - let http_subscription = self.dataspace_registry.subscribe_all::(); - let grpc_subscription = self.dataspace_registry.subscribe_all::(); + // Subscribe to all dynamic route assertions. + let route_assertions = self.dataspace_registry.subscribe_all::(); // Bind the HTTP listener immediately so we fail fast on bind errors. let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone()) @@ -125,7 +124,7 @@ impl Supervisable for DynamicAPIBuilder { } let (shutdown_handle, error_handle) = http_server.listen(); - let endpoint_type = self.endpoint_type.clone(); + let endpoint_type = self.endpoint_type; let listen_address = self.listen_address.clone(); Ok(Box::pin(async move { @@ -134,8 +133,7 @@ impl Supervisable for DynamicAPIBuilder { run_event_loop( inner_http, inner_grpc, - http_subscription, - grpc_subscription, + route_assertions, endpoint_type, process_shutdown, shutdown_handle, @@ -183,9 +181,8 @@ impl Service> for DynamicRouterService { /// Runs the event loop that listens for route assertions/retractions and hot-swaps the inner routers. async fn run_event_loop( inner_http: Arc>, inner_grpc: Arc>, - mut http_subscription: Subscription, mut grpc_subscription: Subscription, - endpoint_type: EndpointType, mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, - error_handle: ErrorHandle, + mut route_assertions: Subscription, endpoint_type: EndpointType, + mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle, ) -> Result<(), GenericError> { let mut http_handlers = FastIndexMap::default(); let mut grpc_handlers = FastIndexMap::default(); @@ -209,54 +206,56 @@ async fn run_event_loop( break; } - maybe_update = http_subscription.recv() => { + maybe_update = route_assertions.recv() => { let Some(update) = maybe_update else { - warn!("HTTP route subscription channel closed."); + warn!("Route subscription channel closed."); break; }; + let mut rebuild_http = false; + let mut rebuild_grpc = false; + match update { AssertionUpdate::Asserted(handle, route) => { - if route.endpoint != endpoint_type { + if route.endpoint_type() != endpoint_type { continue; } - debug!(?handle, "Registering dynamic HTTP handler."); - http_handlers.insert(handle, route.router); + match route.endpoint_protocol() { + EndpointProtocol::Http => { + debug!(?handle, "Registering dynamic HTTP handler."); + http_handlers.insert(handle, route.into_router()); + + rebuild_http = true; + }, + EndpointProtocol::Grpc => { + debug!(?handle, "Registering dynamic gRPC handler."); + grpc_handlers.insert(handle, route.into_router()); + + rebuild_grpc = true; + }, + } } AssertionUpdate::Retracted(handle) => { if http_handlers.swap_remove(&handle).is_some() { debug!(?handle, "Withdrawing dynamic HTTP handler."); - } - } - } - - rebuild_router(&inner_http, &http_handlers); - } - - maybe_update = grpc_subscription.recv() => { - let Some(update) = maybe_update else { - warn!("gRPC route subscription channel closed."); - break; - }; - - match update { - AssertionUpdate::Asserted(handle, route) => { - if route.endpoint != endpoint_type { - continue; + rebuild_http = true; } - info!(handle = ?handle, "Registering dynamic gRPC handler."); - grpc_handlers.insert(handle, route.router); - } - AssertionUpdate::Retracted(handle) => { if grpc_handlers.swap_remove(&handle).is_some() { - info!(handle = ?handle, "Withdrawing dynamic gRPC handler."); + debug!(?handle, "Withdrawing dynamic gRPC handler."); + rebuild_grpc = true; } } } - rebuild_router(&inner_grpc, &grpc_handlers); + if rebuild_http { + rebuild_router(&inner_http, &http_handlers); + } + + if rebuild_grpc { + rebuild_router(&inner_grpc, &grpc_handlers); + } } } }