From 6a2df66680e9ba4268c6d97190bebc8429cdd6e4 Mon Sep 17 00:00:00 2001 From: Adrien Langou Date: Tue, 26 May 2026 17:15:15 +0200 Subject: [PATCH] feat(server): add grpc rate limiting gateway-wide Signed-off-by: Adrien Langou --- architecture/gateway.md | 4 + crates/openshell-core/src/config.rs | 64 ++- crates/openshell-server/src/cli.rs | 121 +++++ crates/openshell-server/src/config_file.rs | 8 + crates/openshell-server/src/lib.rs | 5 + crates/openshell-server/src/multiplex.rs | 423 +++++++++++++++++- deploy/helm/openshell/README.md | 2 + .../openshell/templates/gateway-config.yaml | 11 + .../openshell/tests/gateway_config_test.yaml | 58 +++ deploy/helm/openshell/values.yaml | 11 + docs/reference/gateway-config.mdx | 5 + 11 files changed, 710 insertions(+), 2 deletions(-) diff --git a/architecture/gateway.md b/architecture/gateway.md index 35e2d6659..0601749c7 100644 --- a/architecture/gateway.md +++ b/architecture/gateway.md @@ -37,6 +37,10 @@ health, metrics, or tunnel routes. The plaintext service router also rejects browser requests whose Fetch Metadata, Origin, or Referer headers indicate a cross-origin or sibling-subdomain request. +Operators can configure a gateway-wide gRPC request rate limit. The limit is +applied only to gRPC API traffic after protocol multiplexing; health, metrics, +and local sandbox-service HTTP routes are not rate limited by this control. + Supported auth modes: | Mode | Use | diff --git a/crates/openshell-core/src/config.rs b/crates/openshell-core/src/config.rs index 04d6928da..eaaf1e4a0 100644 --- a/crates/openshell-core/src/config.rs +++ b/crates/openshell-core/src/config.rs @@ -13,7 +13,6 @@ use std::os::unix::fs::FileTypeExt; use std::path::{Path, PathBuf}; use std::process::Command; use std::str::FromStr; -#[cfg(unix)] use std::time::Duration; // ── Public default constants ──────────────────────────────────────────── @@ -364,6 +363,20 @@ pub struct Config { /// TTL for SSH session tokens, in seconds. 0 disables expiry. pub ssh_session_ttl_secs: u64, + /// Maximum gRPC requests allowed per rate-limit window. + /// + /// When paired with [`Self::grpc_rate_limit_window_secs`], positive values + /// enable gateway-wide gRPC request rate limiting. `None` or `0` disables + /// the limit. + pub grpc_rate_limit_requests: Option, + + /// gRPC rate-limit window length in seconds. + /// + /// When paired with [`Self::grpc_rate_limit_requests`], positive values + /// enable gateway-wide gRPC request rate limiting. `None` or `0` disables + /// the limit. + pub grpc_rate_limit_window_secs: Option, + /// Browser-facing sandbox service routing configuration. pub service_routing: ServiceRoutingConfig, } @@ -547,6 +560,8 @@ impl Config { database_url: String::new(), compute_drivers: vec![], ssh_session_ttl_secs: default_ssh_session_ttl_secs(), + grpc_rate_limit_requests: None, + grpc_rate_limit_window_secs: None, service_routing: ServiceRoutingConfig::default(), } } @@ -614,6 +629,29 @@ impl Config { self } + /// Set the gateway-wide gRPC request rate limit. + #[must_use] + pub const fn with_grpc_rate_limit( + mut self, + requests: Option, + window_secs: Option, + ) -> Self { + self.grpc_rate_limit_requests = requests; + self.grpc_rate_limit_window_secs = window_secs; + self + } + + /// Return the effective gRPC rate limit, if fully configured and enabled. + #[must_use] + pub fn grpc_rate_limit(&self) -> Option<(u64, Duration)> { + let requests = self.grpc_rate_limit_requests?; + let window_secs = self.grpc_rate_limit_window_secs?; + if requests == 0 || window_secs == 0 { + None + } else { + Some((requests, Duration::from_secs(window_secs))) + } + } /// Set the OIDC configuration for JWT-based authentication. #[must_use] pub fn with_oidc(mut self, oidc: OidcConfig) -> Self { @@ -737,6 +775,7 @@ mod tests { #[cfg(unix)] use std::os::unix::net::UnixListener; use std::path::PathBuf; + use std::time::Duration; #[test] fn compute_driver_kind_parses_supported_values() { @@ -794,6 +833,29 @@ mod tests { assert_eq!(cfg.ttl_secs, 0); } + #[test] + fn grpc_rate_limit_requires_positive_pair() { + assert!(Config::new(None).grpc_rate_limit().is_none()); + assert!( + Config::new(None) + .with_grpc_rate_limit(Some(10), None) + .grpc_rate_limit() + .is_none() + ); + assert!( + Config::new(None) + .with_grpc_rate_limit(Some(0), Some(60)) + .grpc_rate_limit() + .is_none() + ); + assert_eq!( + Config::new(None) + .with_grpc_rate_limit(Some(10), Some(60)) + .grpc_rate_limit(), + Some((10, Duration::from_secs(60))) + ); + } + #[test] fn service_routing_allows_loopback_plaintext_http_by_default() { let cfg = Config::new(None); diff --git a/crates/openshell-server/src/cli.rs b/crates/openshell-server/src/cli.rs index 748cec264..f0d1e0df1 100644 --- a/crates/openshell-server/src/cli.rs +++ b/crates/openshell-server/src/cli.rs @@ -175,6 +175,14 @@ struct RunArgs { #[arg(long, env = "OPENSHELL_OIDC_SCOPES_CLAIM", default_value = "")] oidc_scopes_claim: String, + /// Maximum gRPC requests allowed per rate-limit window. Set to 0 to disable. + #[arg(long, env = "OPENSHELL_GRPC_RATE_LIMIT_REQUESTS")] + grpc_rate_limit_requests: Option, + + /// gRPC rate-limit window length in seconds. Set to 0 to disable. + #[arg(long, env = "OPENSHELL_GRPC_RATE_LIMIT_WINDOW_SECONDS")] + grpc_rate_limit_window_seconds: Option, + /// Subject Alternative Names configured on the gateway server certificate. /// Wildcard DNS SANs also enable sandbox service URLs under that domain. #[arg( @@ -353,8 +361,16 @@ async fn run_from_args(mut args: RunArgs, matches: ArgMatches) -> Result<()> { config = config .with_database_url(db_url) .with_compute_drivers(args.drivers.clone()) + .with_grpc_rate_limit( + args.grpc_rate_limit_requests, + args.grpc_rate_limit_window_seconds, + ) .with_server_sans(args.server_sans.clone()) .with_loopback_service_http(args.enable_loopback_service_http); + validate_grpc_rate_limit_args( + args.grpc_rate_limit_requests, + args.grpc_rate_limit_window_seconds, + )?; if let Some(ttl) = file .as_ref() @@ -608,6 +624,37 @@ fn merge_file_into_args(args: &mut RunArgs, file: &GatewayFileSection, matches: args.oidc_scopes_claim.clone_from(&oidc.scopes_claim); } } + if let Some(requests) = file.grpc_rate_limit_requests + && args.grpc_rate_limit_requests.is_none() + && arg_defaulted(matches, "grpc_rate_limit_requests") + { + args.grpc_rate_limit_requests = Some(requests); + } + if let Some(window) = file.grpc_rate_limit_window_seconds + && args.grpc_rate_limit_window_seconds.is_none() + && arg_defaulted(matches, "grpc_rate_limit_window_seconds") + { + args.grpc_rate_limit_window_seconds = Some(window); + } +} + +fn validate_grpc_rate_limit_args(requests: Option, window_seconds: Option) -> Result<()> { + let disabled = matches!(requests, Some(0)) || matches!(window_seconds, Some(0)); + if disabled { + return Ok(()); + } + if matches!( + (requests, window_seconds), + (Some(requests), None) if requests > 0 + ) || matches!( + (requests, window_seconds), + (None, Some(window_seconds)) if window_seconds > 0 + ) { + return Err(miette::miette!( + "gRPC rate limiting requires both --grpc-rate-limit-requests and --grpc-rate-limit-window-seconds (TOML keys grpc_rate_limit_requests and grpc_rate_limit_window_seconds) to be positive; set either value to 0 to disable" + )); + } + Ok(()) } fn effective_single_driver(args: &RunArgs) -> Option { @@ -893,6 +940,41 @@ mod tests { assert!(cli.run.enable_mtls_auth); } + #[test] + fn command_parses_grpc_rate_limit_flags() { + let _lock = ENV_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _g1 = EnvVarGuard::remove("OPENSHELL_GRPC_RATE_LIMIT_REQUESTS"); + let _g2 = EnvVarGuard::remove("OPENSHELL_GRPC_RATE_LIMIT_WINDOW_SECONDS"); + + let cli = Cli::try_parse_from([ + "openshell-gateway", + "--db-url", + "sqlite::memory:", + "--grpc-rate-limit-requests", + "120", + "--grpc-rate-limit-window-seconds", + "60", + ]) + .unwrap(); + + assert_eq!(cli.run.grpc_rate_limit_requests, Some(120)); + assert_eq!(cli.run.grpc_rate_limit_window_seconds, Some(60)); + } + + #[test] + fn validate_grpc_rate_limit_args_requires_positive_pair() { + assert!(super::validate_grpc_rate_limit_args(None, None).is_ok()); + assert!(super::validate_grpc_rate_limit_args(Some(0), None).is_ok()); + assert!(super::validate_grpc_rate_limit_args(None, Some(0)).is_ok()); + assert!(super::validate_grpc_rate_limit_args(Some(0), Some(60)).is_ok()); + assert!(super::validate_grpc_rate_limit_args(Some(120), Some(0)).is_ok()); + assert!(super::validate_grpc_rate_limit_args(Some(120), Some(60)).is_ok()); + assert!(super::validate_grpc_rate_limit_args(Some(120), None).is_err()); + assert!(super::validate_grpc_rate_limit_args(None, Some(60)).is_err()); + } + #[test] fn command_rejects_removed_driver_flags() { let err = command() @@ -1316,6 +1398,45 @@ audience = "openshell-cli" assert_eq!(args.oidc_audience, "openshell-cli"); } + #[test] + fn file_grpc_rate_limit_populates_args_when_cli_omits() { + let (mut args, matches) = + parse_with_args(&["openshell-gateway", "--db-url", "sqlite::memory:"]); + let file = config_file_from_toml( + r" +[openshell.gateway] +grpc_rate_limit_requests = 100 +grpc_rate_limit_window_seconds = 30 +", + ); + merge_file_into_args(&mut args, &file.openshell.gateway, &matches); + + assert_eq!(args.grpc_rate_limit_requests, Some(100)); + assert_eq!(args.grpc_rate_limit_window_seconds, Some(30)); + } + + #[test] + fn cli_grpc_rate_limit_overrides_file_value() { + let (mut args, matches) = parse_with_args(&[ + "openshell-gateway", + "--db-url", + "sqlite::memory:", + "--grpc-rate-limit-requests", + "20", + ]); + let file = config_file_from_toml( + r" +[openshell.gateway] +grpc_rate_limit_requests = 100 +grpc_rate_limit_window_seconds = 30 +", + ); + merge_file_into_args(&mut args, &file.openshell.gateway, &matches); + + assert_eq!(args.grpc_rate_limit_requests, Some(20)); + assert_eq!(args.grpc_rate_limit_window_seconds, Some(30)); + } + #[test] fn aux_listener_preserves_file_ip_against_public_bind() { use std::net::SocketAddr; diff --git a/crates/openshell-server/src/config_file.rs b/crates/openshell-server/src/config_file.rs index 57037bcf5..39cf02bba 100644 --- a/crates/openshell-server/src/config_file.rs +++ b/crates/openshell-server/src/config_file.rs @@ -94,6 +94,10 @@ pub struct GatewayFileSection { pub sandbox_namespace: Option, #[serde(default)] pub ssh_session_ttl_secs: Option, + #[serde(default)] + pub grpc_rate_limit_requests: Option, + #[serde(default)] + pub grpc_rate_limit_window_seconds: Option, // ── Service routing ────────────────────────────────────────────────── /// Subject Alternative Names configured on the gateway server certificate. @@ -349,6 +353,8 @@ health_bind_address = "0.0.0.0:8081" log_level = "info" compute_drivers = ["kubernetes"] sandbox_namespace = "agents" +grpc_rate_limit_requests = 120 +grpc_rate_limit_window_seconds = 60 default_image = "ghcr.io/nvidia/openshell/sandbox:latest" supervisor_image = "ghcr.io/nvidia/openshell/supervisor:latest" client_tls_secret_name = "openshell-sandbox-tls" @@ -375,6 +381,8 @@ grpc_endpoint = "https://openshell-gateway.agents.svc:8080" gw.default_image.as_deref(), Some("ghcr.io/nvidia/openshell/sandbox:latest") ); + assert_eq!(gw.grpc_rate_limit_requests, Some(120)); + assert_eq!(gw.grpc_rate_limit_window_seconds, Some(60)); assert!(gw.tls.is_some()); assert!(gw.oidc.is_some()); assert!(file.openshell.drivers.contains_key("kubernetes")); diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 676e23071..8082ea3cc 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -132,6 +132,9 @@ pub struct ServerState { /// `IssueSandboxToken` bootstrap path. Only present when the gateway /// runs in-cluster. pub k8s_sa_authenticator: Option>, + + /// Gateway-wide gRPC request rate limiter shared by every multiplex path. + pub(crate) grpc_rate_limiter: Option, } fn is_benign_tls_handshake_failure(error: &std::io::Error) -> bool { @@ -164,6 +167,7 @@ impl ServerState { supervisor_sessions: Arc, oidc_cache: Option>, ) -> Self { + let grpc_rate_limiter = multiplex::GrpcRateLimiter::from_config(&config); Self { config, store, @@ -180,6 +184,7 @@ impl ServerState { sandbox_jwt_issuer: None, sandbox_jwt_authenticator: None, k8s_sa_authenticator: None, + grpc_rate_limiter, } } } diff --git a/crates/openshell-server/src/multiplex.rs b/crates/openshell-server/src/multiplex.rs index e94326f98..6f31d27e2 100644 --- a/crates/openshell-server/src/multiplex.rs +++ b/crates/openshell-server/src/multiplex.rs @@ -17,12 +17,13 @@ use hyper_util::{ service::TowerToHyperService, }; use metrics::{counter, histogram}; +use openshell_core::Config; use openshell_core::proto::{ inference_server::InferenceServer, open_shell_server::OpenShellServer, }; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -174,6 +175,8 @@ impl MultiplexService { self.state.config.mtls_auth.enabled, self.state.config.auth.allow_unauthenticated_users, ); + let grpc_service = + GrpcRateLimitService::new(grpc_service, self.state.grpc_rate_limiter.clone()); let http_service = http_router(self.state.clone()); let grpc_service = request_id_middleware!(grpc_service); @@ -211,6 +214,153 @@ impl MultiplexService { } } +#[derive(Clone, Debug)] +pub struct GrpcRateLimiter { + requests: u64, + window: Duration, + state: Arc>, +} + +#[derive(Debug)] +struct GrpcRateLimitState { + window_started: Instant, + remaining: u64, +} + +impl GrpcRateLimiter { + pub fn from_config(config: &Config) -> Option { + let (requests, window) = config.grpc_rate_limit()?; + Some(Self { + requests, + window, + state: Arc::new(Mutex::new(GrpcRateLimitState { + window_started: Instant::now(), + remaining: requests, + })), + }) + } + + fn allow(&self) -> bool { + let now = Instant::now(); + let mut state = self + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if now.duration_since(state.window_started) >= self.window { + state.window_started = now; + state.remaining = self.requests; + } + if state.remaining == 0 { + false + } else { + state.remaining -= 1; + true + } + } + + /// Report whether the limiter currently has capacity without consuming a + /// token, rolling the window over first so an elapsed window reports + /// capacity again. + /// + /// Used by `poll_ready` so an exhausted limiter reports readiness instead + /// of blocking on inner-service backpressure: `call` can then return + /// `RESOURCE_EXHAUSTED` immediately rather than waiting for the inner gRPC + /// service to become ready. + fn has_capacity(&self) -> bool { + let now = Instant::now(); + let mut state = self + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if now.duration_since(state.window_started) >= self.window { + state.window_started = now; + state.remaining = self.requests; + } + state.remaining > 0 + } +} + +#[derive(Clone)] +struct GrpcRateLimitService { + inner: S, + limiter: Option, + /// Set by `poll_ready` when it reports synthetic readiness for an + /// exhausted limiter without polling the inner service. The paired `call` + /// must then reject with `RESOURCE_EXHAUSTED` instead of forwarding to an + /// inner service that never reported readiness — even if the rate-limit + /// window rolls over in between. Reset whenever `poll_ready` defers to the + /// inner service. + rate_limited: bool, +} + +impl GrpcRateLimitService { + fn new(inner: S, limiter: Option) -> Self { + Self { + inner, + limiter, + rate_limited: false, + } + } +} + +impl tower::Service> for GrpcRateLimitService +where + S: tower::Service, Response = Response>, + S::Future: Send + 'static, + B: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // When the limiter is exhausted, report ready so `call` can return + // RESOURCE_EXHAUSTED immediately. Delegating to the inner service here + // would make rate-limited requests wait on inner backpressure (a + // pending inner `poll_ready`) before they are rejected. The check is + // non-consuming: the token is only consumed in `call` via `allow`. + // + // Crucially, this path does NOT poll the inner service, so the inner + // service has not reported readiness. Record that decision so the + // paired `call` rejects rather than forwarding to a service that never + // became ready — even if the rate-limit window rolls over in between. + if self + .limiter + .as_ref() + .is_some_and(|limiter| !limiter.has_capacity()) + { + self.rate_limited = true; + return Poll::Ready(Ok(())); + } + self.rate_limited = false; + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + // If `poll_ready` short-circuited an exhausted limiter, it never polled + // the inner service to readiness. Honor that decision regardless of the + // limiter's current state (the window may have rolled over since): the + // Tower contract forbids forwarding to an inner service that did not + // report readiness. + if std::mem::take(&mut self.rate_limited) { + let response = + tonic::Status::resource_exhausted("gRPC rate limit exceeded").into_http(); + return Box::pin(async move { Ok(response) }); + } + if self + .limiter + .as_ref() + .is_some_and(|limiter| !limiter.allow()) + { + let response = + tonic::Status::resource_exhausted("gRPC rate limit exceeded").into_http(); + return Box::pin(async move { Ok(response) }); + } + let future = self.inner.call(req); + Box::pin(future) + } +} + /// Combined gRPC service that routes between `OpenShell` and Inference services /// based on the request path prefix. #[derive(Clone)] @@ -654,6 +804,8 @@ mod tests { use bytes::Bytes; use http_body_util::Empty; use std::sync::Mutex; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tower::Service; #[test] fn uuid_request_id_generates_valid_uuid() { @@ -801,6 +953,275 @@ mod tests { assert_eq!(request_id.to_str().unwrap(), "grpc-corr-id"); } + #[derive(Clone)] + struct CountingGrpcService { + calls: Arc, + } + + impl Service> for CountingGrpcService { + type Response = Response; + type Error = std::convert::Infallible; + type Future = std::future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Request<()>) -> Self::Future { + self.calls.fetch_add(1, Ordering::Relaxed); + std::future::ready(Ok(Response::new(tonic::body::empty_body()))) + } + } + + /// Inner service that is never ready, used to prove the rate limiter does + /// not wait on inner-service backpressure when it is already exhausted. + /// Counts `call` invocations so tests can assert the limiter never forwards + /// to an inner service that did not report readiness. + #[derive(Clone)] + struct PendingInnerService { + calls: Arc, + } + + impl PendingInnerService { + fn new() -> Self { + Self { + calls: Arc::new(AtomicUsize::new(0)), + } + } + } + + impl Service> for PendingInnerService { + type Response = Response; + type Error = std::convert::Infallible; + type Future = std::future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn call(&mut self, _req: Request<()>) -> Self::Future { + self.calls.fetch_add(1, Ordering::Relaxed); + std::future::ready(Ok(Response::new(tonic::body::empty_body()))) + } + } + + #[tokio::test] + async fn grpc_rate_limit_poll_ready_short_circuits_exhausted_limiter() { + // An exhausted limiter must report ready even when the inner service is + // pending, so `call` returns RESOURCE_EXHAUSTED instead of waiting on + // inner backpressure. + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config).expect("limiter should be enabled"); + // Consume the single token so the limiter is exhausted. + assert!(limiter.allow()); + + let mut exhausted = GrpcRateLimitService::new(PendingInnerService::new(), Some(limiter)); + let mut cx = Context::from_waker(std::task::Waker::noop()); + assert!( + matches!(exhausted.poll_ready(&mut cx), Poll::Ready(Ok(()))), + "exhausted limiter should report ready despite a pending inner service", + ); + let response = exhausted.call(Request::new(())).await.unwrap(); + assert_eq!(grpc_status_from_response(&response), "8"); + + // A limiter with capacity must still respect inner backpressure. + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config); + let mut with_capacity = GrpcRateLimitService::new(PendingInnerService::new(), limiter); + assert!( + with_capacity.poll_ready(&mut cx).is_pending(), + "limiter with capacity should defer to the pending inner service", + ); + } + + #[tokio::test] + async fn grpc_rate_limit_call_rejects_after_poll_ready_short_circuit_despite_window_rollover() { + // Regression: when `poll_ready` reports synthetic readiness for an + // exhausted limiter, it does NOT poll the inner service. If the + // rate-limit window then rolls over before `call`, the request must + // still be rejected rather than forwarded to an inner service that + // never reported readiness (a Tower contract violation). + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config).expect("limiter should be enabled"); + // Exhaust the single token. + assert!(limiter.allow()); + + // Pending inner service: its `poll_ready` never reports ready and its + // `call` increments a counter. A ready result from the wrapper + // therefore proves the limiter short-circuited rather than delegating, + // and `calls == 0` proves the wrapper never forwarded. + let inner = PendingInnerService::new(); + let calls = inner.calls.clone(); + let mut service = GrpcRateLimitService::new(inner, Some(limiter.clone())); + + // poll_ready short-circuits the exhausted limiter and records synthetic + // readiness without polling the inner service. + let mut cx = Context::from_waker(std::task::Waker::noop()); + assert!( + matches!(service.poll_ready(&mut cx), Poll::Ready(Ok(()))), + "exhausted limiter should report ready despite a pending inner service", + ); + + // The window rolls over between poll_ready and call: the limiter now + // has capacity again. + { + let mut state = limiter + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + state.window_started = state + .window_started + .checked_sub(Duration::from_secs(61)) + .expect("test window rewind should be valid"); + } + + let response = service.call(Request::new(())).await.unwrap(); + assert_eq!(grpc_status_from_response(&response), "8"); + assert_eq!( + calls.load(Ordering::Relaxed), + 0, + "inner service must not be called when poll_ready short-circuited the limiter", + ); + } + + #[tokio::test] + async fn grpc_rate_limit_returns_resource_exhausted_after_limit() { + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config); + let calls = Arc::new(AtomicUsize::new(0)); + let mut service = GrpcRateLimitService::new( + CountingGrpcService { + calls: calls.clone(), + }, + limiter, + ); + + let first = service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&first), "0"); + + let second = service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&second), "8"); + assert_eq!(calls.load(Ordering::Relaxed), 1); + } + + #[tokio::test] + async fn grpc_rate_limit_disabled_passes_requests_through() { + let config = Config::new(None).with_grpc_rate_limit(Some(0), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config); + let calls = Arc::new(AtomicUsize::new(0)); + let mut service = GrpcRateLimitService::new( + CountingGrpcService { + calls: calls.clone(), + }, + limiter, + ); + + for _ in 0..3 { + let response = service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&response), "0"); + } + assert_eq!(calls.load(Ordering::Relaxed), 3); + } + + #[tokio::test] + async fn grpc_rate_limit_resets_after_window() { + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config).expect("limiter should be enabled"); + let calls = Arc::new(AtomicUsize::new(0)); + let mut service = GrpcRateLimitService::new( + CountingGrpcService { + calls: calls.clone(), + }, + Some(limiter.clone()), + ); + + let first = service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&first), "0"); + + { + let mut state = limiter + .state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + state.window_started = state + .window_started + .checked_sub(Duration::from_secs(61)) + .expect("test window rewind should be valid"); + } + + let second = service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&second), "0"); + assert_eq!(calls.load(Ordering::Relaxed), 2); + } + + #[tokio::test] + async fn grpc_rate_limit_state_is_shared_across_service_clones() { + let config = Config::new(None).with_grpc_rate_limit(Some(1), Some(60)); + let limiter = GrpcRateLimiter::from_config(&config); + let calls = Arc::new(AtomicUsize::new(0)); + let mut first_service = GrpcRateLimitService::new( + CountingGrpcService { + calls: calls.clone(), + }, + limiter.clone(), + ); + let mut second_service = GrpcRateLimitService::new( + CountingGrpcService { + calls: calls.clone(), + }, + limiter, + ); + + let first = first_service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&first), "0"); + + let second = second_service + .ready() + .await + .unwrap() + .call(Request::new(())) + .await + .unwrap(); + assert_eq!(grpc_status_from_response(&second), "8"); + assert_eq!(calls.load(Ordering::Relaxed), 1); + } + #[derive(Clone)] struct TraceBuf(Arc>>); diff --git a/deploy/helm/openshell/README.md b/deploy/helm/openshell/README.md index ab5b6eb45..e5328a6c7 100644 --- a/deploy/helm/openshell/README.md +++ b/deploy/helm/openshell/README.md @@ -194,6 +194,8 @@ JWT signing Secret. | server.enableUserNamespaces | bool | `false` | Enable Kubernetes user namespace isolation (hostUsers: false) for sandbox pods. Requires Kubernetes 1.33+ with user namespace support available (beta through 1.35, GA in 1.36+), plus a supporting container runtime and Linux 5.12+. When enabled, container UID 0 maps to an unprivileged host UID and capabilities become namespaced. | | server.externalDbSecret | string | `""` | Name of a pre-existing Opaque Secret containing a PostgreSQL connection URI (key: uri). When set, the gateway reads OPENSHELL_DB_URL from this Secret instead of using dbUrl. The Secret must contain a `uri` key, e.g. postgresql://user:pass@host:5432/dbname. | | server.grpcEndpoint | string | `""` | gRPC endpoint sandboxes call back into the gateway. Leave empty to derive it from the chart fullname, release namespace, service port, and disableTls flag, for example https://openshell.openshell.svc.cluster.local:8080. Override only when sandboxes must reach the gateway via a different hostname (e.g. an external ingress or a host alias). | +| server.grpcRateLimit.requests | int | `0` | Maximum gRPC requests allowed per window. Must be positive (alongside windowSeconds) to enable rate limiting; 0 (default) disables it. | +| server.grpcRateLimit.windowSeconds | int | `0` | gRPC rate-limit window length in seconds. Must be positive (alongside requests) to enable rate limiting; 0 (default) disables it. | | server.hostGatewayIP | string | `""` | Host gateway IP for sandbox pod hostAliases. When set, sandbox pods get hostAliases entries mapping host.docker.internal and host.openshell.internal to this IP, allowing them to reach services running on the Docker host. Auto-detected by the cluster entrypoint script. | | server.logLevel | string | `"info"` | Gateway log level. | | server.oidc.adminRole | string | `""` | Role name for admin access. Leave empty (with userRole also empty) for authentication-only mode. Both must be set or both empty. | diff --git a/deploy/helm/openshell/templates/gateway-config.yaml b/deploy/helm/openshell/templates/gateway-config.yaml index f46547c3f..202d997c0 100644 --- a/deploy/helm/openshell/templates/gateway-config.yaml +++ b/deploy/helm/openshell/templates/gateway-config.yaml @@ -56,6 +56,17 @@ data: {{- if $sans }} server_sans = [{{- range $i, $san := $sans }}{{ if $i }}, {{ end }}{{ $san | quote }}{{- end }}] {{- end }} + {{- $rlRequests := int .Values.server.grpcRateLimit.requests }} + {{- $rlWindowSeconds := int .Values.server.grpcRateLimit.windowSeconds }} + {{- if or (lt $rlRequests 0) (lt $rlWindowSeconds 0) }} + {{- fail "server.grpcRateLimit.requests and server.grpcRateLimit.windowSeconds must not be negative; they map to unsigned gateway settings" }} + {{- end }} + {{- if and (gt $rlRequests 0) (gt $rlWindowSeconds 0) }} + grpc_rate_limit_requests = {{ $rlRequests }} + grpc_rate_limit_window_seconds = {{ $rlWindowSeconds }} + {{- else if or (gt $rlRequests 0) (gt $rlWindowSeconds 0) }} + {{- fail "server.grpcRateLimit requires both requests and windowSeconds to be positive to enable rate limiting, or both 0/unset to disable it" }} + {{- end }} {{- if not .Values.server.disableTls }} diff --git a/deploy/helm/openshell/tests/gateway_config_test.yaml b/deploy/helm/openshell/tests/gateway_config_test.yaml index 6b14fe12a..27251c905 100644 --- a/deploy/helm/openshell/tests/gateway_config_test.yaml +++ b/deploy/helm/openshell/tests/gateway_config_test.yaml @@ -119,6 +119,64 @@ tests: path: data["gateway.toml"] pattern: '(?ms)\[openshell\.gateway\.auth\].*?allow_unauthenticated_users\s*=\s*true' + - it: omits the gRPC rate limit by default + template: templates/gateway-config.yaml + asserts: + - notMatchRegex: + path: data["gateway.toml"] + pattern: 'grpc_rate_limit_requests\s*=' + - notMatchRegex: + path: data["gateway.toml"] + pattern: 'grpc_rate_limit_window_seconds\s*=' + + - it: renders the gRPC rate limit under [openshell.gateway] when both values are positive + template: templates/gateway-config.yaml + set: + server.grpcRateLimit.requests: 120 + server.grpcRateLimit.windowSeconds: 60 + asserts: + - matchRegex: + path: data["gateway.toml"] + pattern: '(?ms)\[openshell\.gateway\].*?grpc_rate_limit_requests\s*=\s*120' + - matchRegex: + path: data["gateway.toml"] + pattern: '(?ms)\[openshell\.gateway\].*?grpc_rate_limit_window_seconds\s*=\s*60' + + # The validation lives in gateway-config.yaml but surfaces through the + # statefulset checksum include, so the failure is asserted against that + # template (mirrors the postgres serviceBindings failure test below). + - it: fails to render when only requests is positive + template: templates/statefulset.yaml + set: + server.grpcRateLimit.requests: 120 + asserts: + - failedTemplate: + errorMessage: "server.grpcRateLimit requires both requests and windowSeconds to be positive to enable rate limiting, or both 0/unset to disable it" + + - it: fails to render when only windowSeconds is positive + template: templates/statefulset.yaml + set: + server.grpcRateLimit.windowSeconds: 60 + asserts: + - failedTemplate: + errorMessage: "server.grpcRateLimit requires both requests and windowSeconds to be positive to enable rate limiting, or both 0/unset to disable it" + + - it: fails to render when requests is negative + template: templates/statefulset.yaml + set: + server.grpcRateLimit.requests: -1 + asserts: + - failedTemplate: + errorMessage: "server.grpcRateLimit.requests and server.grpcRateLimit.windowSeconds must not be negative; they map to unsigned gateway settings" + + - it: fails to render when windowSeconds is negative + template: templates/statefulset.yaml + set: + server.grpcRateLimit.windowSeconds: -5 + asserts: + - failedTemplate: + errorMessage: "server.grpcRateLimit.requests and server.grpcRateLimit.windowSeconds must not be negative; they map to unsigned gateway settings" + - it: uses the configured existing sandbox service account name template: templates/gateway-config.yaml set: diff --git a/deploy/helm/openshell/values.yaml b/deploy/helm/openshell/values.yaml index f0cd43c73..3e913e685 100644 --- a/deploy/helm/openshell/values.yaml +++ b/deploy/helm/openshell/values.yaml @@ -195,6 +195,17 @@ server: # -- Enable plaintext HTTP routing for loopback sandbox service URLs on # TLS-enabled gateways. enableLoopbackServiceHttp: true + # Optional gateway-wide gRPC request rate limit. Applies only to gRPC API + # traffic after protocol multiplexing; health, metrics, and loopback service + # HTTP routes are not rate limited. Both values must be positive to enable the + # limit, otherwise it is omitted from the rendered config and stays disabled. + grpcRateLimit: + # -- Maximum gRPC requests allowed per window. Must be positive (alongside + # windowSeconds) to enable rate limiting; 0 (default) disables it. + requests: 0 + # -- gRPC rate-limit window length in seconds. Must be positive (alongside + # requests) to enable rate limiting; 0 (default) disables it. + windowSeconds: 0 auth: # -- UNSAFE: accept unauthenticated CLI/user requests as a local developer # principal. Intended only for trusted local Skaffold/k3d development or a diff --git a/docs/reference/gateway-config.mdx b/docs/reference/gateway-config.mdx index c70d8acbd..859cdccbc 100644 --- a/docs/reference/gateway-config.mdx +++ b/docs/reference/gateway-config.mdx @@ -98,6 +98,11 @@ guest_tls_ca = "/etc/openshell/certs/ca.pem" guest_tls_cert = "/etc/openshell/certs/client.pem" guest_tls_key = "/etc/openshell/certs/client-key.pem" +# Optional gRPC rate limit. Both values must be positive to enable the limit. +# Set either value to 0, or omit both, to disable rate limiting. +grpc_rate_limit_requests = 120 +grpc_rate_limit_window_seconds = 60 + # Gateway listener TLS (distinct from the per-driver guest_tls_*). [openshell.gateway.tls] cert_path = "/etc/openshell/certs/gateway.pem"