diff --git a/architecture/sandbox.md b/architecture/sandbox.md index e60b727a5..0e2d1c559 100644 --- a/architecture/sandbox.md +++ b/architecture/sandbox.md @@ -49,6 +49,12 @@ paths, such as proxy support files or GPU device paths when a GPU is present. All ordinary agent egress is routed through the sandbox proxy. The proxy identifies the calling binary, checks trust-on-first-use binary identity, rejects unsafe internal destinations, and evaluates the active policy. +For inspected HTTP traffic, the proxy can enforce REST method/path rules, +WebSocket upgrade and text-message rules, GraphQL operation rules, and +JSON-RPC method and params rules on sandbox-to-server request bodies. JSON-RPC +request inspection buffers up to the endpoint `json_rpc.max_body_bytes` limit. +JSON-RPC responses and server-to-client MCP messages on response or SSE streams +are relayed but are not currently parsed for policy enforcement. `https://inference.local` is special. It bypasses OPA network policy and is handled by the inference interception path: diff --git a/crates/openshell-cli/src/policy_update.rs b/crates/openshell-cli/src/policy_update.rs index 57656b878..22363c920 100644 --- a/crates/openshell-cli/src/policy_update.rs +++ b/crates/openshell-cli/src/policy_update.rs @@ -205,6 +205,8 @@ fn group_allow_rules(specs: &[String]) -> Result Result, #[serde(default, skip_serializing_if = "is_zero_u32")] graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + json_rpc: Option, } // Signature dictated by serde's `skip_serializing_if`, which requires `&T`. @@ -149,6 +151,25 @@ fn is_zero_u32(v: &u32) -> bool { *v == 0 } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct JsonRpcConfigDef { + #[serde(default, skip_serializing_if = "is_zero_u32")] + max_body_bytes: u32, + #[serde(default, skip_serializing_if = "String::is_empty")] + on_parse_error: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + batch_policy: String, +} + +fn json_rpc_config_from_proto(max_body_bytes: u32) -> Option { + (max_body_bytes > 0).then_some(JsonRpcConfigDef { + max_body_bytes, + on_parse_error: String::new(), + batch_policy: String::new(), + }) +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct GraphqlOperationDef { @@ -183,6 +204,10 @@ struct L7AllowDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -216,6 +241,10 @@ struct L7DenyRuleDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -232,6 +261,24 @@ struct NetworkBinaryDef { // YAML → proto conversion // --------------------------------------------------------------------------- +fn matcher_def_to_proto(matcher: QueryMatcherDef) -> L7QueryMatcher { + match matcher { + QueryMatcherDef::Glob(glob) => L7QueryMatcher { glob, any: vec![] }, + QueryMatcherDef::Any(any) => L7QueryMatcher { + glob: String::new(), + any: any.any, + }, + } +} + +fn matcher_proto_to_def(matcher: L7QueryMatcher) -> QueryMatcherDef { + if matcher.any.is_empty() { + QueryMatcherDef::Glob(matcher.glob) + } else { + QueryMatcherDef::Any(QueryAnyDef { any: matcher.any }) + } +} + fn to_proto(raw: PolicyFile) -> SandboxPolicy { let network_policies = raw .network_policies @@ -276,21 +323,21 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { operation_type: r.allow.operation_type, operation_name: r.allow.operation_name, fields: r.allow.fields, + rpc_method: r.allow.rpc_method, query: r .allow .query .into_iter() .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, proto) + (key, matcher_def_to_proto(matcher)) + }) + .collect(), + params: r + .allow + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_def_to_proto(matcher)) }) .collect(), }), @@ -307,21 +354,16 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { operation_type: d.operation_type, operation_name: d.operation_name, fields: d.fields, + rpc_method: d.rpc_method, query: d .query .into_iter() - .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, proto) - }) + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) + .collect(), + params: d + .params + .into_iter() + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) .collect(), }) .collect(), @@ -347,6 +389,10 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc_max_body_bytes: e + .json_rpc + .as_ref() + .map_or(0, |config| config.max_body_bytes), } }) .collect(), @@ -448,18 +494,19 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: a.operation_type, operation_name: a.operation_name, fields: a.fields, + rpc_method: a.rpc_method, query: a .query .into_iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any, - }) - }; - (key, yaml_matcher) + (key, matcher_proto_to_def(matcher)) + }) + .collect(), + params: a + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_proto_to_def(matcher)) }) .collect(), }, @@ -477,18 +524,19 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: d.operation_type.clone(), operation_name: d.operation_name.clone(), fields: d.fields.clone(), + rpc_method: d.rpc_method.clone(), query: d .query .iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob.clone()) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any.clone(), - }) - }; - (key.clone(), yaml_matcher) + (key.clone(), matcher_proto_to_def(matcher.clone())) + }) + .collect(), + params: d + .params + .iter() + .map(|(key, matcher)| { + (key.clone(), matcher_proto_to_def(matcher.clone())) }) .collect(), }) @@ -512,6 +560,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc: json_rpc_config_from_proto(e.json_rpc_max_body_bytes), } }) .collect(), @@ -1715,6 +1764,35 @@ network_policies: assert_eq!(ep.deny_rules[0].fields, vec!["deleteRepository"]); } + #[test] + fn round_trip_preserves_json_rpc_max_body_bytes() { + let yaml = r" +version: 1 +network_policies: + mcp: + name: mcp + endpoints: + - host: mcp.example.com + port: 443 + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + binaries: + - path: /usr/bin/curl +"; + let proto1 = parse_sandbox_policy(yaml).expect("parse failed"); + let yaml_out = serialize_sandbox_policy(&proto1).expect("serialize failed"); + let proto2 = parse_sandbox_policy(&yaml_out).expect("re-parse failed"); + + let ep = &proto2.network_policies["mcp"].endpoints[0]; + assert_eq!(ep.protocol, "json-rpc"); + assert_eq!(ep.json_rpc_max_body_bytes, 131_072); + } + #[test] fn round_trip_preserves_websocket_credential_rewrite() { let yaml = r" diff --git a/crates/openshell-policy/src/merge.rs b/crates/openshell-policy/src/merge.rs index f191cd272..6be185ca0 100644 --- a/crates/openshell-policy/src/merge.rs +++ b/crates/openshell-policy/src/merge.rs @@ -747,6 +747,8 @@ fn expand_access_preset(protocol: &str, access: &str) -> Option> { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::default(), }), }) .collect(), @@ -961,6 +963,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::default(), }), } } diff --git a/crates/openshell-providers/src/profiles.rs b/crates/openshell-providers/src/profiles.rs index d2a35ca80..f94c09ef9 100644 --- a/crates/openshell-providers/src/profiles.rs +++ b/crates/openshell-providers/src/profiles.rs @@ -200,6 +200,8 @@ pub struct EndpointProfile { pub graphql_persisted_queries: HashMap, #[serde(default, skip_serializing_if = "is_zero")] pub graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "is_zero")] + pub json_rpc_max_body_bytes: u32, #[serde(default, skip_serializing_if = "String::is_empty")] pub path: String, } @@ -743,6 +745,7 @@ fn endpoint_to_proto(endpoint: &EndpointProfile) -> NetworkEndpoint { .map(|(name, operation)| (name.clone(), graphql_operation_to_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } @@ -773,6 +776,7 @@ fn endpoint_from_proto(endpoint: &NetworkEndpoint) -> EndpointProfile { .map(|(name, operation)| (name.clone(), graphql_operation_from_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } @@ -816,6 +820,8 @@ fn allow_to_proto(allow: &L7AllowProfile) -> L7Allow { operation_type: allow.operation_type.clone(), operation_name: allow.operation_name.clone(), fields: allow.fields.clone(), + rpc_method: String::new(), + params: HashMap::new(), } } @@ -848,6 +854,8 @@ fn deny_rule_to_proto(rule: &L7DenyRuleProfile) -> L7DenyRule { operation_type: rule.operation_type.clone(), operation_name: rule.operation_name.clone(), fields: rule.fields.clone(), + rpc_method: String::new(), + params: HashMap::new(), } } diff --git a/crates/openshell-sandbox/data/sandbox-policy.rego b/crates/openshell-sandbox/data/sandbox-policy.rego index afcd28863..38070911c 100644 --- a/crates/openshell-sandbox/data/sandbox-policy.rego +++ b/crates/openshell-sandbox/data/sandbox-policy.rego @@ -274,6 +274,15 @@ request_denied_for_endpoint(request, endpoint) if { command_matches(request.command, deny_rule.command) } +# --- L7 deny rule matching: JSON-RPC method + params --- + +request_denied_for_endpoint(request, endpoint) if { + some deny_rule + deny_rule := endpoint.deny_rules[_] + deny_rule.rpc_method + jsonrpc_rule_matches(request, deny_rule) +} + # --- L7 deny rule matching: GraphQL operation --- request_denied_for_endpoint(request, endpoint) if { @@ -417,6 +426,15 @@ request_allowed_for_endpoint(request, endpoint) if { command_matches(request.command, rule.allow.command) } +# --- L7 rule matching: JSON-RPC method --- + +request_allowed_for_endpoint(request, endpoint) if { + some rule + rule := endpoint.rules[_] + rule.allow.rpc_method + jsonrpc_rule_matches(request, rule.allow) +} + # --- L7 rule matching: GraphQL operation --- request_allowed_for_endpoint(request, endpoint) if { @@ -638,6 +656,35 @@ query_value_matches(value, matcher) if { glob.match(any_patterns[i], [], value) } +# JSON-RPC method and params matching. The sandbox flattens object params into +# dot-separated keys before policy evaluation, e.g. arguments.scope. +jsonrpc_rule_matches(request, rule) if { + jsonrpc := object.get(request, "jsonrpc", {}) + method := object.get(jsonrpc, "method", null) + method != null + glob.match(rule.rpc_method, [], method) + jsonrpc_params_match(jsonrpc, rule) +} + +jsonrpc_params_match(jsonrpc, rule) if { + param_rules := object.get(rule, "params", {}) + not jsonrpc_param_mismatch(jsonrpc, param_rules) +} + +jsonrpc_param_mismatch(jsonrpc, param_rules) if { + some key + matcher := param_rules[key] + not jsonrpc_param_key_matches(jsonrpc, key, matcher) +} + +jsonrpc_param_key_matches(jsonrpc, key, matcher) if { + params := object.get(jsonrpc, "params", {}) + value := object.get(params, key, null) + value != null + is_string(value) + query_value_matches(value, matcher) +} + # SQL command matching: "*" matches any; otherwise case-insensitive. command_matches(_, "*") if true diff --git a/crates/openshell-sandbox/src/l7/graphql.rs b/crates/openshell-sandbox/src/l7/graphql.rs index 82c35720e..77ec3b6fd 100644 --- a/crates/openshell-sandbox/src/l7/graphql.rs +++ b/crates/openshell-sandbox/src/l7/graphql.rs @@ -3,14 +3,14 @@ //! GraphQL-over-HTTP L7 inspection. -use crate::l7::provider::{BodyLength, L7Provider, L7Request}; +use crate::l7::provider::{L7Provider, L7Request}; use apollo_parser::Parser; use apollo_parser::cst; -use miette::{IntoDiagnostic, Result, miette}; +use miette::{Result, miette}; use serde::Serialize; use serde_json::Value; use std::collections::{HashMap, HashSet}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024; @@ -61,7 +61,7 @@ pub(crate) async fn inspect_graphql_request( ) -> Result { let header_str = header_str(request)?; reject_unsupported_headers(header_str)?; - let body = read_body_for_inspection(client, request, max_body_bytes).await?; + let body = crate::l7::http::read_body_for_inspection(client, request, max_body_bytes).await?; Ok(classify_request(request, &body)) } @@ -355,195 +355,6 @@ fn unique_persisted_query_id( Ok(selected.map(|(_, value)| value)) } -async fn read_body_for_inspection( - client: &mut C, - request: &mut L7Request, - max_body_bytes: usize, -) -> Result> { - let header_end = request - .raw_header - .windows(4) - .position(|w| w == b"\r\n\r\n") - .map_or(request.raw_header.len(), |p| p + 4); - let overflow = request.raw_header[header_end..].to_vec(); - - match request.body_length { - BodyLength::None => Ok(Vec::new()), - BodyLength::ContentLength(len) => { - let len = usize::try_from(len) - .map_err(|_| miette!("GraphQL request body length exceeds platform limit"))?; - if len > max_body_bytes { - return Err(miette!( - "GraphQL request body exceeds {max_body_bytes} byte inspection limit" - )); - } - if overflow.len() > len { - return Err(miette!( - "GraphQL request contains more body bytes than Content-Length" - )); - } - let remaining = len - overflow.len(); - let mut body = overflow; - if remaining > 0 { - let start = body.len(); - body.resize(len, 0); - client - .read_exact(&mut body[start..]) - .await - .into_diagnostic()?; - } - request.raw_header.truncate(header_end); - request.raw_header.extend_from_slice(&body); - Ok(body) - } - BodyLength::Chunked => { - let body = read_chunked_body_for_inspection( - client, - request, - header_end, - overflow, - max_body_bytes, - ) - .await?; - normalize_chunked_request_to_content_length(request, header_end, &body)?; - Ok(body) - } - } -} - -fn normalize_chunked_request_to_content_length( - request: &mut L7Request, - header_end: usize, - body: &[u8], -) -> Result<()> { - let header_str = std::str::from_utf8(&request.raw_header[..header_end]) - .map_err(|_| miette!("GraphQL HTTP headers contain invalid UTF-8"))?; - let header_str = header_str - .strip_suffix("\r\n\r\n") - .ok_or_else(|| miette!("GraphQL HTTP headers missing terminator"))?; - - let mut normalized = Vec::with_capacity(header_str.len() + body.len() + 32); - for (idx, line) in header_str.split("\r\n").enumerate() { - if idx > 0 { - let name = line - .split_once(':') - .map(|(name, _)| name.trim().to_ascii_lowercase()); - if matches!( - name.as_deref(), - Some("transfer-encoding" | "content-length" | "trailer") - ) { - continue; - } - } - normalized.extend_from_slice(line.as_bytes()); - normalized.extend_from_slice(b"\r\n"); - } - normalized.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()); - normalized.extend_from_slice(body); - - request.raw_header = normalized; - request.body_length = BodyLength::ContentLength(body.len() as u64); - Ok(()) -} - -async fn read_chunked_body_for_inspection( - client: &mut C, - request: &mut L7Request, - header_end: usize, - overflow: Vec, - max_body_bytes: usize, -) -> Result> { - let mut raw = overflow; - let mut decoded = Vec::new(); - let mut pos = 0usize; - - loop { - let size_line_end = loop { - if let Some(end) = find_crlf(&raw, pos) { - break end; - } - read_more(client, &mut raw, max_body_bytes).await?; - }; - let size_line = std::str::from_utf8(&raw[pos..size_line_end]) - .into_diagnostic() - .map_err(|_| miette!("Invalid UTF-8 in GraphQL chunk-size line"))?; - let size_token = size_line - .split(';') - .next() - .map(str::trim) - .unwrap_or_default(); - let chunk_size = usize::from_str_radix(size_token, 16) - .into_diagnostic() - .map_err(|_| miette!("Invalid GraphQL chunk size token: {size_token:?}"))?; - pos = size_line_end + 2; - - if decoded.len().saturating_add(chunk_size) > max_body_bytes { - return Err(miette!( - "GraphQL request body exceeds {max_body_bytes} byte inspection limit" - )); - } - - if chunk_size == 0 { - loop { - let trailer_end = loop { - if let Some(end) = find_crlf(&raw, pos) { - break end; - } - read_more(client, &mut raw, max_body_bytes).await?; - }; - let trailer_line = &raw[pos..trailer_end]; - pos = trailer_end + 2; - if trailer_line.is_empty() { - request.raw_header.truncate(header_end); - request.raw_header.extend_from_slice(&raw[..pos]); - return Ok(decoded); - } - } - } - - let chunk_end = pos - .checked_add(chunk_size) - .ok_or_else(|| miette!("GraphQL chunk size overflow"))?; - let chunk_with_crlf_end = chunk_end - .checked_add(2) - .ok_or_else(|| miette!("GraphQL chunk size overflow"))?; - while raw.len() < chunk_with_crlf_end { - read_more(client, &mut raw, max_body_bytes).await?; - } - decoded.extend_from_slice(&raw[pos..chunk_end]); - if raw.get(chunk_end..chunk_with_crlf_end) != Some(&b"\r\n"[..]) { - return Err(miette!("GraphQL chunk payload missing terminating CRLF")); - } - pos = chunk_with_crlf_end; - } -} - -async fn read_more( - client: &mut C, - raw: &mut Vec, - max_body_bytes: usize, -) -> Result<()> { - if raw.len() > max_body_bytes.saturating_mul(2).max(max_body_bytes) { - return Err(miette!( - "GraphQL chunked request body exceeds inspection framing limit" - )); - } - let mut buf = [0u8; 8192]; - let n = client.read(&mut buf).await.into_diagnostic()?; - if n == 0 { - return Err(miette!("GraphQL chunked body ended before terminator")); - } - raw.extend_from_slice(&buf[..n]); - Ok(()) -} - -fn find_crlf(buf: &[u8], start: usize) -> Option { - buf.get(start..)? - .windows(2) - .position(|w| w == b"\r\n") - .map(|p| start + p) -} - fn header_str(request: &L7Request) -> Result<&str> { let header_end = request .raw_header @@ -578,6 +389,7 @@ fn reject_unsupported_headers(headers: &str) -> Result<()> { #[cfg(test)] mod tests { use super::*; + use crate::l7::provider::BodyLength; fn request(method: &str, target: &str) -> L7Request { L7Request { @@ -810,6 +622,7 @@ network_policies: target: req.target, query_params: req.query_params, graphql: Some(info), + jsonrpc: None, }; let tunnel_engine = engine diff --git a/crates/openshell-sandbox/src/l7/http.rs b/crates/openshell-sandbox/src/l7/http.rs new file mode 100644 index 000000000..66269f6ba --- /dev/null +++ b/crates/openshell-sandbox/src/l7/http.rs @@ -0,0 +1,199 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared HTTP/1.1 request helpers for L7 protocols carried over HTTP. + +use crate::l7::provider::{BodyLength, L7Request}; +use miette::{IntoDiagnostic, Result, miette}; +use tokio::io::{AsyncRead, AsyncReadExt}; + +const READ_BUF_SIZE: usize = 8192; + +pub async fn read_body_for_inspection( + client: &mut C, + request: &mut L7Request, + max_body_bytes: usize, +) -> Result> { + let header_end = request + .raw_header + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(request.raw_header.len(), |p| p + 4); + let overflow = request.raw_header[header_end..].to_vec(); + + match request.body_length { + BodyLength::None => Ok(Vec::new()), + BodyLength::ContentLength(len) => { + let len = usize::try_from(len) + .map_err(|_| miette!("HTTP request body length exceeds platform limit"))?; + if len > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + if overflow.len() > len { + return Err(miette!( + "HTTP request contains more body bytes than Content-Length" + )); + } + let remaining = len - overflow.len(); + let mut body = overflow; + if remaining > 0 { + let start = body.len(); + body.resize(len, 0); + client + .read_exact(&mut body[start..]) + .await + .into_diagnostic()?; + } + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&body); + Ok(body) + } + BodyLength::Chunked => { + let body = read_chunked_body_for_inspection( + client, + request, + header_end, + overflow, + max_body_bytes, + ) + .await?; + normalize_chunked_request_to_content_length(request, header_end, &body)?; + Ok(body) + } + } +} + +fn normalize_chunked_request_to_content_length( + request: &mut L7Request, + header_end: usize, + body: &[u8], +) -> Result<()> { + let header_str = std::str::from_utf8(&request.raw_header[..header_end]) + .map_err(|_| miette!("HTTP headers contain invalid UTF-8"))?; + let header_str = header_str + .strip_suffix("\r\n\r\n") + .ok_or_else(|| miette!("HTTP headers missing terminator"))?; + + let mut normalized = Vec::with_capacity(header_str.len() + body.len() + 32); + for (idx, line) in header_str.split("\r\n").enumerate() { + if idx > 0 { + let name = line + .split_once(':') + .map(|(name, _)| name.trim().to_ascii_lowercase()); + if matches!( + name.as_deref(), + Some("transfer-encoding" | "content-length" | "trailer") + ) { + continue; + } + } + normalized.extend_from_slice(line.as_bytes()); + normalized.extend_from_slice(b"\r\n"); + } + normalized.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()); + normalized.extend_from_slice(body); + + request.raw_header = normalized; + request.body_length = BodyLength::ContentLength(body.len() as u64); + Ok(()) +} + +async fn read_chunked_body_for_inspection( + client: &mut C, + request: &mut L7Request, + header_end: usize, + overflow: Vec, + max_body_bytes: usize, +) -> Result> { + let mut raw = overflow; + let mut decoded = Vec::new(); + let mut pos = 0usize; + + loop { + let size_line_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, max_body_bytes).await?; + }; + let size_line = std::str::from_utf8(&raw[pos..size_line_end]) + .into_diagnostic() + .map_err(|_| miette!("Invalid UTF-8 in HTTP chunk-size line"))?; + let size_token = size_line + .split(';') + .next() + .map(str::trim) + .unwrap_or_default(); + let chunk_size = usize::from_str_radix(size_token, 16) + .into_diagnostic() + .map_err(|_| miette!("Invalid HTTP chunk size token: {size_token:?}"))?; + pos = size_line_end + 2; + + if decoded.len().saturating_add(chunk_size) > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + + if chunk_size == 0 { + loop { + let trailer_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, max_body_bytes).await?; + }; + let trailer_line = &raw[pos..trailer_end]; + pos = trailer_end + 2; + if trailer_line.is_empty() { + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&raw[..pos]); + return Ok(decoded); + } + } + } + + let chunk_end = pos + .checked_add(chunk_size) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + let chunk_with_crlf_end = chunk_end + .checked_add(2) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + while raw.len() < chunk_with_crlf_end { + read_more(client, &mut raw, max_body_bytes).await?; + } + decoded.extend_from_slice(&raw[pos..chunk_end]); + if raw.get(chunk_end..chunk_with_crlf_end) != Some(&b"\r\n"[..]) { + return Err(miette!("HTTP chunk payload missing terminating CRLF")); + } + pos = chunk_with_crlf_end; + } +} + +async fn read_more( + client: &mut C, + raw: &mut Vec, + max_body_bytes: usize, +) -> Result<()> { + if raw.len() > max_body_bytes.saturating_mul(2).max(max_body_bytes) { + return Err(miette!( + "HTTP chunked request body exceeds inspection framing limit" + )); + } + let mut buf = [0u8; READ_BUF_SIZE]; + let n = client.read(&mut buf).await.into_diagnostic()?; + if n == 0 { + return Err(miette!("HTTP chunked body ended before terminator")); + } + raw.extend_from_slice(&buf[..n]); + Ok(()) +} + +fn find_crlf(buf: &[u8], start: usize) -> Option { + buf.get(start..)? + .windows(2) + .position(|w| w == b"\r\n") + .map(|p| start + p) +} diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs new file mode 100644 index 000000000..563767200 --- /dev/null +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -0,0 +1,268 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! JSON-RPC 2.0 over HTTP L7 inspection. + +use miette::Result; +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; +use std::collections::HashMap; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::l7::provider::{L7Provider, L7Request}; + +pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024; + +pub struct JsonRpcHttpRequest { + pub request: L7Request, + pub info: JsonRpcRequestInfo, +} + +pub(crate) async fn parse_jsonrpc_http_request( + client: &mut C, + max_body_bytes: usize, + canonicalize_options: crate::l7::path::CanonicalizeOptions, +) -> Result> { + let provider = crate::l7::rest::RestProvider::with_options(canonicalize_options); + let Some(mut request) = provider.parse_request(client).await? else { + return Ok(None); + }; + let body = + crate::l7::http::read_body_for_inspection(client, &mut request, max_body_bytes).await?; + let info = parse_jsonrpc_body(&body); + Ok(Some(JsonRpcHttpRequest { request, info })) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcRequestInfo { + pub calls: Vec, + pub is_batch: bool, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcCallInfo { + pub method: String, + pub params: HashMap, +} + +impl JsonRpcRequestInfo { + pub(crate) fn params_sha256(&self) -> Option { + if self.is_batch { + if self.calls.is_empty() || self.calls.iter().all(|call| call.params.is_empty()) { + return None; + } + let canonical_params = self + .calls + .iter() + .map(|call| canonical_params_map(&call.params)) + .collect::>(); + return Some(sha256_json(&canonical_params)); + } + + let call = self.calls.first()?; + if call.params.is_empty() { + return None; + } + Some(sha256_json(&canonical_params_map(&call.params))) + } +} +/// Parse a JSON-RPC 2.0 request body and extract the `method` field. +/// +/// Returns an info struct with `method` set on success, or `error` set if the +/// body is not valid JSON-RPC 2.0. +pub fn parse_jsonrpc_body(body: &[u8]) -> JsonRpcRequestInfo { + let Ok(value) = serde_json::from_slice::(body) else { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: false, + error: Some("invalid JSON".to_string()), + }; + }; + + if let serde_json::Value::Array(items) = value { + if items.is_empty() { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some("empty batch".to_string()), + }; + } + let mut calls = Vec::new(); + for item in &items { + let Some(call) = parse_jsonrpc_call(item) else { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some("batch item missing or non-string 'method' field".to_string()), + }; + }; + calls.push(call); + } + return JsonRpcRequestInfo { + calls, + is_batch: true, + error: None, + }; + } + + let Some(call) = parse_jsonrpc_call(&value) else { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: false, + error: Some("missing or non-string 'method' field".to_string()), + }; + }; + JsonRpcRequestInfo { + calls: vec![call], + is_batch: false, + error: None, + } +} + +fn parse_jsonrpc_call(value: &serde_json::Value) -> Option { + let method = value.get("method").and_then(|m| m.as_str())?; + Some(JsonRpcCallInfo { + method: method.to_string(), + params: value + .get("params") + .map_or_else(HashMap::new, flatten_jsonrpc_params), + }) +} + +fn flatten_jsonrpc_params(value: &serde_json::Value) -> HashMap { + let mut params = HashMap::new(); + flatten_json_value("", value, &mut params); + params +} + +fn canonical_params_map(params: &HashMap) -> BTreeMap { + params + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect() +} + +fn sha256_json(value: &impl serde::Serialize) -> String { + let encoded = serde_json::to_vec(value).expect("canonical JSON-RPC params should serialize"); + hex::encode(Sha256::digest(&encoded)) +} + +fn flatten_json_value(prefix: &str, value: &serde_json::Value, out: &mut HashMap) { + match value { + serde_json::Value::Object(map) => { + for (key, child) in map { + let next = if prefix.is_empty() { + key.clone() + } else { + format!("{prefix}.{key}") + }; + flatten_json_value(&next, child, out); + } + } + serde_json::Value::String(s) if !prefix.is_empty() => { + out.insert(prefix.to_string(), s.clone()); + } + serde_json::Value::Number(n) if !prefix.is_empty() => { + out.insert(prefix.to_string(), n.to_string()); + } + serde_json::Value::Bool(b) if !prefix.is_empty() => { + out.insert(prefix.to_string(), b.to_string()); + } + _ => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_method_from_request_body() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#; + let info = parse_jsonrpc_body(body); + assert_eq!( + info.calls.first().map(|call| call.method.as_str()), + Some("initialize") + ); + assert_eq!(info.calls.len(), 1); + assert!(!info.is_batch); + assert!(info.error.is_none()); + } + + #[test] + fn flattens_object_params_for_policy_matching() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#; + let info = parse_jsonrpc_body(body); + let params = &info.calls.first().expect("single request call").params; + assert_eq!( + params.get("name").map(String::as_str), + Some("submit_report") + ); + assert_eq!( + params.get("arguments.scope").map(String::as_str), + Some("workspace/main") + ); + } + + #[test] + fn parses_valid_batch_without_error() { + let body = br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#; + let info = parse_jsonrpc_body(body); + assert!(info.error.is_none()); + assert!(info.is_batch); + assert_eq!(info.calls.len(), 2); + assert_eq!(info.calls[0].method, "tools/list"); + assert_eq!(info.calls[1].method, "tools/call"); + assert_eq!( + info.calls[1].params.get("name").map(String::as_str), + Some("read_status") + ); + } + + #[test] + fn params_digest_is_canonical_and_redacted() { + let first = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#, + ); + let reordered = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"arguments":{"scope":"workspace/main"},"name":"submit_report"}}"#, + ); + let changed = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/other"}}}"#, + ); + + let digest = first.params_sha256().expect("params digest"); + assert_eq!(Some(digest.as_str()), reordered.params_sha256().as_deref()); + assert_ne!(Some(digest.as_str()), changed.params_sha256().as_deref()); + assert_eq!(digest.len(), 64); + assert!(digest.chars().all(|c| c.is_ascii_hexdigit())); + assert!(!digest.contains("workspace/main")); + assert!(!digest.contains("submit_report")); + } + + #[test] + fn batch_params_digest_covers_call_params_without_raw_values() { + let batch = parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}} + ]"#, + ); + let empty_batch = parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"initialize"} + ]"#, + ); + + let digest = batch.params_sha256().expect("batch params digest"); + assert_eq!(digest.len(), 64); + assert!(digest.chars().all(|c| c.is_ascii_hexdigit())); + assert!(!digest.contains("blocked_action")); + assert!(empty_batch.params_sha256().is_none()); + } +} diff --git a/crates/openshell-sandbox/src/l7/mod.rs b/crates/openshell-sandbox/src/l7/mod.rs index 365bb379a..14fee09da 100644 --- a/crates/openshell-sandbox/src/l7/mod.rs +++ b/crates/openshell-sandbox/src/l7/mod.rs @@ -9,7 +9,9 @@ //! evaluated against OPA policy, and either forwarded or denied. pub mod graphql; +pub(crate) mod http; pub mod inference; +pub mod jsonrpc; pub mod path; pub mod provider; pub mod relay; @@ -25,6 +27,7 @@ pub enum L7Protocol { Websocket, Graphql, Sql, + JsonRpc, } impl L7Protocol { @@ -34,6 +37,7 @@ impl L7Protocol { "websocket" => Some(Self::Websocket), "graphql" => Some(Self::Graphql), "sql" => Some(Self::Sql), + "json-rpc" => Some(Self::JsonRpc), _ => None, } } @@ -76,6 +80,8 @@ pub struct L7EndpointConfig { pub enforcement: EnforcementMode, /// Maximum GraphQL request body bytes to buffer for inspection. pub graphql_max_body_bytes: usize, + /// Maximum JSON-RPC request body bytes to buffer for inspection. + pub json_rpc_max_body_bytes: usize, /// When true, percent-encoded `/` (`%2F`) is preserved in path segments /// rather than rejected at the parser. Needed by upstreams like GitLab /// that embed `%2F` in namespaced project paths. Defaults to false. @@ -110,6 +116,8 @@ pub struct L7RequestInfo { pub query_params: std::collections::HashMap>, /// Parsed GraphQL operation metadata for GraphQL endpoints. pub graphql: Option, + /// Parsed JSON-RPC request metadata for JSON-RPC endpoints. + pub jsonrpc: Option, } /// Parse an L7 endpoint config from a regorus Value (returned by Rego query). @@ -165,6 +173,10 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { .and_then(|v| usize::try_from(v).ok()) .filter(|v| *v > 0) .unwrap_or(graphql::DEFAULT_MAX_BODY_BYTES); + let json_rpc_max_body_bytes = get_object_u64(val, "json_rpc_max_body_bytes") + .and_then(|v| usize::try_from(v).ok()) + .filter(|v| *v > 0) + .unwrap_or(jsonrpc::DEFAULT_MAX_BODY_BYTES); Some(L7EndpointConfig { protocol, @@ -172,6 +184,7 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { tls, enforcement, graphql_max_body_bytes, + json_rpc_max_body_bytes, allow_encoded_slash, websocket_credential_rewrite, request_body_credential_rewrite, @@ -598,7 +611,7 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< if !protocol.is_empty() && L7Protocol::parse(protocol).is_none() { errors.push(format!( - "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, or sql)" + "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, sql, or json-rpc)" )); } @@ -624,6 +637,18 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< } } + if ep.get("json_rpc_max_body_bytes").is_some() { + let valid_max = ep + .get("json_rpc_max_body_bytes") + .and_then(serde_json::Value::as_u64) + .is_some_and(|v| v > 0); + if !valid_max { + errors.push(format!( + "{loc}: json_rpc_max_body_bytes must be a positive integer" + )); + } + } + if protocol != "graphql" && protocol != "websocket" && (ep.get("persisted_queries").is_some() @@ -635,6 +660,12 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< )); } + if protocol != "json-rpc" && ep.get("json_rpc_max_body_bytes").is_some() { + warnings.push(format!( + "{loc}: JSON-RPC-specific endpoint fields are ignored unless protocol is json-rpc" + )); + } + if ep .get("websocket_credential_rewrite") .and_then(serde_json::Value::as_bool) @@ -1195,6 +1226,46 @@ mod tests { assert_eq!(config.protocol, L7Protocol::Rest); assert_eq!(config.tls, TlsMode::Auto); assert_eq!(config.enforcement, EnforcementMode::Audit); + assert_eq!( + config.json_rpc_max_body_bytes, + jsonrpc::DEFAULT_MAX_BODY_BYTES + ); + } + + #[test] + fn parse_l7_config_jsonrpc_max_body_bytes() { + let val = regorus::Value::from_json_str( + r#"{"protocol": "json-rpc", "host": "mcp.example.com", "port": 443, "json_rpc_max_body_bytes": 131072}"#, + ) + .unwrap(); + let config = parse_l7_config(&val).unwrap(); + assert_eq!(config.protocol, L7Protocol::JsonRpc); + assert_eq!(config.json_rpc_max_body_bytes, 131_072); + } + + #[test] + fn validate_jsonrpc_max_body_bytes_must_be_positive() { + let data = serde_json::json!({ + "network_policies": { + "test": { + "endpoints": [{ + "host": "mcp.example.com", + "port": 443, + "protocol": "json-rpc", + "access": "full", + "json_rpc_max_body_bytes": 0 + }], + "binaries": [] + } + } + }); + let (errors, _warnings) = validate_l7_policies(&data); + assert!( + errors + .iter() + .any(|e| e.contains("json_rpc_max_body_bytes must be a positive integer")), + "should reject non-positive JSON-RPC max body size, got errors: {errors:?}" + ); } #[test] diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 40b002535..dfc2ee389 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -178,6 +178,7 @@ where .into_diagnostic()?; Ok(()) } + L7Protocol::JsonRpc => relay_jsonrpc(config, &engine, client, upstream, ctx).await, } } @@ -297,6 +298,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: graphql_info.clone(), + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -341,7 +343,7 @@ where let engine_type = match config.protocol { L7Protocol::Graphql => "l7-graphql", L7Protocol::Websocket => "l7-websocket", - L7Protocol::Rest | L7Protocol::Sql => "l7", + L7Protocol::Rest | L7Protocol::Sql | L7Protocol::JsonRpc => "l7", }; emit_l7_request_log( ctx, @@ -694,6 +696,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: None, + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -885,6 +888,173 @@ fn close_if_stale(guard: &PolicyGenerationGuard, ctx: &L7EvalContext) -> bool { true } +async fn relay_jsonrpc( + config: &L7EndpointConfig, + engine: &TunnelPolicyEngine, + client: &mut C, + upstream: &mut U, + ctx: &L7EvalContext, +) -> Result<()> +where + C: AsyncRead + AsyncWrite + Unpin + Send, + U: AsyncRead + AsyncWrite + Unpin + Send, +{ + loop { + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let parsed = match crate::l7::jsonrpc::parse_jsonrpc_http_request( + client, + config.json_rpc_max_body_bytes, + crate::l7::path::CanonicalizeOptions { + allow_encoded_slash: config.allow_encoded_slash, + ..Default::default() + }, + ) + .await + { + Ok(Some(parsed)) => parsed, + Ok(None) => return Ok(()), + Err(e) => { + if is_benign_connection_error(&e) { + debug!( + host = %ctx.host, + port = ctx.port, + error = %e, + "JSON-RPC L7 connection closed" + ); + } else { + let detail = + parse_rejection_detail(&e.to_string(), ParseRejectionMode::L7Endpoint); + emit_parse_rejection(ctx, &detail, "l7-jsonrpc"); + } + return Ok(()); + } + }; + + let req = parsed.request; + let jsonrpc_info = parsed.info; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let redacted_target = req.target.clone(); + + let request_info = L7RequestInfo { + action: req.action.clone(), + target: redacted_target.clone(), + query_params: req.query_params.clone(), + graphql: None, + jsonrpc: Some(jsonrpc_info.clone()), + }; + + let parse_error_reason = jsonrpc_info + .error + .as_deref() + .map(|e| format!("JSON-RPC request rejected: {e}")); + let force_deny = parse_error_reason.is_some(); + let (allowed, reason, jsonrpc_log_info) = if let Some(reason) = parse_error_reason { + (false, reason, jsonrpc_info.clone()) + } else { + let evaluation = + evaluate_jsonrpc_l7_request_for_log(engine, ctx, &request_info, &jsonrpc_info)?; + (evaluation.allowed, evaluation.reason, evaluation.log_info) + }; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let decision_str = match (allowed, config.enforcement) { + (_, _) if force_deny => "deny", + (true, _) => "allow", + (false, EnforcementMode::Audit) => "audit", + (false, EnforcementMode::Enforce) => "deny", + }; + + { + let (action_id, disposition_id, severity) = match decision_str { + "deny" => (ActionId::Denied, DispositionId::Blocked, SeverityId::Medium), + _ => ( + ActionId::Allowed, + DispositionId::Allowed, + SeverityId::Informational, + ), + }; + let endpoint = format!("{}:{}{}", ctx.host, ctx.port, redacted_target); + let params_sha256 = jsonrpc_log_info + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let policy_version = engine.captured_generation(); + let event = HttpActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Other) + .action(action_id) + .disposition(disposition_id) + .severity(severity) + .http_request(HttpRequest::new( + &request_info.action, + OcsfUrl::new("http", &ctx.host, &redacted_target, ctx.port), + )) + .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) + .firewall_rule(&ctx.policy_name, "l7-jsonrpc") + .message(jsonrpc_log_message( + decision_str, + &request_info.action, + &endpoint, + &jsonrpc_log_info, + ¶ms_sha256, + policy_version, + &reason, + )) + .build(); + ocsf_emit!(event); + } + + if allowed || (config.enforcement == EnforcementMode::Audit && !force_deny) { + let outcome = crate::l7::rest::relay_http_request_with_resolver_guarded( + &req, + client, + upstream, + ctx.secret_resolver.as_deref(), + Some(engine.generation_guard()), + ) + .await?; + match outcome { + RelayOutcome::Reusable => {} + RelayOutcome::Consumed => { + debug!( + host = %ctx.host, + port = ctx.port, + "Upstream connection not reusable, closing JSON-RPC L7 relay" + ); + return Ok(()); + } + RelayOutcome::Upgraded { .. } => { + return Ok(()); + } + } + } else { + crate::l7::rest::RestProvider::default() + .deny_with_redacted_target( + &req, + &ctx.policy_name, + &reason, + client, + Some(&redacted_target), + Some(crate::l7::rest::DenyResponseContext { + host: Some(&ctx.host), + port: Some(ctx.port), + binary: Some(&ctx.binary_path), + }), + ) + .await?; + return Ok(()); + } + } +} + async fn relay_graphql( config: &L7EndpointConfig, engine: &TunnelPolicyEngine, @@ -962,6 +1132,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: Some(graphql_info.clone()), + jsonrpc: None, }; // Malformed or ambiguous GraphQL requests, such as duplicated GET @@ -1110,6 +1281,38 @@ fn graphql_log_summary(info: &crate::l7::graphql::GraphqlRequestInfo) -> String format!("graphql_ops={}", ops.join(";")) } +fn jsonrpc_log_message( + decision: &str, + http_method: &str, + endpoint: &str, + info: &crate::l7::jsonrpc::JsonRpcRequestInfo, + params_sha256: &str, + policy_version: u64, + reason: &str, +) -> String { + let rpc_methods = jsonrpc_methods_for_log(info); + format!( + "JSONRPC_L7_REQUEST decision={decision} http_method={http_method} endpoint={endpoint} rpc_methods={rpc_methods} params_sha256={params_sha256} policy_version={policy_version} reason={reason}" + ) +} + +fn jsonrpc_methods_for_log(info: &crate::l7::jsonrpc::JsonRpcRequestInfo) -> String { + if info.calls.is_empty() { + return "-".to_string(); + } + info.calls + .iter() + .map(|call| call.method.as_str()) + .collect::>() + .join(",") +} + +struct JsonRpcEvaluation { + allowed: bool, + reason: String, + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo, +} + /// Check if a miette error represents a benign connection close. /// /// TLS handshake EOF, missing `close_notify`, connection resets, and broken @@ -1135,6 +1338,88 @@ pub fn evaluate_l7_request( engine: &TunnelPolicyEngine, ctx: &L7EvalContext, request: &L7RequestInfo, +) -> Result<(bool, String)> { + if let Some(jsonrpc) = &request.jsonrpc + && jsonrpc.is_batch + && !jsonrpc.calls.is_empty() + { + for call in &jsonrpc.calls { + let item_request = jsonrpc_request_for_call(request, call); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + return Ok((false, reason)); + } + } + return Ok((true, String::new())); + } + + evaluate_l7_request_once(engine, ctx, request) +} + +fn evaluate_jsonrpc_l7_request_for_log( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, + jsonrpc: &crate::l7::jsonrpc::JsonRpcRequestInfo, +) -> Result { + if jsonrpc.is_batch && !jsonrpc.calls.is_empty() { + let mut denied_calls = Vec::new(); + let mut first_denied_reason = None; + for call in &jsonrpc.calls { + let item_request = jsonrpc_request_for_call(request, call); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + if first_denied_reason.is_none() { + first_denied_reason = Some(reason); + } + denied_calls.push(call.clone()); + } + } + + if denied_calls.is_empty() { + return Ok(JsonRpcEvaluation { + allowed: true, + reason: String::new(), + log_info: jsonrpc.clone(), + }); + } + + return Ok(JsonRpcEvaluation { + allowed: false, + reason: first_denied_reason.unwrap_or_else(|| "request denied by policy".to_string()), + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: denied_calls, + is_batch: true, + error: None, + }, + }); + } + + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, request)?; + Ok(JsonRpcEvaluation { + allowed, + reason, + log_info: jsonrpc.clone(), + }) +} + +fn jsonrpc_request_for_call( + request: &L7RequestInfo, + call: &crate::l7::jsonrpc::JsonRpcCallInfo, +) -> L7RequestInfo { + let mut item_request = request.clone(); + item_request.jsonrpc = Some(crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: vec![call.clone()], + is_batch: false, + error: None, + }); + item_request +} + +fn evaluate_l7_request_once( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, ) -> Result<(bool, String)> { if engine.is_stale() { return Err(miette!( @@ -1159,6 +1444,14 @@ pub fn evaluate_l7_request( "path": request.target, "query_params": request.query_params.clone(), "graphql": request.graphql.clone(), + "jsonrpc": request.jsonrpc.as_ref().map(|j| { + let call = if j.is_batch { None } else { j.calls.first() }; + serde_json::json!({ + "method": call.map(|call| call.method.as_str()), + "params": call.map(|call| call.params.clone()).unwrap_or_default(), + "error": j.error, + }) + }), } }); @@ -1792,6 +2085,7 @@ network_policies: target: "/ws".into(), query_params: std::collections::HashMap::new(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); @@ -1800,6 +2094,180 @@ network_policies: assert!(reason.contains("WEBSOCKET_TEXT /ws not permitted")); } + #[test] + fn jsonrpc_batch_evaluates_each_call() { + let data = r#" +network_policies: + jsonrpc_api: + name: jsonrpc_api + endpoints: + - host: api.example.test + port: 443 + protocol: json-rpc + enforcement: enforce + rules: + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/list" + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/call" + params: + name: read_status + deny_rules: + - rpc_method: "tools/call" + params: + name: blocked_action + - rpc_method: "tools/delete" + binaries: + - { path: /usr/bin/node } +"#; + let engine = OpaEngine::from_strings(TEST_POLICY, data).unwrap(); + let tunnel_engine = engine + .clone_engine_for_tunnel(engine.current_generation()) + .unwrap(); + let ctx = L7EvalContext { + host: "api.example.test".into(), + port: 443, + policy_name: "jsonrpc_api".into(), + binary_path: "/usr/bin/node".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + let mut request = L7RequestInfo { + action: "POST".into(), + target: "/mcp".into(), + query_params: std::collections::HashMap::new(), + graphql: None, + jsonrpc: Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#, + )), + }; + + let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(allowed, "{reason}"); + + request.jsonrpc = Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}}, + {"jsonrpc":"2.0","id":3,"method":"tools/delete","params":{"name":"purge_cache"}} + ]"#, + )); + let (allowed, _) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(!allowed); + + let jsonrpc = request.jsonrpc.as_ref().expect("jsonrpc request"); + let evaluation = + evaluate_jsonrpc_l7_request_for_log(&tunnel_engine, &ctx, &request, jsonrpc).unwrap(); + assert!(!evaluation.allowed); + assert!(evaluation.log_info.is_batch); + assert_eq!( + jsonrpc_methods_for_log(&evaluation.log_info), + "tools/call,tools/delete" + ); + + let full_params_sha256 = jsonrpc.params_sha256().expect("full batch params digest"); + let log_params_sha256 = evaluation + .log_info + .params_sha256() + .expect("logged batch params digest"); + assert_ne!(full_params_sha256, log_params_sha256); + let message = jsonrpc_log_message( + "deny", + "POST", + "api.example.test:443/mcp", + &evaluation.log_info, + &log_params_sha256, + 42, + &evaluation.reason, + ); + assert!(message.contains("rpc_methods=tools/call,tools/delete")); + assert!(message.contains("params_sha256=")); + assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("tools/list")); + assert!(!message.contains("blocked_action")); + assert!(!message.contains("purge_cache")); + } + + #[test] + fn jsonrpc_log_records_digest_not_args() { + let info = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"delete_resource","arguments":{"scope":"secret-scope"}}}"#, + ); + let params_sha256 = info.params_sha256().expect("params digest"); + let message = jsonrpc_log_message( + "deny", + "POST", + "mcp.example.com:443/mcp", + &info, + ¶ms_sha256, + 42, + "request denied by policy", + ); + + assert!(message.contains("endpoint=mcp.example.com:443/mcp")); + assert!(message.contains("rpc_methods=tools/call")); + assert!(message.contains("params_sha256=")); + assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("delete_resource")); + assert!(!message.contains("secret-scope")); + + let batch = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"delete_resource"}} + ]"#, + ); + let batch_params_sha256 = batch.params_sha256().expect("batch params digest"); + let batch_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &batch, + &batch_params_sha256, + 43, + "", + ); + + assert!(batch_message.starts_with("JSONRPC_L7_REQUEST ")); + assert!(batch_message.contains("rpc_methods=tools/list,tools/call")); + assert!(batch_message.contains("params_sha256=")); + assert!(!batch_message.contains("params_sha256=sha256:")); + assert!(batch_message.contains("policy_version=43")); + assert!(!batch_message.contains("rpc_method=")); + assert!(!batch_message.contains("delete_resource")); + + let no_params = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"initialize"}"#, + ); + let no_params_sha256 = no_params + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let no_params_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &no_params, + &no_params_sha256, + 44, + "", + ); + assert!(no_params_message.contains("rpc_methods=initialize")); + assert!(no_params_message.contains("params_sha256=")); + } + #[tokio::test] async fn route_selected_websocket_upgrade_rejects_invalid_accept_without_forwarding_101() { let data = r#" @@ -1828,6 +2296,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -1931,6 +2400,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2051,6 +2521,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2407,4 +2878,100 @@ network_policies: "stale passthrough request must not be forwarded upstream" ); } + + #[tokio::test] + async fn jsonrpc_relay_denies_method_not_in_allow_list() { + let data = r" +network_policies: + mcp_api: + name: mcp_api + endpoints: + - host: mcp.example.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: initialize + binaries: + - { path: /usr/bin/python3 } +"; + let engine = OpaEngine::from_strings(TEST_POLICY, data).unwrap(); + let input = NetworkInput { + host: "mcp.example.test".into(), + port: 8000, + binary_path: PathBuf::from("/usr/bin/python3"), + binary_sha256: "unused".into(), + ancestors: vec![], + cmdline_paths: vec![], + }; + let (endpoint_config, generation) = engine + .query_endpoint_config_with_generation(&input) + .unwrap(); + let config = crate::l7::parse_l7_config(&endpoint_config.unwrap()).unwrap(); + let tunnel_engine = engine.clone_engine_for_tunnel(generation).unwrap(); + let ctx = L7EvalContext { + host: "mcp.example.test".into(), + port: 8000, + policy_name: "mcp_api".into(), + binary_path: "/usr/bin/python3".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + + let (mut app, mut relay_client) = tokio::io::duplex(8192); + let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192); + let relay = tokio::spawn(async move { + relay_with_inspection( + &config, + tunnel_engine, + &mut relay_client, + &mut relay_upstream, + &ctx, + ) + .await + }); + + let body = + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_repos"}}"#; + let request = format!( + "POST /mcp HTTP/1.1\r\nHost: mcp.example.test:8000\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + app.write_all(request.as_bytes()).await.unwrap(); + app.write_all(body).await.unwrap(); + + let mut response = [0u8; 512]; + let n = tokio::time::timeout(std::time::Duration::from_secs(2), app.read(&mut response)) + .await + .expect("relay should respond without reaching upstream") + .unwrap(); + let response = String::from_utf8_lossy(&response[..n]); + assert!( + response.contains("403"), + "tools/call not in allow list must be denied with 403, got: {response:?}" + ); + + let mut upstream_buf = [0u8; 128]; + let n = tokio::time::timeout( + std::time::Duration::from_millis(100), + upstream.read(&mut upstream_buf), + ) + .await + .unwrap_or(Ok(0)) + .unwrap_or(0); + assert_eq!(n, 0, "denied request must not be forwarded to upstream"); + + drop(app); + tokio::time::timeout(std::time::Duration::from_secs(1), relay) + .await + .expect("relay should complete") + .unwrap() + .unwrap(); + } } diff --git a/crates/openshell-sandbox/src/l7/websocket.rs b/crates/openshell-sandbox/src/l7/websocket.rs index 79c820e26..70c11f330 100644 --- a/crates/openshell-sandbox/src/l7/websocket.rs +++ b/crates/openshell-sandbox/src/l7/websocket.rs @@ -545,6 +545,7 @@ fn inspect_websocket_text_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(inspector.engine, inspector.ctx, &request_info)?; let decision = match (allowed, inspector.enforcement) { @@ -581,6 +582,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; emit_websocket_l7_event( host, @@ -602,6 +604,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: Some(graphql.clone()), + jsonrpc: None, }; let parse_error_reason = graphql .error diff --git a/crates/openshell-sandbox/src/mechanistic_mapper.rs b/crates/openshell-sandbox/src/mechanistic_mapper.rs index ba7c51de9..bbe7b93b8 100644 --- a/crates/openshell-sandbox/src/mechanistic_mapper.rs +++ b/crates/openshell-sandbox/src/mechanistic_mapper.rs @@ -355,6 +355,8 @@ fn build_l7_rules(samples: &HashMap<(String, String), u32>) -> Vec { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::new(), }), }); } diff --git a/crates/openshell-sandbox/src/opa.rs b/crates/openshell-sandbox/src/opa.rs index f73f3bc14..ac50340d1 100644 --- a/crates/openshell-sandbox/src/opa.rs +++ b/crates/openshell-sandbox/src/opa.rs @@ -923,6 +923,24 @@ fn resolve_binary_in_container(_policy_path: &str, _entrypoint_pid: u32) -> Opti None } +fn l7_matchers_to_json( + matchers: &std::collections::HashMap, +) -> serde_json::Map { + matchers + .iter() + .map(|(key, matcher)| { + let mut matcher_json = serde_json::json!({}); + if !matcher.glob.is_empty() { + matcher_json["glob"] = matcher.glob.clone().into(); + } + if !matcher.any.is_empty() { + matcher_json["any"] = matcher.any.clone().into(); + } + (key.clone(), matcher_json) + }) + .collect() +} + /// Convert typed proto policy fields to JSON suitable for `engine.add_data_json()`. /// /// The rego rules reference `data.*` directly, so the JSON structure has @@ -1021,35 +1039,25 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St "command": a.map_or("", |a| &a.command), "operation_type": a.map_or("", |a| &a.operation_type), "operation_name": a.map_or("", |a| &a.operation_name), + "rpc_method": a.map_or("", |a| &a.rpc_method), }); if let Some(a) = a && !a.fields.is_empty() { allow["fields"] = a.fields.clone().into(); } - let query: serde_json::Map = a - .map(|allow| { - allow - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = - matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = - matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect() - }) - .unwrap_or_default(); + let query = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.query) + }); if !query.is_empty() { allow["query"] = query.into(); } + let params = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.params) + }); + if !params.is_empty() { + allow["params"] = params.into(); + } serde_json::json!({ "allow": allow }) }) .collect(); @@ -1085,23 +1093,17 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if !d.fields.is_empty() { deny["fields"] = d.fields.clone().into(); } - let query: serde_json::Map = d - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect(); + if !d.rpc_method.is_empty() { + deny["rpc_method"] = d.rpc_method.clone().into(); + } + let query = l7_matchers_to_json(&d.query); if !query.is_empty() { deny["query"] = query.into(); } + let params = l7_matchers_to_json(&d.params); + if !params.is_empty() { + deny["params"] = params.into(); + } deny }) .collect(); @@ -1139,6 +1141,9 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if e.graphql_max_body_bytes > 0 { ep["graphql_max_body_bytes"] = e.graphql_max_body_bytes.into(); } + if e.json_rpc_max_body_bytes > 0 { + ep["json_rpc_max_body_bytes"] = e.json_rpc_max_body_bytes.into(); + } ep }) .collect(); @@ -1946,6 +1951,36 @@ process: }) } + fn l7_jsonrpc_input(host: &str, port: u16, path: &str, rpc_method: &str) -> serde_json::Value { + l7_jsonrpc_input_with_params(host, port, path, rpc_method, serde_json::json!({})) + } + + fn l7_jsonrpc_input_with_params( + host: &str, + port: u16, + path: &str, + rpc_method: &str, + params: serde_json::Value, + ) -> serde_json::Value { + serde_json::json!({ + "network": { "host": host, "port": port }, + "exec": { + "path": "/usr/bin/curl", + "ancestors": [], + "cmdline_paths": [] + }, + "request": { + "method": "POST", + "path": path, + "query_params": {}, + "jsonrpc": { + "method": rpc_method, + "params": params + } + } + }) + } + fn l7_graphql_input(host: &str, operations: serde_json::Value) -> serde_json::Value { serde_json::json!({ "network": { "host": host, "port": 443 }, @@ -2492,6 +2527,8 @@ network_policies: operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: std::collections::HashMap::new(), }), }], ..Default::default() @@ -2540,6 +2577,140 @@ network_policies: assert!(!eval_l7(&engine, &deny_input)); } + #[test] + fn l7_jsonrpc_rpc_method_from_proto_is_enforced() { + let mut network_policies = std::collections::HashMap::new(); + network_policies.insert( + "jsonrpc_proto".to_string(), + NetworkPolicyRule { + name: "jsonrpc_proto".to_string(), + endpoints: vec![NetworkEndpoint { + host: "mcp.proto.com".to_string(), + port: 8000, + path: "/mcp".to_string(), + protocol: "json-rpc".to_string(), + enforcement: "enforce".to_string(), + rules: vec![L7Rule { + allow: Some(L7Allow { + method: String::new(), + path: String::new(), + command: String::new(), + query: std::collections::HashMap::new(), + operation_type: String::new(), + operation_name: String::new(), + fields: Vec::new(), + rpc_method: "initialize".to_string(), + params: std::collections::HashMap::new(), + }), + }], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }, + ); + + let proto = ProtoSandboxPolicy { + version: 1, + filesystem: Some(ProtoFs { + include_workdir: true, + read_only: vec![], + read_write: vec![], + }), + landlock: Some(openshell_core::proto::LandlockPolicy { + compatibility: "best_effort".to_string(), + }), + process: Some(ProtoProc { + run_as_user: "sandbox".to_string(), + run_as_group: "sandbox".to_string(), + }), + network_policies, + }; + + let engine = OpaEngine::from_proto(&proto).expect("engine from proto"); + let allow_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "initialize"); + assert!(eval_l7(&engine, &allow_input)); + + let deny_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "tools/list"); + assert!(!eval_l7(&engine, &deny_input)); + } + + #[test] + fn l7_jsonrpc_params_rules_filter_tools_call() { + let data = r#" +network_policies: + jsonrpc_params: + name: jsonrpc_params + endpoints: + - host: mcp.params.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: blocked_action + binaries: + - { path: /usr/bin/curl } +"#; + let engine = OpaEngine::from_strings(TEST_POLICY, data).expect("engine from yaml"); + + let read_status = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "read_status"}), + ); + assert!(eval_l7(&engine, &read_status)); + + let submit_report = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "submit_report", + "arguments.scope": "workspace/main" + }), + ); + assert!(eval_l7(&engine, &submit_report)); + + let blocked_without_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "blocked_action"}), + ); + assert!(!eval_l7(&engine, &blocked_without_args)); + + let blocked_with_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "blocked_action", + "arguments.reason": "test" + }), + ); + assert!(!eval_l7(&engine, &blocked_with_args)); + } + #[test] fn l7_no_request_on_l4_only_endpoint() { // L4-only endpoint should not match L7 allow_request diff --git a/crates/openshell-sandbox/src/policy_local.rs b/crates/openshell-sandbox/src/policy_local.rs index d65bce324..aa270d017 100644 --- a/crates/openshell-sandbox/src/policy_local.rs +++ b/crates/openshell-sandbox/src/policy_local.rs @@ -1083,6 +1083,8 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::new(), }), }) .collect(); @@ -1097,6 +1099,8 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), + params: HashMap::new(), }) .collect(); @@ -1120,6 +1124,7 @@ fn network_endpoint_from_json( persisted_queries: String::new(), graphql_persisted_queries: HashMap::new(), graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: 0, path: String::new(), }) } diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 39620657d..a45f1bc60 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -3394,11 +3394,63 @@ async fn handle_forward_proxy( } else { None }; + let jsonrpc = if l7_config.config.protocol == crate::l7::L7Protocol::JsonRpc { + let header_end = forward_request_bytes + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(forward_request_bytes.len(), |p| p + 4); + let header_str = std::str::from_utf8(&forward_request_bytes[..header_end]) + .map_err(|_| miette::miette!("Forward JSON-RPC headers contain invalid UTF-8"))?; + let body_length = crate::l7::rest::parse_body_length(header_str)?; + let mut jsonrpc_request = crate::l7::provider::L7Request { + action: method.to_string(), + target: path.clone(), + query_params: query_params.clone(), + raw_header: forward_request_bytes, + body_length, + }; + let body = match crate::l7::http::read_body_for_inspection( + client, + &mut jsonrpc_request, + l7_config.config.json_rpc_max_body_bytes, + ) + .await + { + Ok(body) => body, + Err(e) => { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .dst_endpoint(Endpoint::from_domain(&host_lc, port)) + .message(format!("FORWARD_JSONRPC_L7 request rejected: {e}")) + .build(); + ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); + respond( + client, + &build_json_error_response( + 400, + "Bad Request", + "invalid_jsonrpc_request", + &format!("JSON-RPC request rejected before policy evaluation: {e}"), + ), + ) + .await?; + return Ok(()); + } + }; + forward_request_bytes = jsonrpc_request.raw_header; + Some(crate::l7::jsonrpc::parse_jsonrpc_body(&body)) + } else { + None + }; let request_info = crate::l7::L7RequestInfo { action: method.to_string(), target: path.clone(), query_params, graphql, + jsonrpc, }; let parse_error_reason = request_info @@ -4090,6 +4142,7 @@ mod tests { tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite, request_body_credential_rewrite: false, @@ -4689,6 +4742,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, @@ -4702,6 +4756,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 2e2210f44..ebae4809a 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -8049,6 +8049,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }; @@ -8444,6 +8446,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }]; @@ -8590,6 +8594,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }; diff --git a/docs/reference/policy-schema.mdx b/docs/reference/policy-schema.mdx index 59f72c9f7..fa540e4dd 100644 --- a/docs/reference/policy-schema.mdx +++ b/docs/reference/policy-schema.mdx @@ -155,7 +155,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `host` | string | Yes | Hostname or IP address. Supports a `*` wildcard inside the first DNS label only: `*.example.com`, `**.example.com`, and intra-label patterns like `*-aiplatform.googleapis.com` are accepted; bare `*`/`**`, TLD wildcards (`*.com`), and wildcards outside the first label are rejected at load time. | | `port` | integer | Yes | TCP port number. | | `path` | string | No | Optional HTTP path glob used to select between L7 endpoints that share the same host and port. Empty means all paths. Use this when REST and GraphQL live under the same host, such as `/repos/**` and `/graphql`. | -| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, or `graphql` for GraphQL-over-HTTP operation inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | +| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, `graphql` for GraphQL-over-HTTP operation inspection, or `json-rpc` for sandbox-to-server JSON-RPC-over-HTTP method and params inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | | `tls` | string | No | TLS handling mode. The proxy auto-detects TLS by peeking the first bytes of each connection and terminates it for inspected HTTPS traffic, so this field is optional in most cases. Set to `skip` to disable auto-detection for edge cases such as client-certificate mTLS or non-standard protocols. The values `terminate` and `passthrough` are deprecated and log a warning; they are still accepted for backward compatibility but have no effect on behavior. | | `enforcement` | string | No | `enforce` actively blocks disallowed requests. `audit` logs violations but allows traffic through. | | `access` | string | No | Access preset. One of `read-only`, `read-write`, or `full`. Mutually exclusive with `rules`. | @@ -168,6 +168,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `persisted_queries` | string | No | GraphQL hash-only behavior for `protocol: graphql` and GraphQL-over-WebSocket operation policy. Default is `deny`; use `allow_registered` only with `graphql_persisted_queries`. | | `graphql_persisted_queries` | map | No | Trusted GraphQL persisted-query registry keyed by hash or saved-query ID. Values contain `operation_type`, optional `operation_name`, and optional root `fields`. | | `graphql_max_body_bytes` | integer | No | Maximum GraphQL-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | +| `json_rpc` | object | No | JSON-RPC endpoint options. For `protocol: json-rpc`, `json_rpc.max_body_bytes` sets the maximum JSON-RPC-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placeholder form and whole-token provider-shaped aliases such as `provider-OPENSHELL-RESOLVE-ENV-API_TOKEN` when the referenced environment key exists in the configured provider credentials. @@ -175,11 +176,13 @@ Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placehol The `access` field accepts one of the following values: -| Value | REST expansion | WebSocket expansion | GraphQL expansion | -|---|---|---|---| -| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | -| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. | -| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | +| Value | REST expansion | WebSocket expansion | GraphQL expansion | JSON-RPC behavior | +|---|---|---|---|---| +| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | Allows matching HTTP requests without constraining JSON-RPC methods. | +| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. | Expands to HTTP read methods and does not allow typical JSON-RPC `POST` calls. | +| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | Allows matching HTTP write requests without constraining JSON-RPC methods. | + +For JSON-RPC endpoints, prefer explicit `rules` with `rpc_method` and optional `params` when you need method-level control. #### Allow Rule Objects @@ -274,6 +277,42 @@ rules: Do not combine `method`, `path`, or `query` with `operation_type`, `operation_name`, or `fields` inside the same WebSocket rule. When a WebSocket endpoint has GraphQL operation policy, use GraphQL rules for client messages instead of a raw `WEBSOCKET_TEXT` allow rule. +##### JSON-RPC Allow Rule (`protocol: json-rpc`) + +JSON-RPC allow rules match sandbox-to-server JSON-RPC-over-HTTP request objects by RPC method and optional params. They apply to single JSON-RPC requests and batch requests. For a batch, OpenShell evaluates each call independently. JSON-RPC responses and server-to-client messages on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob, such as `initialize`, `tools/list`, or `tools/*`. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Use dot-separated keys for nested object params, such as `arguments.scope`. Matcher value can be a glob string or an object with `any`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. | + +Example JSON-RPC allow rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main +``` + #### Deny Rule Objects Blocks specific operations on endpoints that otherwise have broad access. Deny rules are evaluated after allow rules and take precedence: if a request matches any deny rule, it is blocked regardless of what the allow rules or access preset permit. @@ -356,6 +395,33 @@ endpoints: operation_name: Admin* ``` +##### JSON-RPC Deny Rule (`protocol: json-rpc`) + +JSON-RPC deny rules use the same field names as JSON-RPC allow rules, but they appear directly under each `deny_rules` entry instead of under an `allow` wrapper. Deny rules take precedence over allow rules. In a batch request, one denied call denies the full batch. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob to deny. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Omit to deny every call matching `rpc_method`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. | + +Example JSON-RPC deny rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/* + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource +``` + ### Binary Object Identifies an executable that is permitted to use the associated endpoints. diff --git a/docs/sandboxes/policies.mdx b/docs/sandboxes/policies.mdx index 406ed12b8..bae6fa279 100644 --- a/docs/sandboxes/policies.mdx +++ b/docs/sandboxes/policies.mdx @@ -148,7 +148,7 @@ The following steps outline the hot-reload policy update workflow. To inspect a stored sandbox-authored revision instead of the current effective policy, pass `--rev `. -5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, or `rules`. +5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, `rules`, or protocol-specific matchers such as GraphQL operation fields and JSON-RPC `rpc_method` / `params` rules. 6. Push the updated policy when you need a full replacement. Exit codes: 0 = loaded, 1 = validation failed, 124 = timeout. @@ -173,7 +173,7 @@ Use `openshell policy update` when you want to merge network policy changes into - remove one endpoint or one named rule without rewriting the rest of the file. - preview a merged result locally with `--dry-run` before you send it to the gateway. -Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. +Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. Use full YAML for GraphQL and JSON-RPC rule shapes. ### Update Commands @@ -210,6 +210,7 @@ This is the practical difference: Current constraints: - `--add-allow` and `--add-deny` work on `protocol: rest` and `protocol: websocket` endpoints. +- GraphQL and JSON-RPC fine-grained rules require full policy YAML applied with `openshell policy set`. - `--add-deny` requires the endpoint to already have an allow base, either an `access` preset or explicit allow `rules`. - `protocol: sql` is not a practical incremental workflow today. OpenShell does not do full SQL parsing, and SQL enforcement is not meaningfully supported yet. @@ -228,7 +229,7 @@ Each segment has a fixed meaning: | `host` | Yes | Destination hostname. | | `port` | Yes | Destination port, `1` through `65535`. | | `access` | No | Access preset for L7 endpoints: `read-only`, `read-write`, or `full`. Incremental updates expand presets into protocol-specific method/path rules for REST and WebSocket endpoints. | -| `protocol` | No | L7 inspection mode: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. | +| `protocol` | No | L7 inspection mode accepted by `openshell policy update`: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. Full policy YAML also supports `graphql` and `json-rpc`. | | `enforcement` | No | Enforcement mode for inspected traffic: `enforce` or `audit`. | | `options` | No | Comma-separated endpoint options. Use `websocket-credential-rewrite` with `protocol: websocket` or REST compatibility endpoints that perform a WebSocket upgrade. Use `request-body-credential-rewrite` only with `protocol: rest`. | @@ -548,7 +549,7 @@ For an end-to-end walkthrough that combines this policy with a GitHub credential - { path: /usr/bin/gh } ``` -Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. +Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. Endpoints with `protocol: json-rpc` parse JSON-RPC-over-HTTP request bodies and evaluate `rpc_method` and optional params rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. @@ -579,6 +580,51 @@ REST rules can also constrain query parameter values: `query` matchers are case-sensitive and run on decoded values. If a request has duplicate keys (for example, `tag=a&tag=b`), every value for that key must match the configured glob(s). +### JSON-RPC matching + +JSON-RPC endpoints use `protocol: json-rpc`. The proxy parses sandbox-to-server JSON-RPC-over-HTTP request bodies, evaluates the `method` field against `rpc_method`, and can match object params through dot-separated `params` keys. + +JSON-RPC policy enforcement is directional. It applies to HTTP request bodies sent by the sandboxed process to the configured endpoint. JSON-RPC responses and server-to-client messages carried on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +JSON-RPC endpoint policies currently require full policy YAML applied with `openshell policy set`; the incremental `openshell policy update --add-endpoint` parser does not accept `json-rpc` as a protocol. + +```yaml showLineNumbers={false} + mcp_server: + name: mcp_server + endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource + binaries: + - { path: /usr/bin/python3 } +``` + +`json_rpc.max_body_bytes` controls how many JSON-RPC-over-HTTP request body bytes OpenShell buffers for inspection. It defaults to `65536`. + +`params` matchers are case-sensitive and use the same string glob or `{ any: [...] }` matcher syntax as REST query parameters. They match scalar leaf values from object params: strings, numbers, and booleans are converted to strings, and nested JSON object params are flattened with dot-separated keys before matching. Arrays, `null`, and non-object top-level params do not produce matcher keys. This is useful for controls such as matching MCP `tools/call` by `params.name`, but it is not a complete MCP payload policy for rich nested content. For batch requests, OpenShell evaluates each JSON-RPC call independently and denies the whole batch if any call is denied. + ### GraphQL matching GraphQL endpoints use `protocol: graphql`. The proxy parses GraphQL-over-HTTP `GET` and `POST` requests, classifies each operation, and evaluates rules against the operation type, optional operation name, and selected root fields. diff --git a/e2e/rust/Cargo.toml b/e2e/rust/Cargo.toml index 083c622df..2f61f2d86 100644 --- a/e2e/rust/Cargo.toml +++ b/e2e/rust/Cargo.toml @@ -97,6 +97,11 @@ name = "forward_proxy_graphql_l7" path = "tests/forward_proxy_graphql_l7.rs" required-features = ["e2e-host-gateway"] +[[test]] +name = "forward_proxy_jsonrpc_l7" +path = "tests/forward_proxy_jsonrpc_l7.rs" +required-features = ["e2e-host-gateway"] + [[test]] name = "gpu_device_selection" path = "tests/gpu_device_selection.rs" diff --git a/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs new file mode 100644 index 000000000..feba98ceb --- /dev/null +++ b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs @@ -0,0 +1,373 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! E2E tests for JSON-RPC L7 inspection across both proxy entry points. +//! +//! The upstream server deliberately does not implement JSON-RPC. `OpenShell` +//! parses and enforces JSON-RPC before forwarding, so any HTTP server that +//! accepts POST /mcp is enough to prove allowed requests reach upstream +//! and denied requests are stopped by the sandbox proxy. + +#![cfg(feature = "e2e")] + +use std::io::Write; + +use openshell_e2e::harness::container::ContainerHttpServer; +use openshell_e2e::harness::sandbox::SandboxGuard; +use tempfile::NamedTempFile; + +const TEST_SERVER_ALIAS: &str = "jsonrpc-l7.openshell.test"; + +async fn start_test_server() -> Result { + let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer + +class Handler(BaseHTTPRequestHandler): + def read_body(self): + if self.headers.get("Transfer-Encoding", "").lower() == "chunked": + data = b"" + while True: + size_line = self.rfile.readline() + if not size_line: + break + size = int(size_line.split(b";", 1)[0].strip(), 16) + if size == 0: + while self.rfile.readline().strip(): + pass + break + data += self.rfile.read(size) + self.rfile.read(2) + return data + return self.rfile.read(int(self.headers.get("Content-Length", "0"))) + + def do_GET(self): + self.send_response(200) + self.end_headers() + + def do_POST(self): + self.read_body() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"jsonrpc":"2.0","id":1,"result":{}}') + + def log_message(self, format, *args): + pass + +HTTPServer(("0.0.0.0", 8000), Handler).serve_forever() +"#; + + ContainerHttpServer::start_python(TEST_SERVER_ALIAS, script).await +} + +fn write_jsonrpc_policy(host: &str, port: u16) -> Result { + let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?; + let policy = format!( + r#"version: 1 + +filesystem_policy: + include_workdir: true + read_only: + - /usr + - /lib + - /proc + - /dev/urandom + - /app + - /etc + - /var/log + read_write: + - /sandbox + - /tmp + - /dev/null + +landlock: + compatibility: best_effort + +process: + run_as_user: sandbox + run_as_group: sandbox + +network_policies: + test_jsonrpc_l7: + name: test_jsonrpc_l7 + endpoints: + - host: {host} + port: {port} + path: /mcp + protocol: json-rpc + enforcement: enforce + allowed_ips: + - "10.0.0.0/8" + - "172.0.0.0/8" + - "192.168.0.0/16" + - "fc00::/7" + json_rpc: + max_body_bytes: 65536 + on_parse_error: deny + batch_policy: deny_if_any_denied + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: blocked_action + binaries: + - path: /usr/bin/python* + - path: /usr/local/bin/python* + - path: /sandbox/.uv/python/*/bin/python* +"# + ); + file.write_all(policy.as_bytes()) + .map_err(|e| format!("write temp policy file: {e}"))?; + file.flush() + .map_err(|e| format!("flush temp policy file: {e}"))?; + Ok(file) +} + +#[tokio::test] +#[allow(clippy::too_many_lines)] +async fn jsonrpc_l7_enforces_method_and_params_rules_on_forward_and_connect_paths() { + let server = start_test_server().await.expect("start test server"); + let policy = write_jsonrpc_policy(&server.host, server.port).expect("write custom policy"); + let policy_path = policy + .path() + .to_str() + .expect("temp policy path should be utf-8") + .to_string(); + + let script = format!( + r#" +import json +import os +import socket +import time +import urllib.error +import urllib.parse +import urllib.request + +HOST = {host:?} +PORT = {port} +DETAILS = {{}} + +def post_jsonrpc(method, params=None, req_id=1): + body = {{"jsonrpc": "2.0", "id": req_id, "method": method}} + if params is not None: + body["params"] = params + encoded = json.dumps(body).encode() + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json"}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def post_jsonrpc_batch(requests): + encoded = json.dumps(requests).encode() + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json"}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def post_invalid_json(): + encoded = b"not valid json {{" + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json", "Content-Length": str(len(encoded))}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def proxy_parts(*names): + proxy_url = next((os.environ.get(name) for name in names if os.environ.get(name)), None) + parsed = urllib.parse.urlparse(proxy_url) + return parsed.hostname, parsed.port or 80 + +def read_until(sock, marker): + data = b"" + while marker not in data: + chunk = sock.recv(4096) + if not chunk: + break + data += chunk + return data + +def read_response(sock): + response = read_until(sock, b"\r\n\r\n") + headers, _, body = response.partition(b"\r\n\r\n") + content_length = 0 + for line in headers.split(b"\r\n")[1:]: + if line.lower().startswith(b"content-length:"): + content_length = int(line.split(b":", 1)[1].strip()) + break + while len(body) < content_length: + chunk = sock.recv(4096) + if not chunk: + break + body += chunk + return response, body + +def status_code(response, label): + parts = response.split() + if len(parts) < 2: + DETAILS[f"{{label}}_raw"] = response.decode(errors="replace") + raise RuntimeError(f"{{label}}: malformed HTTP response: {{response!r}}") + try: + return int(parts[1]) + except ValueError as error: + DETAILS[f"{{label}}_raw"] = response.decode(errors="replace") + raise RuntimeError(f"{{label}}: non-numeric HTTP status: {{response!r}}") from error + +def connect_http_status(label, request): + proxy_host, proxy_port = proxy_parts("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy") + target = f"{{HOST}}:{{PORT}}" + + last_error = None + for attempt in range(5): + try: + with socket.create_connection((proxy_host, proxy_port), timeout=15) as sock: + sock.sendall( + f"CONNECT {{target}} HTTP/1.1\r\nHost: {{target}}\r\n\r\n".encode() + ) + connect_response = read_until(sock, b"\r\n\r\n") + connect_code = status_code(connect_response, f"{{label}}_connect") + if connect_code != 200: + return connect_code + sock.sendall(request) + sock.shutdown(socket.SHUT_WR) + response = read_until(sock, b"\r\n\r\n") + return status_code(response, f"{{label}}_response") + except (OSError, RuntimeError) as error: + last_error = error + DETAILS[f"{{label}}_attempt_{{attempt + 1}}_error"] = str(error) + time.sleep(0.2) + + raise RuntimeError(f"{{label}}: failed after 5 attempts: {{last_error}}") + +def connect_jsonrpc_status(method, params, label): + target = f"{{HOST}}:{{PORT}}" + body = {{"jsonrpc": "2.0", "id": 1, "method": method}} + if params is not None: + body["params"] = params + encoded = json.dumps(body).encode() + request = ( + f"POST /mcp HTTP/1.1\r\n" + f"Host: {{target}}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {{len(encoded)}}\r\n" + f"Connection: close\r\n" + f"\r\n" + ).encode() + encoded + return connect_http_status(label, request) + +results = {{ + # forward proxy — method-only allow rules + "forward_method_initialize_allowed": post_jsonrpc("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}), + "forward_method_tools_list_allowed": post_jsonrpc("tools/list"), + + # forward proxy — params allow rules + "forward_tools_call_params_name_no_args_allowed": post_jsonrpc("tools/call", {{"name": "read_status"}}), + "forward_tools_call_params_nested_args_allowed": post_jsonrpc("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main", "title": "test"}}}}), + + # forward proxy — params denied + "forward_tools_call_params_name_no_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action"}}), + "forward_tools_call_params_name_with_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}), + + # forward proxy — batch: all requests allowed + "forward_batch_all_allowed": post_jsonrpc_batch([ + {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}}, + {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "read_status"}}}}, + ]), + + # forward proxy — batch: one denied request causes full batch denial + "forward_batch_one_denied": post_jsonrpc_batch([ + {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}}, + {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "blocked_action"}}}}, + ]), + + # forward proxy — invalid JSON body denied by on_parse_error: deny + "forward_invalid_json_denied": post_invalid_json(), + + # CONNECT path — representative allowed and denied cases + "connect_method_initialize_allowed": connect_jsonrpc_status("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}, "connect_method_initialize_allowed"), + "connect_method_tools_list_allowed": connect_jsonrpc_status("tools/list", None, "connect_method_tools_list_allowed"), + "connect_tools_call_params_name_no_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "read_status"}}, "connect_tools_call_params_name_no_args_allowed"), + "connect_tools_call_params_nested_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main"}}}}, "connect_tools_call_params_nested_args_allowed"), + "connect_tools_call_params_name_no_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action"}}, "connect_tools_call_params_name_no_args_denied"), + "connect_tools_call_params_name_with_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}, "connect_tools_call_params_name_with_args_denied"), +}} +results.update(DETAILS) +print(json.dumps(results, sort_keys=True)) +"#, + host = server.host, + port = server.port, + ); + + let guard = SandboxGuard::create(&["--policy", &policy_path, "--", "python3", "-c", &script]) + .await + .expect("sandbox create"); + + for (key, expected) in [ + // forward proxy — allowed + ("forward_method_initialize_allowed", 200), + ("forward_method_tools_list_allowed", 200), + ("forward_tools_call_params_name_no_args_allowed", 200), + ("forward_tools_call_params_nested_args_allowed", 200), + // forward proxy — params denied + ("forward_tools_call_params_name_no_args_denied", 403), + ("forward_tools_call_params_name_with_args_denied", 403), + // forward proxy — batch + ("forward_batch_all_allowed", 200), + ("forward_batch_one_denied", 403), + // forward proxy — parse error + ("forward_invalid_json_denied", 403), + // CONNECT path — allowed + ("connect_method_initialize_allowed", 200), + ("connect_method_tools_list_allowed", 200), + ("connect_tools_call_params_name_no_args_allowed", 200), + ("connect_tools_call_params_nested_args_allowed", 200), + // CONNECT path — params denied + ("connect_tools_call_params_name_no_args_denied", 403), + ("connect_tools_call_params_name_with_args_denied", 403), + ] { + let expected_fragment = format!(r#""{key}": {expected}"#); + assert!( + guard.create_output.contains(&expected_fragment), + "expected {key}={expected}, got:\n{}", + guard.create_output + ); + } +} diff --git a/proto/sandbox.proto b/proto/sandbox.proto index ef0b0540f..afe1d3301 100644 --- a/proto/sandbox.proto +++ b/proto/sandbox.proto @@ -128,6 +128,9 @@ message NetworkEndpoint { // Advisor-proposed endpoints must not satisfy exact-host SSRF trust unless // they are converted through an explicit user-authored policy path. bool advisor_proposed = 18; + // Maximum JSON-RPC-over-HTTP request body bytes to buffer for inspection. + // Defaults to 65536 when unset. + uint32 json_rpc_max_body_bytes = 19; } // Trusted GraphQL operation classification. @@ -160,6 +163,11 @@ message L7DenyRule { // GraphQL root field globs. Deny rules match when any selected root field // matches any configured glob. repeated string fields = 7; + // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". + string rpc_method = 8; + // JSON-RPC params matcher map. Dot-separated keys select nested params + // fields, e.g. "arguments.scope". + map params = 9; } // An L7 policy rule (allow-only). @@ -186,6 +194,11 @@ message L7Allow { // GraphQL root field globs. Allow rules match only when every selected root // field matches one of the configured globs. Omit to match all fields. repeated string fields = 7; + // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". + string rpc_method = 8; + // JSON-RPC params matcher map. Dot-separated keys select nested params + // fields, e.g. "arguments.scope". + map params = 9; } // Query value matcher for one query parameter key.