From ed12716975634fb8b4fb42213fa76692b743cabd Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 08:56:17 -0700 Subject: [PATCH 1/9] test(e2e): add JSON-RPC L7 proxy coverage Add a Rust e2e test that drives MCP-style JSON-RPC requests through both the forward proxy and CONNECT tunnel paths. Cover method rules, params rules, batch handling, and invalid JSON denial expectations so the JSON-RPC implementation can be built against one failing scenario. Signed-off-by: Kris Hicks --- e2e/rust/Cargo.toml | 5 + e2e/rust/tests/forward_proxy_jsonrpc_l7.rs | 373 +++++++++++++++++++++ 2 files changed, 378 insertions(+) create mode 100644 e2e/rust/tests/forward_proxy_jsonrpc_l7.rs diff --git a/e2e/rust/Cargo.toml b/e2e/rust/Cargo.toml index 083c622df..2f61f2d86 100644 --- a/e2e/rust/Cargo.toml +++ b/e2e/rust/Cargo.toml @@ -97,6 +97,11 @@ name = "forward_proxy_graphql_l7" path = "tests/forward_proxy_graphql_l7.rs" required-features = ["e2e-host-gateway"] +[[test]] +name = "forward_proxy_jsonrpc_l7" +path = "tests/forward_proxy_jsonrpc_l7.rs" +required-features = ["e2e-host-gateway"] + [[test]] name = "gpu_device_selection" path = "tests/gpu_device_selection.rs" diff --git a/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs new file mode 100644 index 000000000..feba98ceb --- /dev/null +++ b/e2e/rust/tests/forward_proxy_jsonrpc_l7.rs @@ -0,0 +1,373 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! E2E tests for JSON-RPC L7 inspection across both proxy entry points. +//! +//! The upstream server deliberately does not implement JSON-RPC. `OpenShell` +//! parses and enforces JSON-RPC before forwarding, so any HTTP server that +//! accepts POST /mcp is enough to prove allowed requests reach upstream +//! and denied requests are stopped by the sandbox proxy. + +#![cfg(feature = "e2e")] + +use std::io::Write; + +use openshell_e2e::harness::container::ContainerHttpServer; +use openshell_e2e::harness::sandbox::SandboxGuard; +use tempfile::NamedTempFile; + +const TEST_SERVER_ALIAS: &str = "jsonrpc-l7.openshell.test"; + +async fn start_test_server() -> Result { + let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer + +class Handler(BaseHTTPRequestHandler): + def read_body(self): + if self.headers.get("Transfer-Encoding", "").lower() == "chunked": + data = b"" + while True: + size_line = self.rfile.readline() + if not size_line: + break + size = int(size_line.split(b";", 1)[0].strip(), 16) + if size == 0: + while self.rfile.readline().strip(): + pass + break + data += self.rfile.read(size) + self.rfile.read(2) + return data + return self.rfile.read(int(self.headers.get("Content-Length", "0"))) + + def do_GET(self): + self.send_response(200) + self.end_headers() + + def do_POST(self): + self.read_body() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"jsonrpc":"2.0","id":1,"result":{}}') + + def log_message(self, format, *args): + pass + +HTTPServer(("0.0.0.0", 8000), Handler).serve_forever() +"#; + + ContainerHttpServer::start_python(TEST_SERVER_ALIAS, script).await +} + +fn write_jsonrpc_policy(host: &str, port: u16) -> Result { + let mut file = NamedTempFile::new().map_err(|e| format!("create temp policy file: {e}"))?; + let policy = format!( + r#"version: 1 + +filesystem_policy: + include_workdir: true + read_only: + - /usr + - /lib + - /proc + - /dev/urandom + - /app + - /etc + - /var/log + read_write: + - /sandbox + - /tmp + - /dev/null + +landlock: + compatibility: best_effort + +process: + run_as_user: sandbox + run_as_group: sandbox + +network_policies: + test_jsonrpc_l7: + name: test_jsonrpc_l7 + endpoints: + - host: {host} + port: {port} + path: /mcp + protocol: json-rpc + enforcement: enforce + allowed_ips: + - "10.0.0.0/8" + - "172.0.0.0/8" + - "192.168.0.0/16" + - "fc00::/7" + json_rpc: + max_body_bytes: 65536 + on_parse_error: deny + batch_policy: deny_if_any_denied + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: blocked_action + binaries: + - path: /usr/bin/python* + - path: /usr/local/bin/python* + - path: /sandbox/.uv/python/*/bin/python* +"# + ); + file.write_all(policy.as_bytes()) + .map_err(|e| format!("write temp policy file: {e}"))?; + file.flush() + .map_err(|e| format!("flush temp policy file: {e}"))?; + Ok(file) +} + +#[tokio::test] +#[allow(clippy::too_many_lines)] +async fn jsonrpc_l7_enforces_method_and_params_rules_on_forward_and_connect_paths() { + let server = start_test_server().await.expect("start test server"); + let policy = write_jsonrpc_policy(&server.host, server.port).expect("write custom policy"); + let policy_path = policy + .path() + .to_str() + .expect("temp policy path should be utf-8") + .to_string(); + + let script = format!( + r#" +import json +import os +import socket +import time +import urllib.error +import urllib.parse +import urllib.request + +HOST = {host:?} +PORT = {port} +DETAILS = {{}} + +def post_jsonrpc(method, params=None, req_id=1): + body = {{"jsonrpc": "2.0", "id": req_id, "method": method}} + if params is not None: + body["params"] = params + encoded = json.dumps(body).encode() + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json"}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def post_jsonrpc_batch(requests): + encoded = json.dumps(requests).encode() + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json"}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def post_invalid_json(): + encoded = b"not valid json {{" + request = urllib.request.Request( + f"http://{{HOST}}:{{PORT}}/mcp", + data=encoded, + headers={{"Content-Type": "application/json", "Content-Length": str(len(encoded))}}, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + response.read() + return response.status + except urllib.error.HTTPError as error: + error.read() + return error.code + +def proxy_parts(*names): + proxy_url = next((os.environ.get(name) for name in names if os.environ.get(name)), None) + parsed = urllib.parse.urlparse(proxy_url) + return parsed.hostname, parsed.port or 80 + +def read_until(sock, marker): + data = b"" + while marker not in data: + chunk = sock.recv(4096) + if not chunk: + break + data += chunk + return data + +def read_response(sock): + response = read_until(sock, b"\r\n\r\n") + headers, _, body = response.partition(b"\r\n\r\n") + content_length = 0 + for line in headers.split(b"\r\n")[1:]: + if line.lower().startswith(b"content-length:"): + content_length = int(line.split(b":", 1)[1].strip()) + break + while len(body) < content_length: + chunk = sock.recv(4096) + if not chunk: + break + body += chunk + return response, body + +def status_code(response, label): + parts = response.split() + if len(parts) < 2: + DETAILS[f"{{label}}_raw"] = response.decode(errors="replace") + raise RuntimeError(f"{{label}}: malformed HTTP response: {{response!r}}") + try: + return int(parts[1]) + except ValueError as error: + DETAILS[f"{{label}}_raw"] = response.decode(errors="replace") + raise RuntimeError(f"{{label}}: non-numeric HTTP status: {{response!r}}") from error + +def connect_http_status(label, request): + proxy_host, proxy_port = proxy_parts("HTTP_PROXY", "http_proxy", "HTTPS_PROXY", "https_proxy") + target = f"{{HOST}}:{{PORT}}" + + last_error = None + for attempt in range(5): + try: + with socket.create_connection((proxy_host, proxy_port), timeout=15) as sock: + sock.sendall( + f"CONNECT {{target}} HTTP/1.1\r\nHost: {{target}}\r\n\r\n".encode() + ) + connect_response = read_until(sock, b"\r\n\r\n") + connect_code = status_code(connect_response, f"{{label}}_connect") + if connect_code != 200: + return connect_code + sock.sendall(request) + sock.shutdown(socket.SHUT_WR) + response = read_until(sock, b"\r\n\r\n") + return status_code(response, f"{{label}}_response") + except (OSError, RuntimeError) as error: + last_error = error + DETAILS[f"{{label}}_attempt_{{attempt + 1}}_error"] = str(error) + time.sleep(0.2) + + raise RuntimeError(f"{{label}}: failed after 5 attempts: {{last_error}}") + +def connect_jsonrpc_status(method, params, label): + target = f"{{HOST}}:{{PORT}}" + body = {{"jsonrpc": "2.0", "id": 1, "method": method}} + if params is not None: + body["params"] = params + encoded = json.dumps(body).encode() + request = ( + f"POST /mcp HTTP/1.1\r\n" + f"Host: {{target}}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {{len(encoded)}}\r\n" + f"Connection: close\r\n" + f"\r\n" + ).encode() + encoded + return connect_http_status(label, request) + +results = {{ + # forward proxy — method-only allow rules + "forward_method_initialize_allowed": post_jsonrpc("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}), + "forward_method_tools_list_allowed": post_jsonrpc("tools/list"), + + # forward proxy — params allow rules + "forward_tools_call_params_name_no_args_allowed": post_jsonrpc("tools/call", {{"name": "read_status"}}), + "forward_tools_call_params_nested_args_allowed": post_jsonrpc("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main", "title": "test"}}}}), + + # forward proxy — params denied + "forward_tools_call_params_name_no_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action"}}), + "forward_tools_call_params_name_with_args_denied": post_jsonrpc("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}), + + # forward proxy — batch: all requests allowed + "forward_batch_all_allowed": post_jsonrpc_batch([ + {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}}, + {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "read_status"}}}}, + ]), + + # forward proxy — batch: one denied request causes full batch denial + "forward_batch_one_denied": post_jsonrpc_batch([ + {{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}}, + {{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {{"name": "blocked_action"}}}}, + ]), + + # forward proxy — invalid JSON body denied by on_parse_error: deny + "forward_invalid_json_denied": post_invalid_json(), + + # CONNECT path — representative allowed and denied cases + "connect_method_initialize_allowed": connect_jsonrpc_status("initialize", {{"protocolVersion": "2025-11-25", "capabilities": {{}}}}, "connect_method_initialize_allowed"), + "connect_method_tools_list_allowed": connect_jsonrpc_status("tools/list", None, "connect_method_tools_list_allowed"), + "connect_tools_call_params_name_no_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "read_status"}}, "connect_tools_call_params_name_no_args_allowed"), + "connect_tools_call_params_nested_args_allowed": connect_jsonrpc_status("tools/call", {{"name": "submit_report", "arguments": {{"scope": "workspace/main"}}}}, "connect_tools_call_params_nested_args_allowed"), + "connect_tools_call_params_name_no_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action"}}, "connect_tools_call_params_name_no_args_denied"), + "connect_tools_call_params_name_with_args_denied": connect_jsonrpc_status("tools/call", {{"name": "blocked_action", "arguments": {{"reason": "test"}}}}, "connect_tools_call_params_name_with_args_denied"), +}} +results.update(DETAILS) +print(json.dumps(results, sort_keys=True)) +"#, + host = server.host, + port = server.port, + ); + + let guard = SandboxGuard::create(&["--policy", &policy_path, "--", "python3", "-c", &script]) + .await + .expect("sandbox create"); + + for (key, expected) in [ + // forward proxy — allowed + ("forward_method_initialize_allowed", 200), + ("forward_method_tools_list_allowed", 200), + ("forward_tools_call_params_name_no_args_allowed", 200), + ("forward_tools_call_params_nested_args_allowed", 200), + // forward proxy — params denied + ("forward_tools_call_params_name_no_args_denied", 403), + ("forward_tools_call_params_name_with_args_denied", 403), + // forward proxy — batch + ("forward_batch_all_allowed", 200), + ("forward_batch_one_denied", 403), + // forward proxy — parse error + ("forward_invalid_json_denied", 403), + // CONNECT path — allowed + ("connect_method_initialize_allowed", 200), + ("connect_method_tools_list_allowed", 200), + ("connect_tools_call_params_name_no_args_allowed", 200), + ("connect_tools_call_params_nested_args_allowed", 200), + // CONNECT path — params denied + ("connect_tools_call_params_name_no_args_denied", 403), + ("connect_tools_call_params_name_with_args_denied", 403), + ] { + let expected_fragment = format!(r#""{key}": {expected}"#); + assert!( + guard.create_output.contains(&expected_fragment), + "expected {key}={expected}, got:\n{}", + guard.create_output + ); + } +} From 4705a6780c97feb5ce62ea7bb566b26042f67715 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 09:03:21 -0700 Subject: [PATCH 2/9] feat(policy): recognize JSON-RPC L7 endpoints Add json-rpc as a policy protocol and carry JSON-RPC rule fields through policy parsing and validation. Wire the protocol into the L7 dispatcher with a passthrough placeholder so later commits can add enforcement without changing endpoint recognition. Signed-off-by: Kris Hicks --- crates/openshell-policy/src/lib.rs | 26 ++++++++++++++++++++++++ crates/openshell-sandbox/src/l7/mod.rs | 4 +++- crates/openshell-sandbox/src/l7/relay.rs | 21 ++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/crates/openshell-policy/src/lib.rs b/crates/openshell-policy/src/lib.rs index 26c8fc9d3..a91223ede 100644 --- a/crates/openshell-policy/src/lib.rs +++ b/crates/openshell-policy/src/lib.rs @@ -135,6 +135,8 @@ struct NetworkEndpointDef { graphql_persisted_queries: BTreeMap, #[serde(default, skip_serializing_if = "is_zero_u32")] graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + json_rpc: Option, } // Signature dictated by serde's `skip_serializing_if`, which requires `&T`. @@ -149,6 +151,17 @@ fn is_zero_u32(v: &u32) -> bool { *v == 0 } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct JsonRpcConfigDef { + #[serde(default, skip_serializing_if = "is_zero_u32")] + max_body_bytes: u32, + #[serde(default, skip_serializing_if = "String::is_empty")] + on_parse_error: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + batch_policy: String, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct GraphqlOperationDef { @@ -183,6 +196,10 @@ struct L7AllowDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -216,6 +233,10 @@ struct L7DenyRuleDef { operation_name: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] fields: Vec, + #[serde(default, skip_serializing_if = "String::is_empty")] + rpc_method: String, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + params: BTreeMap, } #[derive(Debug, Serialize, Deserialize)] @@ -462,6 +483,8 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { (key, yaml_matcher) }) .collect(), + rpc_method: String::new(), + params: BTreeMap::new(), }, } }) @@ -491,6 +514,8 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { (key.clone(), yaml_matcher) }) .collect(), + rpc_method: String::new(), + params: BTreeMap::new(), }) .collect(), allow_encoded_slash: e.allow_encoded_slash, @@ -512,6 +537,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc: None, } }) .collect(), diff --git a/crates/openshell-sandbox/src/l7/mod.rs b/crates/openshell-sandbox/src/l7/mod.rs index 365bb379a..ce6747c6d 100644 --- a/crates/openshell-sandbox/src/l7/mod.rs +++ b/crates/openshell-sandbox/src/l7/mod.rs @@ -25,6 +25,7 @@ pub enum L7Protocol { Websocket, Graphql, Sql, + JsonRpc, } impl L7Protocol { @@ -34,6 +35,7 @@ impl L7Protocol { "websocket" => Some(Self::Websocket), "graphql" => Some(Self::Graphql), "sql" => Some(Self::Sql), + "json-rpc" => Some(Self::JsonRpc), _ => None, } } @@ -598,7 +600,7 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< if !protocol.is_empty() && L7Protocol::parse(protocol).is_none() { errors.push(format!( - "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, or sql)" + "{loc}: unknown protocol '{protocol}' (expected rest, websocket, graphql, sql, or json-rpc)" )); } diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 40b002535..6baa2ab03 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -178,6 +178,25 @@ where .into_diagnostic()?; Ok(()) } + L7Protocol::JsonRpc => { + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + // JSON-RPC provider not yet implemented — fall through to passthrough + { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Other) + .severity(SeverityId::Low) + .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) + .message("JSON-RPC L7 provider not yet implemented, falling back to passthrough") + .build(); + ocsf_emit!(event); + } + tokio::io::copy_bidirectional(client, upstream) + .await + .into_diagnostic()?; + Ok(()) + } } } @@ -341,7 +360,7 @@ where let engine_type = match config.protocol { L7Protocol::Graphql => "l7-graphql", L7Protocol::Websocket => "l7-websocket", - L7Protocol::Rest | L7Protocol::Sql => "l7", + L7Protocol::Rest | L7Protocol::Sql | L7Protocol::JsonRpc => "l7", }; emit_l7_request_log( ctx, From 5db7efa35a2c1f3fc2e18b5062f6319de439699b Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 16:15:06 -0700 Subject: [PATCH 3/9] refactor(l7): share HTTP body inspection helper Move HTTP request body buffering and chunked-body normalization out of the GraphQL module so other HTTP-carried L7 protocols can inspect request bodies without depending on GraphQL internals. Signed-off-by: Kris Hicks --- crates/openshell-sandbox/src/l7/graphql.rs | 198 +------------------- crates/openshell-sandbox/src/l7/http.rs | 199 +++++++++++++++++++++ crates/openshell-sandbox/src/l7/mod.rs | 1 + 3 files changed, 205 insertions(+), 193 deletions(-) create mode 100644 crates/openshell-sandbox/src/l7/http.rs diff --git a/crates/openshell-sandbox/src/l7/graphql.rs b/crates/openshell-sandbox/src/l7/graphql.rs index 82c35720e..f548f2a10 100644 --- a/crates/openshell-sandbox/src/l7/graphql.rs +++ b/crates/openshell-sandbox/src/l7/graphql.rs @@ -3,14 +3,14 @@ //! GraphQL-over-HTTP L7 inspection. -use crate::l7::provider::{BodyLength, L7Provider, L7Request}; +use crate::l7::provider::{L7Provider, L7Request}; use apollo_parser::Parser; use apollo_parser::cst; -use miette::{IntoDiagnostic, Result, miette}; +use miette::{Result, miette}; use serde::Serialize; use serde_json::Value; use std::collections::{HashMap, HashSet}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024; @@ -61,7 +61,7 @@ pub(crate) async fn inspect_graphql_request( ) -> Result { let header_str = header_str(request)?; reject_unsupported_headers(header_str)?; - let body = read_body_for_inspection(client, request, max_body_bytes).await?; + let body = crate::l7::http::read_body_for_inspection(client, request, max_body_bytes).await?; Ok(classify_request(request, &body)) } @@ -355,195 +355,6 @@ fn unique_persisted_query_id( Ok(selected.map(|(_, value)| value)) } -async fn read_body_for_inspection( - client: &mut C, - request: &mut L7Request, - max_body_bytes: usize, -) -> Result> { - let header_end = request - .raw_header - .windows(4) - .position(|w| w == b"\r\n\r\n") - .map_or(request.raw_header.len(), |p| p + 4); - let overflow = request.raw_header[header_end..].to_vec(); - - match request.body_length { - BodyLength::None => Ok(Vec::new()), - BodyLength::ContentLength(len) => { - let len = usize::try_from(len) - .map_err(|_| miette!("GraphQL request body length exceeds platform limit"))?; - if len > max_body_bytes { - return Err(miette!( - "GraphQL request body exceeds {max_body_bytes} byte inspection limit" - )); - } - if overflow.len() > len { - return Err(miette!( - "GraphQL request contains more body bytes than Content-Length" - )); - } - let remaining = len - overflow.len(); - let mut body = overflow; - if remaining > 0 { - let start = body.len(); - body.resize(len, 0); - client - .read_exact(&mut body[start..]) - .await - .into_diagnostic()?; - } - request.raw_header.truncate(header_end); - request.raw_header.extend_from_slice(&body); - Ok(body) - } - BodyLength::Chunked => { - let body = read_chunked_body_for_inspection( - client, - request, - header_end, - overflow, - max_body_bytes, - ) - .await?; - normalize_chunked_request_to_content_length(request, header_end, &body)?; - Ok(body) - } - } -} - -fn normalize_chunked_request_to_content_length( - request: &mut L7Request, - header_end: usize, - body: &[u8], -) -> Result<()> { - let header_str = std::str::from_utf8(&request.raw_header[..header_end]) - .map_err(|_| miette!("GraphQL HTTP headers contain invalid UTF-8"))?; - let header_str = header_str - .strip_suffix("\r\n\r\n") - .ok_or_else(|| miette!("GraphQL HTTP headers missing terminator"))?; - - let mut normalized = Vec::with_capacity(header_str.len() + body.len() + 32); - for (idx, line) in header_str.split("\r\n").enumerate() { - if idx > 0 { - let name = line - .split_once(':') - .map(|(name, _)| name.trim().to_ascii_lowercase()); - if matches!( - name.as_deref(), - Some("transfer-encoding" | "content-length" | "trailer") - ) { - continue; - } - } - normalized.extend_from_slice(line.as_bytes()); - normalized.extend_from_slice(b"\r\n"); - } - normalized.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()); - normalized.extend_from_slice(body); - - request.raw_header = normalized; - request.body_length = BodyLength::ContentLength(body.len() as u64); - Ok(()) -} - -async fn read_chunked_body_for_inspection( - client: &mut C, - request: &mut L7Request, - header_end: usize, - overflow: Vec, - max_body_bytes: usize, -) -> Result> { - let mut raw = overflow; - let mut decoded = Vec::new(); - let mut pos = 0usize; - - loop { - let size_line_end = loop { - if let Some(end) = find_crlf(&raw, pos) { - break end; - } - read_more(client, &mut raw, max_body_bytes).await?; - }; - let size_line = std::str::from_utf8(&raw[pos..size_line_end]) - .into_diagnostic() - .map_err(|_| miette!("Invalid UTF-8 in GraphQL chunk-size line"))?; - let size_token = size_line - .split(';') - .next() - .map(str::trim) - .unwrap_or_default(); - let chunk_size = usize::from_str_radix(size_token, 16) - .into_diagnostic() - .map_err(|_| miette!("Invalid GraphQL chunk size token: {size_token:?}"))?; - pos = size_line_end + 2; - - if decoded.len().saturating_add(chunk_size) > max_body_bytes { - return Err(miette!( - "GraphQL request body exceeds {max_body_bytes} byte inspection limit" - )); - } - - if chunk_size == 0 { - loop { - let trailer_end = loop { - if let Some(end) = find_crlf(&raw, pos) { - break end; - } - read_more(client, &mut raw, max_body_bytes).await?; - }; - let trailer_line = &raw[pos..trailer_end]; - pos = trailer_end + 2; - if trailer_line.is_empty() { - request.raw_header.truncate(header_end); - request.raw_header.extend_from_slice(&raw[..pos]); - return Ok(decoded); - } - } - } - - let chunk_end = pos - .checked_add(chunk_size) - .ok_or_else(|| miette!("GraphQL chunk size overflow"))?; - let chunk_with_crlf_end = chunk_end - .checked_add(2) - .ok_or_else(|| miette!("GraphQL chunk size overflow"))?; - while raw.len() < chunk_with_crlf_end { - read_more(client, &mut raw, max_body_bytes).await?; - } - decoded.extend_from_slice(&raw[pos..chunk_end]); - if raw.get(chunk_end..chunk_with_crlf_end) != Some(&b"\r\n"[..]) { - return Err(miette!("GraphQL chunk payload missing terminating CRLF")); - } - pos = chunk_with_crlf_end; - } -} - -async fn read_more( - client: &mut C, - raw: &mut Vec, - max_body_bytes: usize, -) -> Result<()> { - if raw.len() > max_body_bytes.saturating_mul(2).max(max_body_bytes) { - return Err(miette!( - "GraphQL chunked request body exceeds inspection framing limit" - )); - } - let mut buf = [0u8; 8192]; - let n = client.read(&mut buf).await.into_diagnostic()?; - if n == 0 { - return Err(miette!("GraphQL chunked body ended before terminator")); - } - raw.extend_from_slice(&buf[..n]); - Ok(()) -} - -fn find_crlf(buf: &[u8], start: usize) -> Option { - buf.get(start..)? - .windows(2) - .position(|w| w == b"\r\n") - .map(|p| start + p) -} - fn header_str(request: &L7Request) -> Result<&str> { let header_end = request .raw_header @@ -578,6 +389,7 @@ fn reject_unsupported_headers(headers: &str) -> Result<()> { #[cfg(test)] mod tests { use super::*; + use crate::l7::provider::BodyLength; fn request(method: &str, target: &str) -> L7Request { L7Request { diff --git a/crates/openshell-sandbox/src/l7/http.rs b/crates/openshell-sandbox/src/l7/http.rs new file mode 100644 index 000000000..66269f6ba --- /dev/null +++ b/crates/openshell-sandbox/src/l7/http.rs @@ -0,0 +1,199 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared HTTP/1.1 request helpers for L7 protocols carried over HTTP. + +use crate::l7::provider::{BodyLength, L7Request}; +use miette::{IntoDiagnostic, Result, miette}; +use tokio::io::{AsyncRead, AsyncReadExt}; + +const READ_BUF_SIZE: usize = 8192; + +pub async fn read_body_for_inspection( + client: &mut C, + request: &mut L7Request, + max_body_bytes: usize, +) -> Result> { + let header_end = request + .raw_header + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(request.raw_header.len(), |p| p + 4); + let overflow = request.raw_header[header_end..].to_vec(); + + match request.body_length { + BodyLength::None => Ok(Vec::new()), + BodyLength::ContentLength(len) => { + let len = usize::try_from(len) + .map_err(|_| miette!("HTTP request body length exceeds platform limit"))?; + if len > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + if overflow.len() > len { + return Err(miette!( + "HTTP request contains more body bytes than Content-Length" + )); + } + let remaining = len - overflow.len(); + let mut body = overflow; + if remaining > 0 { + let start = body.len(); + body.resize(len, 0); + client + .read_exact(&mut body[start..]) + .await + .into_diagnostic()?; + } + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&body); + Ok(body) + } + BodyLength::Chunked => { + let body = read_chunked_body_for_inspection( + client, + request, + header_end, + overflow, + max_body_bytes, + ) + .await?; + normalize_chunked_request_to_content_length(request, header_end, &body)?; + Ok(body) + } + } +} + +fn normalize_chunked_request_to_content_length( + request: &mut L7Request, + header_end: usize, + body: &[u8], +) -> Result<()> { + let header_str = std::str::from_utf8(&request.raw_header[..header_end]) + .map_err(|_| miette!("HTTP headers contain invalid UTF-8"))?; + let header_str = header_str + .strip_suffix("\r\n\r\n") + .ok_or_else(|| miette!("HTTP headers missing terminator"))?; + + let mut normalized = Vec::with_capacity(header_str.len() + body.len() + 32); + for (idx, line) in header_str.split("\r\n").enumerate() { + if idx > 0 { + let name = line + .split_once(':') + .map(|(name, _)| name.trim().to_ascii_lowercase()); + if matches!( + name.as_deref(), + Some("transfer-encoding" | "content-length" | "trailer") + ) { + continue; + } + } + normalized.extend_from_slice(line.as_bytes()); + normalized.extend_from_slice(b"\r\n"); + } + normalized.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()); + normalized.extend_from_slice(body); + + request.raw_header = normalized; + request.body_length = BodyLength::ContentLength(body.len() as u64); + Ok(()) +} + +async fn read_chunked_body_for_inspection( + client: &mut C, + request: &mut L7Request, + header_end: usize, + overflow: Vec, + max_body_bytes: usize, +) -> Result> { + let mut raw = overflow; + let mut decoded = Vec::new(); + let mut pos = 0usize; + + loop { + let size_line_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, max_body_bytes).await?; + }; + let size_line = std::str::from_utf8(&raw[pos..size_line_end]) + .into_diagnostic() + .map_err(|_| miette!("Invalid UTF-8 in HTTP chunk-size line"))?; + let size_token = size_line + .split(';') + .next() + .map(str::trim) + .unwrap_or_default(); + let chunk_size = usize::from_str_radix(size_token, 16) + .into_diagnostic() + .map_err(|_| miette!("Invalid HTTP chunk size token: {size_token:?}"))?; + pos = size_line_end + 2; + + if decoded.len().saturating_add(chunk_size) > max_body_bytes { + return Err(miette!( + "HTTP request body exceeds {max_body_bytes} byte inspection limit" + )); + } + + if chunk_size == 0 { + loop { + let trailer_end = loop { + if let Some(end) = find_crlf(&raw, pos) { + break end; + } + read_more(client, &mut raw, max_body_bytes).await?; + }; + let trailer_line = &raw[pos..trailer_end]; + pos = trailer_end + 2; + if trailer_line.is_empty() { + request.raw_header.truncate(header_end); + request.raw_header.extend_from_slice(&raw[..pos]); + return Ok(decoded); + } + } + } + + let chunk_end = pos + .checked_add(chunk_size) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + let chunk_with_crlf_end = chunk_end + .checked_add(2) + .ok_or_else(|| miette!("HTTP chunk size overflow"))?; + while raw.len() < chunk_with_crlf_end { + read_more(client, &mut raw, max_body_bytes).await?; + } + decoded.extend_from_slice(&raw[pos..chunk_end]); + if raw.get(chunk_end..chunk_with_crlf_end) != Some(&b"\r\n"[..]) { + return Err(miette!("HTTP chunk payload missing terminating CRLF")); + } + pos = chunk_with_crlf_end; + } +} + +async fn read_more( + client: &mut C, + raw: &mut Vec, + max_body_bytes: usize, +) -> Result<()> { + if raw.len() > max_body_bytes.saturating_mul(2).max(max_body_bytes) { + return Err(miette!( + "HTTP chunked request body exceeds inspection framing limit" + )); + } + let mut buf = [0u8; READ_BUF_SIZE]; + let n = client.read(&mut buf).await.into_diagnostic()?; + if n == 0 { + return Err(miette!("HTTP chunked body ended before terminator")); + } + raw.extend_from_slice(&buf[..n]); + Ok(()) +} + +fn find_crlf(buf: &[u8], start: usize) -> Option { + buf.get(start..)? + .windows(2) + .position(|w| w == b"\r\n") + .map(|p| start + p) +} diff --git a/crates/openshell-sandbox/src/l7/mod.rs b/crates/openshell-sandbox/src/l7/mod.rs index ce6747c6d..17d2e4c5e 100644 --- a/crates/openshell-sandbox/src/l7/mod.rs +++ b/crates/openshell-sandbox/src/l7/mod.rs @@ -9,6 +9,7 @@ //! evaluated against OPA policy, and either forwarded or denied. pub mod graphql; +pub(crate) mod http; pub mod inference; pub mod path; pub mod provider; From bc7a4a3d5650680cb6e24f21ff80d899320931d5 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 09:46:04 -0700 Subject: [PATCH 4/9] feat(l7): enforce JSON-RPC method rules Add the JSON-RPC HTTP parser and relay path, extract request methods, and pass JSON-RPC metadata into L7 policy evaluation. Wire rpc_method through proto and policy conversion, add Rego matching for JSON-RPC methods, and inspect forward-proxy JSON-RPC bodies before relaying upstream. Signed-off-by: Kris Hicks --- crates/openshell-cli/src/policy_update.rs | 2 + crates/openshell-policy/src/lib.rs | 6 +- crates/openshell-policy/src/merge.rs | 2 + crates/openshell-providers/src/profiles.rs | 2 + .../data/sandbox-policy.rego | 12 + crates/openshell-sandbox/src/l7/graphql.rs | 1 + crates/openshell-sandbox/src/l7/jsonrpc.rs | 99 +++++++ crates/openshell-sandbox/src/l7/mod.rs | 3 + crates/openshell-sandbox/src/l7/relay.rs | 280 ++++++++++++++++-- crates/openshell-sandbox/src/l7/websocket.rs | 3 + .../src/mechanistic_mapper.rs | 1 + crates/openshell-sandbox/src/opa.rs | 80 +++++ crates/openshell-sandbox/src/policy_local.rs | 2 + crates/openshell-sandbox/src/proxy.rs | 52 ++++ proto/sandbox.proto | 4 + 15 files changed, 528 insertions(+), 21 deletions(-) create mode 100644 crates/openshell-sandbox/src/l7/jsonrpc.rs diff --git a/crates/openshell-cli/src/policy_update.rs b/crates/openshell-cli/src/policy_update.rs index 57656b878..03695d48e 100644 --- a/crates/openshell-cli/src/policy_update.rs +++ b/crates/openshell-cli/src/policy_update.rs @@ -205,6 +205,7 @@ fn group_allow_rules(specs: &[String]) -> Result Result SandboxPolicy { operation_type: r.allow.operation_type, operation_name: r.allow.operation_name, fields: r.allow.fields, + rpc_method: r.allow.rpc_method, query: r .allow .query @@ -328,6 +329,7 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { operation_type: d.operation_type, operation_name: d.operation_name, fields: d.fields, + rpc_method: d.rpc_method, query: d .query .into_iter() @@ -469,6 +471,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: a.operation_type, operation_name: a.operation_name, fields: a.fields, + rpc_method: a.rpc_method, query: a .query .into_iter() @@ -483,7 +486,6 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { (key, yaml_matcher) }) .collect(), - rpc_method: String::new(), params: BTreeMap::new(), }, } @@ -500,6 +502,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { operation_type: d.operation_type.clone(), operation_name: d.operation_name.clone(), fields: d.fields.clone(), + rpc_method: d.rpc_method.clone(), query: d .query .iter() @@ -514,7 +517,6 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { (key.clone(), yaml_matcher) }) .collect(), - rpc_method: String::new(), params: BTreeMap::new(), }) .collect(), diff --git a/crates/openshell-policy/src/merge.rs b/crates/openshell-policy/src/merge.rs index f191cd272..73c40316e 100644 --- a/crates/openshell-policy/src/merge.rs +++ b/crates/openshell-policy/src/merge.rs @@ -747,6 +747,7 @@ fn expand_access_preset(protocol: &str, access: &str) -> Option> { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }), }) .collect(), @@ -961,6 +962,7 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }), } } diff --git a/crates/openshell-providers/src/profiles.rs b/crates/openshell-providers/src/profiles.rs index d2a35ca80..a6d282256 100644 --- a/crates/openshell-providers/src/profiles.rs +++ b/crates/openshell-providers/src/profiles.rs @@ -816,6 +816,7 @@ fn allow_to_proto(allow: &L7AllowProfile) -> L7Allow { operation_type: allow.operation_type.clone(), operation_name: allow.operation_name.clone(), fields: allow.fields.clone(), + rpc_method: String::new(), } } @@ -848,6 +849,7 @@ fn deny_rule_to_proto(rule: &L7DenyRuleProfile) -> L7DenyRule { operation_type: rule.operation_type.clone(), operation_name: rule.operation_name.clone(), fields: rule.fields.clone(), + rpc_method: String::new(), } } diff --git a/crates/openshell-sandbox/data/sandbox-policy.rego b/crates/openshell-sandbox/data/sandbox-policy.rego index afcd28863..c25b42af0 100644 --- a/crates/openshell-sandbox/data/sandbox-policy.rego +++ b/crates/openshell-sandbox/data/sandbox-policy.rego @@ -417,6 +417,18 @@ request_allowed_for_endpoint(request, endpoint) if { command_matches(request.command, rule.allow.command) } +# --- L7 rule matching: JSON-RPC method --- + +request_allowed_for_endpoint(request, endpoint) if { + some rule + rule := endpoint.rules[_] + rule.allow.rpc_method + jsonrpc := object.get(request, "jsonrpc", {}) + method := object.get(jsonrpc, "method", null) + method != null + glob.match(rule.allow.rpc_method, [], method) +} + # --- L7 rule matching: GraphQL operation --- request_allowed_for_endpoint(request, endpoint) if { diff --git a/crates/openshell-sandbox/src/l7/graphql.rs b/crates/openshell-sandbox/src/l7/graphql.rs index f548f2a10..77ec3b6fd 100644 --- a/crates/openshell-sandbox/src/l7/graphql.rs +++ b/crates/openshell-sandbox/src/l7/graphql.rs @@ -622,6 +622,7 @@ network_policies: target: req.target, query_params: req.query_params, graphql: Some(info), + jsonrpc: None, }; let tunnel_engine = engine diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs new file mode 100644 index 000000000..977c8046f --- /dev/null +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -0,0 +1,99 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! JSON-RPC 2.0 over HTTP L7 inspection. + +use miette::Result; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::l7::provider::{L7Provider, L7Request}; + +pub struct JsonRpcHttpRequest { + pub request: L7Request, + pub info: JsonRpcRequestInfo, +} + +pub(crate) async fn parse_jsonrpc_http_request( + client: &mut C, + max_body_bytes: usize, + canonicalize_options: crate::l7::path::CanonicalizeOptions, +) -> Result> { + let provider = crate::l7::rest::RestProvider::with_options(canonicalize_options); + let Some(mut request) = provider.parse_request(client).await? else { + return Ok(None); + }; + let body = + crate::l7::http::read_body_for_inspection(client, &mut request, max_body_bytes).await?; + let info = parse_jsonrpc_body(&body); + Ok(Some(JsonRpcHttpRequest { request, info })) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcRequestInfo { + pub method: Option, + pub error: Option, +} + +/// Returns true if the parsed request's method matches the given `rpc_method` rule pattern. +/// +/// An empty `rpc_method` pattern matches any method. +pub fn rpc_method_rule_matches(info: &JsonRpcRequestInfo, rpc_method: &str) -> bool { + if rpc_method.is_empty() { + return true; + } + info.method.as_deref() == Some(rpc_method) +} + +/// Parse a JSON-RPC 2.0 request body and extract the `method` field. +/// +/// Returns an info struct with `method` set on success, or `error` set if the +/// body is not valid JSON-RPC 2.0. +pub fn parse_jsonrpc_body(body: &[u8]) -> JsonRpcRequestInfo { + let Ok(value) = serde_json::from_slice::(body) else { + return JsonRpcRequestInfo { + method: None, + error: Some("invalid JSON".to_string()), + }; + }; + let Some(method) = value.get("method").and_then(|m| m.as_str()) else { + return JsonRpcRequestInfo { + method: None, + error: Some("missing or non-string 'method' field".to_string()), + }; + }; + JsonRpcRequestInfo { + method: Some(method.to_string()), + error: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_method_from_request_body() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#; + let info = parse_jsonrpc_body(body); + assert_eq!(info.method.as_deref(), Some("initialize")); + assert!(info.error.is_none()); + } + + #[test] + fn rpc_method_rule_empty_matches_any() { + let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"tools/call"}"#); + assert!(rpc_method_rule_matches(&info, "")); + } + + #[test] + fn rpc_method_rule_matches_exact_method() { + let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"initialize"}"#); + assert!(rpc_method_rule_matches(&info, "initialize")); + } + + #[test] + fn rpc_method_rule_does_not_match_different_method() { + let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"tools/call"}"#); + assert!(!rpc_method_rule_matches(&info, "initialize")); + } +} diff --git a/crates/openshell-sandbox/src/l7/mod.rs b/crates/openshell-sandbox/src/l7/mod.rs index 17d2e4c5e..d83a4dfa5 100644 --- a/crates/openshell-sandbox/src/l7/mod.rs +++ b/crates/openshell-sandbox/src/l7/mod.rs @@ -11,6 +11,7 @@ pub mod graphql; pub(crate) mod http; pub mod inference; +pub mod jsonrpc; pub mod path; pub mod provider; pub mod relay; @@ -113,6 +114,8 @@ pub struct L7RequestInfo { pub query_params: std::collections::HashMap>, /// Parsed GraphQL operation metadata for GraphQL endpoints. pub graphql: Option, + /// Parsed JSON-RPC request metadata for JSON-RPC endpoints. + pub jsonrpc: Option, } /// Parse an L7 endpoint config from a regorus Value (returned by Rego query). diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 6baa2ab03..ec723df50 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -178,25 +178,7 @@ where .into_diagnostic()?; Ok(()) } - L7Protocol::JsonRpc => { - if close_if_stale(engine.generation_guard(), ctx) { - return Ok(()); - } - // JSON-RPC provider not yet implemented — fall through to passthrough - { - let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) - .activity(ActivityId::Other) - .severity(SeverityId::Low) - .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) - .message("JSON-RPC L7 provider not yet implemented, falling back to passthrough") - .build(); - ocsf_emit!(event); - } - tokio::io::copy_bidirectional(client, upstream) - .await - .into_diagnostic()?; - Ok(()) - } + L7Protocol::JsonRpc => relay_jsonrpc(config, &engine, client, upstream, ctx).await, } } @@ -316,6 +298,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: graphql_info.clone(), + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -713,6 +696,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: None, + jsonrpc: None, }; let websocket_request = crate::l7::rest::request_is_websocket_upgrade(&req.raw_header); if config.protocol == L7Protocol::Websocket && !websocket_request { @@ -904,6 +888,162 @@ fn close_if_stale(guard: &PolicyGenerationGuard, ctx: &L7EvalContext) -> bool { true } +async fn relay_jsonrpc( + config: &L7EndpointConfig, + engine: &TunnelPolicyEngine, + client: &mut C, + upstream: &mut U, + ctx: &L7EvalContext, +) -> Result<()> +where + C: AsyncRead + AsyncWrite + Unpin + Send, + U: AsyncRead + AsyncWrite + Unpin + Send, +{ + loop { + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let parsed = match crate::l7::jsonrpc::parse_jsonrpc_http_request( + client, + 64 * 1024, + crate::l7::path::CanonicalizeOptions { + allow_encoded_slash: config.allow_encoded_slash, + ..Default::default() + }, + ) + .await + { + Ok(Some(parsed)) => parsed, + Ok(None) => return Ok(()), + Err(e) => { + if is_benign_connection_error(&e) { + debug!( + host = %ctx.host, + port = ctx.port, + error = %e, + "JSON-RPC L7 connection closed" + ); + } else { + let detail = + parse_rejection_detail(&e.to_string(), ParseRejectionMode::L7Endpoint); + emit_parse_rejection(ctx, &detail, "l7-jsonrpc"); + } + return Ok(()); + } + }; + + let req = parsed.request; + let jsonrpc_info = parsed.info; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let redacted_target = req.target.clone(); + + let request_info = L7RequestInfo { + action: req.action.clone(), + target: redacted_target.clone(), + query_params: req.query_params.clone(), + graphql: None, + jsonrpc: Some(jsonrpc_info.clone()), + }; + + let parse_error_reason = jsonrpc_info + .error + .as_deref() + .map(|e| format!("JSON-RPC request rejected: {e}")); + let force_deny = parse_error_reason.is_some(); + let (allowed, reason) = if let Some(reason) = parse_error_reason { + (false, reason) + } else { + evaluate_l7_request(engine, ctx, &request_info)? + }; + + if close_if_stale(engine.generation_guard(), ctx) { + return Ok(()); + } + + let decision_str = match (allowed, config.enforcement) { + (_, _) if force_deny => "deny", + (true, _) => "allow", + (false, EnforcementMode::Audit) => "audit", + (false, EnforcementMode::Enforce) => "deny", + }; + + { + let (action_id, disposition_id, severity) = match decision_str { + "deny" => (ActionId::Denied, DispositionId::Blocked, SeverityId::Medium), + _ => ( + ActionId::Allowed, + DispositionId::Allowed, + SeverityId::Informational, + ), + }; + let rpc_method = jsonrpc_info.method.as_deref().unwrap_or("-"); + let event = HttpActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Other) + .action(action_id) + .disposition(disposition_id) + .severity(severity) + .http_request(HttpRequest::new( + &request_info.action, + OcsfUrl::new("http", &ctx.host, &redacted_target, ctx.port), + )) + .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) + .firewall_rule(&ctx.policy_name, "l7-jsonrpc") + .message(format!( + "JSONRPC_L7_REQUEST {decision_str} {} {}:{}{} rpc_method={rpc_method} reason={}", + request_info.action, ctx.host, ctx.port, redacted_target, reason, + )) + .build(); + ocsf_emit!(event); + } + + if allowed || (config.enforcement == EnforcementMode::Audit && !force_deny) { + let outcome = crate::l7::rest::relay_http_request_with_resolver_guarded( + &req, + client, + upstream, + ctx.secret_resolver.as_deref(), + Some(engine.generation_guard()), + ) + .await?; + match outcome { + RelayOutcome::Reusable => {} + RelayOutcome::Consumed => { + debug!( + host = %ctx.host, + port = ctx.port, + "Upstream connection not reusable, closing JSON-RPC L7 relay" + ); + return Ok(()); + } + RelayOutcome::Upgraded { .. } => { + return Ok(()); + } + } + } else { + crate::l7::rest::RestProvider::default() + .deny_with_redacted_target( + &req, + &ctx.policy_name, + &reason, + client, + Some(&redacted_target), + Some(crate::l7::rest::DenyResponseContext { + host: Some(&ctx.host), + port: Some(ctx.port), + binary: Some(&ctx.binary_path), + }), + ) + .await?; + return Ok(()); + } + } +} + async fn relay_graphql( config: &L7EndpointConfig, engine: &TunnelPolicyEngine, @@ -981,6 +1121,7 @@ where target: redacted_target.clone(), query_params: req.query_params.clone(), graphql: Some(graphql_info.clone()), + jsonrpc: None, }; // Malformed or ambiguous GraphQL requests, such as duplicated GET @@ -1178,6 +1319,10 @@ pub fn evaluate_l7_request( "path": request.target, "query_params": request.query_params.clone(), "graphql": request.graphql.clone(), + "jsonrpc": request.jsonrpc.as_ref().map(|j| serde_json::json!({ + "method": j.method, + "error": j.error, + })), } }); @@ -1811,6 +1956,7 @@ network_policies: target: "/ws".into(), query_params: std::collections::HashMap::new(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); @@ -2426,4 +2572,100 @@ network_policies: "stale passthrough request must not be forwarded upstream" ); } + + #[tokio::test] + async fn jsonrpc_relay_denies_method_not_in_allow_list() { + let data = r" +network_policies: + mcp_api: + name: mcp_api + endpoints: + - host: mcp.example.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: initialize + binaries: + - { path: /usr/bin/python3 } +"; + let engine = OpaEngine::from_strings(TEST_POLICY, data).unwrap(); + let input = NetworkInput { + host: "mcp.example.test".into(), + port: 8000, + binary_path: PathBuf::from("/usr/bin/python3"), + binary_sha256: "unused".into(), + ancestors: vec![], + cmdline_paths: vec![], + }; + let (endpoint_config, generation) = engine + .query_endpoint_config_with_generation(&input) + .unwrap(); + let config = crate::l7::parse_l7_config(&endpoint_config.unwrap()).unwrap(); + let tunnel_engine = engine.clone_engine_for_tunnel(generation).unwrap(); + let ctx = L7EvalContext { + host: "mcp.example.test".into(), + port: 8000, + policy_name: "mcp_api".into(), + binary_path: "/usr/bin/python3".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + + let (mut app, mut relay_client) = tokio::io::duplex(8192); + let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192); + let relay = tokio::spawn(async move { + relay_with_inspection( + &config, + tunnel_engine, + &mut relay_client, + &mut relay_upstream, + &ctx, + ) + .await + }); + + let body = + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_repos"}}"#; + let request = format!( + "POST /mcp HTTP/1.1\r\nHost: mcp.example.test:8000\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + app.write_all(request.as_bytes()).await.unwrap(); + app.write_all(body).await.unwrap(); + + let mut response = [0u8; 512]; + let n = tokio::time::timeout(std::time::Duration::from_secs(2), app.read(&mut response)) + .await + .expect("relay should respond without reaching upstream") + .unwrap(); + let response = String::from_utf8_lossy(&response[..n]); + assert!( + response.contains("403"), + "tools/call not in allow list must be denied with 403, got: {response:?}" + ); + + let mut upstream_buf = [0u8; 128]; + let n = tokio::time::timeout( + std::time::Duration::from_millis(100), + upstream.read(&mut upstream_buf), + ) + .await + .unwrap_or(Ok(0)) + .unwrap_or(0); + assert_eq!(n, 0, "denied request must not be forwarded to upstream"); + + drop(app); + tokio::time::timeout(std::time::Duration::from_secs(1), relay) + .await + .expect("relay should complete") + .unwrap() + .unwrap(); + } } diff --git a/crates/openshell-sandbox/src/l7/websocket.rs b/crates/openshell-sandbox/src/l7/websocket.rs index 79c820e26..70c11f330 100644 --- a/crates/openshell-sandbox/src/l7/websocket.rs +++ b/crates/openshell-sandbox/src/l7/websocket.rs @@ -545,6 +545,7 @@ fn inspect_websocket_text_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; let (allowed, reason) = evaluate_l7_request(inspector.engine, inspector.ctx, &request_info)?; let decision = match (allowed, inspector.enforcement) { @@ -581,6 +582,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: None, + jsonrpc: None, }; emit_websocket_l7_event( host, @@ -602,6 +604,7 @@ fn inspect_graphql_websocket_message( target: inspector.target.clone(), query_params: inspector.query_params.clone(), graphql: Some(graphql.clone()), + jsonrpc: None, }; let parse_error_reason = graphql .error diff --git a/crates/openshell-sandbox/src/mechanistic_mapper.rs b/crates/openshell-sandbox/src/mechanistic_mapper.rs index ba7c51de9..273bdee3b 100644 --- a/crates/openshell-sandbox/src/mechanistic_mapper.rs +++ b/crates/openshell-sandbox/src/mechanistic_mapper.rs @@ -355,6 +355,7 @@ fn build_l7_rules(samples: &HashMap<(String, String), u32>) -> Vec { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }), }); } diff --git a/crates/openshell-sandbox/src/opa.rs b/crates/openshell-sandbox/src/opa.rs index f73f3bc14..aa49509e5 100644 --- a/crates/openshell-sandbox/src/opa.rs +++ b/crates/openshell-sandbox/src/opa.rs @@ -1021,6 +1021,7 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St "command": a.map_or("", |a| &a.command), "operation_type": a.map_or("", |a| &a.operation_type), "operation_name": a.map_or("", |a| &a.operation_name), + "rpc_method": a.map_or("", |a| &a.rpc_method), }); if let Some(a) = a && !a.fields.is_empty() @@ -1946,6 +1947,25 @@ process: }) } + fn l7_jsonrpc_input(host: &str, port: u16, path: &str, rpc_method: &str) -> serde_json::Value { + serde_json::json!({ + "network": { "host": host, "port": port }, + "exec": { + "path": "/usr/bin/curl", + "ancestors": [], + "cmdline_paths": [] + }, + "request": { + "method": "POST", + "path": path, + "query_params": {}, + "jsonrpc": { + "method": rpc_method + } + } + }) + } + fn l7_graphql_input(host: &str, operations: serde_json::Value) -> serde_json::Value { serde_json::json!({ "network": { "host": host, "port": 443 }, @@ -2492,6 +2512,7 @@ network_policies: operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }), }], ..Default::default() @@ -2540,6 +2561,65 @@ network_policies: assert!(!eval_l7(&engine, &deny_input)); } + #[test] + fn l7_jsonrpc_rpc_method_from_proto_is_enforced() { + let mut network_policies = std::collections::HashMap::new(); + network_policies.insert( + "jsonrpc_proto".to_string(), + NetworkPolicyRule { + name: "jsonrpc_proto".to_string(), + endpoints: vec![NetworkEndpoint { + host: "mcp.proto.com".to_string(), + port: 8000, + path: "/mcp".to_string(), + protocol: "json-rpc".to_string(), + enforcement: "enforce".to_string(), + rules: vec![L7Rule { + allow: Some(L7Allow { + method: String::new(), + path: String::new(), + command: String::new(), + query: std::collections::HashMap::new(), + operation_type: String::new(), + operation_name: String::new(), + fields: Vec::new(), + rpc_method: "initialize".to_string(), + }), + }], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }, + ); + + let proto = ProtoSandboxPolicy { + version: 1, + filesystem: Some(ProtoFs { + include_workdir: true, + read_only: vec![], + read_write: vec![], + }), + landlock: Some(openshell_core::proto::LandlockPolicy { + compatibility: "best_effort".to_string(), + }), + process: Some(ProtoProc { + run_as_user: "sandbox".to_string(), + run_as_group: "sandbox".to_string(), + }), + network_policies, + }; + + let engine = OpaEngine::from_proto(&proto).expect("engine from proto"); + let allow_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "initialize"); + assert!(eval_l7(&engine, &allow_input)); + + let deny_input = l7_jsonrpc_input("mcp.proto.com", 8000, "/mcp", "tools/list"); + assert!(!eval_l7(&engine, &deny_input)); + } + #[test] fn l7_no_request_on_l4_only_endpoint() { // L4-only endpoint should not match L7 allow_request diff --git a/crates/openshell-sandbox/src/policy_local.rs b/crates/openshell-sandbox/src/policy_local.rs index d65bce324..34eeaada5 100644 --- a/crates/openshell-sandbox/src/policy_local.rs +++ b/crates/openshell-sandbox/src/policy_local.rs @@ -1083,6 +1083,7 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }), }) .collect(); @@ -1097,6 +1098,7 @@ fn network_endpoint_from_json( operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + rpc_method: String::new(), }) .collect(); diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 39620657d..7782f69d7 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -3394,11 +3394,63 @@ async fn handle_forward_proxy( } else { None }; + let jsonrpc = if l7_config.config.protocol == crate::l7::L7Protocol::JsonRpc { + let header_end = forward_request_bytes + .windows(4) + .position(|w| w == b"\r\n\r\n") + .map_or(forward_request_bytes.len(), |p| p + 4); + let header_str = std::str::from_utf8(&forward_request_bytes[..header_end]) + .map_err(|_| miette::miette!("Forward JSON-RPC headers contain invalid UTF-8"))?; + let body_length = crate::l7::rest::parse_body_length(header_str)?; + let mut jsonrpc_request = crate::l7::provider::L7Request { + action: method.to_string(), + target: path.clone(), + query_params: query_params.clone(), + raw_header: forward_request_bytes, + body_length, + }; + let body = match crate::l7::http::read_body_for_inspection( + client, + &mut jsonrpc_request, + 64 * 1024, + ) + .await + { + Ok(body) => body, + Err(e) => { + let event = NetworkActivityBuilder::new(crate::ocsf_ctx()) + .activity(ActivityId::Fail) + .severity(SeverityId::Medium) + .status(StatusId::Failure) + .dst_endpoint(Endpoint::from_domain(&host_lc, port)) + .message(format!("FORWARD_JSONRPC_L7 request rejected: {e}")) + .build(); + ocsf_emit!(event); + emit_activity_simple(activity_tx, true, "l7_parse_rejection"); + respond( + client, + &build_json_error_response( + 400, + "Bad Request", + "invalid_jsonrpc_request", + &format!("JSON-RPC request rejected before policy evaluation: {e}"), + ), + ) + .await?; + return Ok(()); + } + }; + forward_request_bytes = jsonrpc_request.raw_header; + Some(crate::l7::jsonrpc::parse_jsonrpc_body(&body)) + } else { + None + }; let request_info = crate::l7::L7RequestInfo { action: method.to_string(), target: path.clone(), query_params, graphql, + jsonrpc, }; let parse_error_reason = request_info diff --git a/proto/sandbox.proto b/proto/sandbox.proto index ef0b0540f..cf0fc902b 100644 --- a/proto/sandbox.proto +++ b/proto/sandbox.proto @@ -160,6 +160,8 @@ message L7DenyRule { // GraphQL root field globs. Deny rules match when any selected root field // matches any configured glob. repeated string fields = 7; + // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". + string rpc_method = 8; } // An L7 policy rule (allow-only). @@ -186,6 +188,8 @@ message L7Allow { // GraphQL root field globs. Allow rules match only when every selected root // field matches one of the configured globs. Omit to match all fields. repeated string fields = 7; + // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". + string rpc_method = 8; } // Query value matcher for one query parameter key. From e4ff85f22c1592edfd054ea6ae9830d1f71f7f15 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 17:05:38 -0700 Subject: [PATCH 5/9] fix(l7): honor JSON-RPC body size config Carry JSON-RPC max body bytes from policy into runtime endpoint config and use it on both CONNECT and forward JSON-RPC inspection paths instead of hardcoding 64 KiB. Signed-off-by: Kris Hicks --- crates/openshell-policy/src/lib.rs | 43 ++++++++++++- crates/openshell-providers/src/profiles.rs | 4 ++ crates/openshell-sandbox/src/l7/jsonrpc.rs | 2 + crates/openshell-sandbox/src/l7/mod.rs | 65 ++++++++++++++++++++ crates/openshell-sandbox/src/l7/relay.rs | 5 +- crates/openshell-sandbox/src/opa.rs | 3 + crates/openshell-sandbox/src/policy_local.rs | 1 + crates/openshell-sandbox/src/proxy.rs | 5 +- proto/sandbox.proto | 3 + 9 files changed, 128 insertions(+), 3 deletions(-) diff --git a/crates/openshell-policy/src/lib.rs b/crates/openshell-policy/src/lib.rs index d97017983..8de08fe3d 100644 --- a/crates/openshell-policy/src/lib.rs +++ b/crates/openshell-policy/src/lib.rs @@ -162,6 +162,14 @@ struct JsonRpcConfigDef { batch_policy: String, } +fn json_rpc_config_from_proto(max_body_bytes: u32) -> Option { + (max_body_bytes > 0).then_some(JsonRpcConfigDef { + max_body_bytes, + on_parse_error: String::new(), + batch_policy: String::new(), + }) +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct GraphqlOperationDef { @@ -370,6 +378,10 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, + json_rpc_max_body_bytes: e + .json_rpc + .as_ref() + .map_or(0, |config| config.max_body_bytes), } }) .collect(), @@ -539,7 +551,7 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { }) .collect(), graphql_max_body_bytes: e.graphql_max_body_bytes, - json_rpc: None, + json_rpc: json_rpc_config_from_proto(e.json_rpc_max_body_bytes), } }) .collect(), @@ -1743,6 +1755,35 @@ network_policies: assert_eq!(ep.deny_rules[0].fields, vec!["deleteRepository"]); } + #[test] + fn round_trip_preserves_json_rpc_max_body_bytes() { + let yaml = r" +version: 1 +network_policies: + mcp: + name: mcp + endpoints: + - host: mcp.example.com + port: 443 + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + binaries: + - path: /usr/bin/curl +"; + let proto1 = parse_sandbox_policy(yaml).expect("parse failed"); + let yaml_out = serialize_sandbox_policy(&proto1).expect("serialize failed"); + let proto2 = parse_sandbox_policy(&yaml_out).expect("re-parse failed"); + + let ep = &proto2.network_policies["mcp"].endpoints[0]; + assert_eq!(ep.protocol, "json-rpc"); + assert_eq!(ep.json_rpc_max_body_bytes, 131_072); + } + #[test] fn round_trip_preserves_websocket_credential_rewrite() { let yaml = r" diff --git a/crates/openshell-providers/src/profiles.rs b/crates/openshell-providers/src/profiles.rs index a6d282256..86e3928f0 100644 --- a/crates/openshell-providers/src/profiles.rs +++ b/crates/openshell-providers/src/profiles.rs @@ -200,6 +200,8 @@ pub struct EndpointProfile { pub graphql_persisted_queries: HashMap, #[serde(default, skip_serializing_if = "is_zero")] pub graphql_max_body_bytes: u32, + #[serde(default, skip_serializing_if = "is_zero")] + pub json_rpc_max_body_bytes: u32, #[serde(default, skip_serializing_if = "String::is_empty")] pub path: String, } @@ -743,6 +745,7 @@ fn endpoint_to_proto(endpoint: &EndpointProfile) -> NetworkEndpoint { .map(|(name, operation)| (name.clone(), graphql_operation_to_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } @@ -773,6 +776,7 @@ fn endpoint_from_proto(endpoint: &NetworkEndpoint) -> EndpointProfile { .map(|(name, operation)| (name.clone(), graphql_operation_from_proto(operation))) .collect(), graphql_max_body_bytes: endpoint.graphql_max_body_bytes, + json_rpc_max_body_bytes: endpoint.json_rpc_max_body_bytes, path: endpoint.path.clone(), } } diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs index 977c8046f..2dc83c12d 100644 --- a/crates/openshell-sandbox/src/l7/jsonrpc.rs +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -8,6 +8,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::l7::provider::{L7Provider, L7Request}; +pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024; + pub struct JsonRpcHttpRequest { pub request: L7Request, pub info: JsonRpcRequestInfo, diff --git a/crates/openshell-sandbox/src/l7/mod.rs b/crates/openshell-sandbox/src/l7/mod.rs index d83a4dfa5..14fee09da 100644 --- a/crates/openshell-sandbox/src/l7/mod.rs +++ b/crates/openshell-sandbox/src/l7/mod.rs @@ -80,6 +80,8 @@ pub struct L7EndpointConfig { pub enforcement: EnforcementMode, /// Maximum GraphQL request body bytes to buffer for inspection. pub graphql_max_body_bytes: usize, + /// Maximum JSON-RPC request body bytes to buffer for inspection. + pub json_rpc_max_body_bytes: usize, /// When true, percent-encoded `/` (`%2F`) is preserved in path segments /// rather than rejected at the parser. Needed by upstreams like GitLab /// that embed `%2F` in namespaced project paths. Defaults to false. @@ -171,6 +173,10 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { .and_then(|v| usize::try_from(v).ok()) .filter(|v| *v > 0) .unwrap_or(graphql::DEFAULT_MAX_BODY_BYTES); + let json_rpc_max_body_bytes = get_object_u64(val, "json_rpc_max_body_bytes") + .and_then(|v| usize::try_from(v).ok()) + .filter(|v| *v > 0) + .unwrap_or(jsonrpc::DEFAULT_MAX_BODY_BYTES); Some(L7EndpointConfig { protocol, @@ -178,6 +184,7 @@ pub fn parse_l7_config(val: ®orus::Value) -> Option { tls, enforcement, graphql_max_body_bytes, + json_rpc_max_body_bytes, allow_encoded_slash, websocket_credential_rewrite, request_body_credential_rewrite, @@ -630,6 +637,18 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< } } + if ep.get("json_rpc_max_body_bytes").is_some() { + let valid_max = ep + .get("json_rpc_max_body_bytes") + .and_then(serde_json::Value::as_u64) + .is_some_and(|v| v > 0); + if !valid_max { + errors.push(format!( + "{loc}: json_rpc_max_body_bytes must be a positive integer" + )); + } + } + if protocol != "graphql" && protocol != "websocket" && (ep.get("persisted_queries").is_some() @@ -641,6 +660,12 @@ pub fn validate_l7_policies(data_json: &serde_json::Value) -> (Vec, Vec< )); } + if protocol != "json-rpc" && ep.get("json_rpc_max_body_bytes").is_some() { + warnings.push(format!( + "{loc}: JSON-RPC-specific endpoint fields are ignored unless protocol is json-rpc" + )); + } + if ep .get("websocket_credential_rewrite") .and_then(serde_json::Value::as_bool) @@ -1201,6 +1226,46 @@ mod tests { assert_eq!(config.protocol, L7Protocol::Rest); assert_eq!(config.tls, TlsMode::Auto); assert_eq!(config.enforcement, EnforcementMode::Audit); + assert_eq!( + config.json_rpc_max_body_bytes, + jsonrpc::DEFAULT_MAX_BODY_BYTES + ); + } + + #[test] + fn parse_l7_config_jsonrpc_max_body_bytes() { + let val = regorus::Value::from_json_str( + r#"{"protocol": "json-rpc", "host": "mcp.example.com", "port": 443, "json_rpc_max_body_bytes": 131072}"#, + ) + .unwrap(); + let config = parse_l7_config(&val).unwrap(); + assert_eq!(config.protocol, L7Protocol::JsonRpc); + assert_eq!(config.json_rpc_max_body_bytes, 131_072); + } + + #[test] + fn validate_jsonrpc_max_body_bytes_must_be_positive() { + let data = serde_json::json!({ + "network_policies": { + "test": { + "endpoints": [{ + "host": "mcp.example.com", + "port": 443, + "protocol": "json-rpc", + "access": "full", + "json_rpc_max_body_bytes": 0 + }], + "binaries": [] + } + } + }); + let (errors, _warnings) = validate_l7_policies(&data); + assert!( + errors + .iter() + .any(|e| e.contains("json_rpc_max_body_bytes must be a positive integer")), + "should reject non-positive JSON-RPC max body size, got errors: {errors:?}" + ); } #[test] diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index ec723df50..0df2fe9b8 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -906,7 +906,7 @@ where let parsed = match crate::l7::jsonrpc::parse_jsonrpc_http_request( client, - 64 * 1024, + config.json_rpc_max_body_bytes, crate::l7::path::CanonicalizeOptions { allow_encoded_slash: config.allow_encoded_slash, ..Default::default() @@ -1993,6 +1993,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2096,6 +2097,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, @@ -2216,6 +2218,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: EnforcementMode::Enforce, graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: true, request_body_credential_rewrite: false, diff --git a/crates/openshell-sandbox/src/opa.rs b/crates/openshell-sandbox/src/opa.rs index aa49509e5..de6ff5257 100644 --- a/crates/openshell-sandbox/src/opa.rs +++ b/crates/openshell-sandbox/src/opa.rs @@ -1140,6 +1140,9 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if e.graphql_max_body_bytes > 0 { ep["graphql_max_body_bytes"] = e.graphql_max_body_bytes.into(); } + if e.json_rpc_max_body_bytes > 0 { + ep["json_rpc_max_body_bytes"] = e.json_rpc_max_body_bytes.into(); + } ep }) .collect(); diff --git a/crates/openshell-sandbox/src/policy_local.rs b/crates/openshell-sandbox/src/policy_local.rs index 34eeaada5..20536ac2b 100644 --- a/crates/openshell-sandbox/src/policy_local.rs +++ b/crates/openshell-sandbox/src/policy_local.rs @@ -1122,6 +1122,7 @@ fn network_endpoint_from_json( persisted_queries: String::new(), graphql_persisted_queries: HashMap::new(), graphql_max_body_bytes: 0, + json_rpc_max_body_bytes: 0, path: String::new(), }) } diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index 7782f69d7..a45f1bc60 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -3412,7 +3412,7 @@ async fn handle_forward_proxy( let body = match crate::l7::http::read_body_for_inspection( client, &mut jsonrpc_request, - 64 * 1024, + l7_config.config.json_rpc_max_body_bytes, ) .await { @@ -4142,6 +4142,7 @@ mod tests { tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite, request_body_credential_rewrite: false, @@ -4741,6 +4742,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, @@ -4754,6 +4756,7 @@ network_policies: tls: crate::l7::TlsMode::Auto, enforcement: crate::l7::EnforcementMode::Enforce, graphql_max_body_bytes: crate::l7::graphql::DEFAULT_MAX_BODY_BYTES, + json_rpc_max_body_bytes: crate::l7::jsonrpc::DEFAULT_MAX_BODY_BYTES, allow_encoded_slash: false, websocket_credential_rewrite: false, request_body_credential_rewrite: false, diff --git a/proto/sandbox.proto b/proto/sandbox.proto index cf0fc902b..9d6ec2824 100644 --- a/proto/sandbox.proto +++ b/proto/sandbox.proto @@ -128,6 +128,9 @@ message NetworkEndpoint { // Advisor-proposed endpoints must not satisfy exact-host SSRF trust unless // they are converted through an explicit user-authored policy path. bool advisor_proposed = 18; + // Maximum JSON-RPC-over-HTTP request body bytes to buffer for inspection. + // Defaults to 65536 when unset. + uint32 json_rpc_max_body_bytes = 19; } // Trusted GraphQL operation classification. From a257133517ecec2dca23cc86d73a90f3535b9e65 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 11:42:53 -0700 Subject: [PATCH 6/9] feat(l7): match JSON-RPC params in rules Add JSON-RPC params matcher maps to proto and YAML policy conversion, including shared matcher conversion helpers. Flatten object params into dot-separated keys for policy input and extend Rego allow and deny matching to filter JSON-RPC calls by params. Signed-off-by: Kris Hicks --- crates/openshell-cli/src/policy_update.rs | 2 + crates/openshell-policy/src/lib.rs | 89 +++++----- crates/openshell-policy/src/merge.rs | 2 + crates/openshell-providers/src/profiles.rs | 2 + .../data/sandbox-policy.rego | 43 ++++- crates/openshell-sandbox/src/l7/jsonrpc.rs | 52 ++++++ crates/openshell-sandbox/src/l7/relay.rs | 1 + .../src/mechanistic_mapper.rs | 1 + crates/openshell-sandbox/src/opa.rs | 158 ++++++++++++++---- crates/openshell-sandbox/src/policy_local.rs | 2 + crates/openshell-server/src/grpc/policy.rs | 6 + proto/sandbox.proto | 6 + 12 files changed, 285 insertions(+), 79 deletions(-) diff --git a/crates/openshell-cli/src/policy_update.rs b/crates/openshell-cli/src/policy_update.rs index 03695d48e..22363c920 100644 --- a/crates/openshell-cli/src/policy_update.rs +++ b/crates/openshell-cli/src/policy_update.rs @@ -206,6 +206,7 @@ fn group_allow_rules(specs: &[String]) -> Result Result L7QueryMatcher { + match matcher { + QueryMatcherDef::Glob(glob) => L7QueryMatcher { glob, any: vec![] }, + QueryMatcherDef::Any(any) => L7QueryMatcher { + glob: String::new(), + any: any.any, + }, + } +} + +fn matcher_proto_to_def(matcher: L7QueryMatcher) -> QueryMatcherDef { + if matcher.any.is_empty() { + QueryMatcherDef::Glob(matcher.glob) + } else { + QueryMatcherDef::Any(QueryAnyDef { any: matcher.any }) + } +} + fn to_proto(raw: PolicyFile) -> SandboxPolicy { let network_policies = raw .network_policies @@ -311,16 +329,15 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { .query .into_iter() .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, proto) + (key, matcher_def_to_proto(matcher)) + }) + .collect(), + params: r + .allow + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_def_to_proto(matcher)) }) .collect(), }), @@ -341,18 +358,12 @@ fn to_proto(raw: PolicyFile) -> SandboxPolicy { query: d .query .into_iter() - .map(|(key, matcher)| { - let proto = match matcher { - QueryMatcherDef::Glob(glob) => { - L7QueryMatcher { glob, any: vec![] } - } - QueryMatcherDef::Any(any) => L7QueryMatcher { - glob: String::new(), - any: any.any, - }, - }; - (key, proto) - }) + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) + .collect(), + params: d + .params + .into_iter() + .map(|(key, matcher)| (key, matcher_def_to_proto(matcher))) .collect(), }) .collect(), @@ -488,17 +499,16 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { .query .into_iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any, - }) - }; - (key, yaml_matcher) + (key, matcher_proto_to_def(matcher)) + }) + .collect(), + params: a + .params + .into_iter() + .map(|(key, matcher)| { + (key, matcher_proto_to_def(matcher)) }) .collect(), - params: BTreeMap::new(), }, } }) @@ -519,17 +529,16 @@ fn from_proto(policy: &SandboxPolicy) -> PolicyFile { .query .iter() .map(|(key, matcher)| { - let yaml_matcher = if matcher.any.is_empty() { - QueryMatcherDef::Glob(matcher.glob.clone()) - } else { - QueryMatcherDef::Any(QueryAnyDef { - any: matcher.any.clone(), - }) - }; - (key.clone(), yaml_matcher) + (key.clone(), matcher_proto_to_def(matcher.clone())) + }) + .collect(), + params: d + .params + .iter() + .map(|(key, matcher)| { + (key.clone(), matcher_proto_to_def(matcher.clone())) }) .collect(), - params: BTreeMap::new(), }) .collect(), allow_encoded_slash: e.allow_encoded_slash, diff --git a/crates/openshell-policy/src/merge.rs b/crates/openshell-policy/src/merge.rs index 73c40316e..6be185ca0 100644 --- a/crates/openshell-policy/src/merge.rs +++ b/crates/openshell-policy/src/merge.rs @@ -748,6 +748,7 @@ fn expand_access_preset(protocol: &str, access: &str) -> Option> { operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: HashMap::default(), }), }) .collect(), @@ -963,6 +964,7 @@ mod tests { operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: HashMap::default(), }), } } diff --git a/crates/openshell-providers/src/profiles.rs b/crates/openshell-providers/src/profiles.rs index 86e3928f0..f94c09ef9 100644 --- a/crates/openshell-providers/src/profiles.rs +++ b/crates/openshell-providers/src/profiles.rs @@ -821,6 +821,7 @@ fn allow_to_proto(allow: &L7AllowProfile) -> L7Allow { operation_name: allow.operation_name.clone(), fields: allow.fields.clone(), rpc_method: String::new(), + params: HashMap::new(), } } @@ -854,6 +855,7 @@ fn deny_rule_to_proto(rule: &L7DenyRuleProfile) -> L7DenyRule { operation_name: rule.operation_name.clone(), fields: rule.fields.clone(), rpc_method: String::new(), + params: HashMap::new(), } } diff --git a/crates/openshell-sandbox/data/sandbox-policy.rego b/crates/openshell-sandbox/data/sandbox-policy.rego index c25b42af0..38070911c 100644 --- a/crates/openshell-sandbox/data/sandbox-policy.rego +++ b/crates/openshell-sandbox/data/sandbox-policy.rego @@ -274,6 +274,15 @@ request_denied_for_endpoint(request, endpoint) if { command_matches(request.command, deny_rule.command) } +# --- L7 deny rule matching: JSON-RPC method + params --- + +request_denied_for_endpoint(request, endpoint) if { + some deny_rule + deny_rule := endpoint.deny_rules[_] + deny_rule.rpc_method + jsonrpc_rule_matches(request, deny_rule) +} + # --- L7 deny rule matching: GraphQL operation --- request_denied_for_endpoint(request, endpoint) if { @@ -423,10 +432,7 @@ request_allowed_for_endpoint(request, endpoint) if { some rule rule := endpoint.rules[_] rule.allow.rpc_method - jsonrpc := object.get(request, "jsonrpc", {}) - method := object.get(jsonrpc, "method", null) - method != null - glob.match(rule.allow.rpc_method, [], method) + jsonrpc_rule_matches(request, rule.allow) } # --- L7 rule matching: GraphQL operation --- @@ -650,6 +656,35 @@ query_value_matches(value, matcher) if { glob.match(any_patterns[i], [], value) } +# JSON-RPC method and params matching. The sandbox flattens object params into +# dot-separated keys before policy evaluation, e.g. arguments.scope. +jsonrpc_rule_matches(request, rule) if { + jsonrpc := object.get(request, "jsonrpc", {}) + method := object.get(jsonrpc, "method", null) + method != null + glob.match(rule.rpc_method, [], method) + jsonrpc_params_match(jsonrpc, rule) +} + +jsonrpc_params_match(jsonrpc, rule) if { + param_rules := object.get(rule, "params", {}) + not jsonrpc_param_mismatch(jsonrpc, param_rules) +} + +jsonrpc_param_mismatch(jsonrpc, param_rules) if { + some key + matcher := param_rules[key] + not jsonrpc_param_key_matches(jsonrpc, key, matcher) +} + +jsonrpc_param_key_matches(jsonrpc, key, matcher) if { + params := object.get(jsonrpc, "params", {}) + value := object.get(params, key, null) + value != null + is_string(value) + query_value_matches(value, matcher) +} + # SQL command matching: "*" matches any; otherwise case-insensitive. command_matches(_, "*") if true diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs index 2dc83c12d..e75ac761f 100644 --- a/crates/openshell-sandbox/src/l7/jsonrpc.rs +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -4,6 +4,7 @@ //! JSON-RPC 2.0 over HTTP L7 inspection. use miette::Result; +use std::collections::HashMap; use tokio::io::{AsyncRead, AsyncWrite}; use crate::l7::provider::{L7Provider, L7Request}; @@ -33,6 +34,7 @@ pub(crate) async fn parse_jsonrpc_http_request, + pub params: HashMap, pub error: Option, } @@ -54,21 +56,57 @@ pub fn parse_jsonrpc_body(body: &[u8]) -> JsonRpcRequestInfo { let Ok(value) = serde_json::from_slice::(body) else { return JsonRpcRequestInfo { method: None, + params: HashMap::new(), error: Some("invalid JSON".to_string()), }; }; let Some(method) = value.get("method").and_then(|m| m.as_str()) else { return JsonRpcRequestInfo { method: None, + params: HashMap::new(), error: Some("missing or non-string 'method' field".to_string()), }; }; JsonRpcRequestInfo { method: Some(method.to_string()), + params: value + .get("params") + .map_or_else(HashMap::new, flatten_jsonrpc_params), error: None, } } +fn flatten_jsonrpc_params(value: &serde_json::Value) -> HashMap { + let mut params = HashMap::new(); + flatten_json_value("", value, &mut params); + params +} + +fn flatten_json_value(prefix: &str, value: &serde_json::Value, out: &mut HashMap) { + match value { + serde_json::Value::Object(map) => { + for (key, child) in map { + let next = if prefix.is_empty() { + key.clone() + } else { + format!("{prefix}.{key}") + }; + flatten_json_value(&next, child, out); + } + } + serde_json::Value::String(s) if !prefix.is_empty() => { + out.insert(prefix.to_string(), s.clone()); + } + serde_json::Value::Number(n) if !prefix.is_empty() => { + out.insert(prefix.to_string(), n.to_string()); + } + serde_json::Value::Bool(b) if !prefix.is_empty() => { + out.insert(prefix.to_string(), b.to_string()); + } + _ => {} + } +} + #[cfg(test)] mod tests { use super::*; @@ -81,6 +119,20 @@ mod tests { assert!(info.error.is_none()); } + #[test] + fn flattens_object_params_for_policy_matching() { + let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#; + let info = parse_jsonrpc_body(body); + assert_eq!( + info.params.get("name").map(String::as_str), + Some("submit_report") + ); + assert_eq!( + info.params.get("arguments.scope").map(String::as_str), + Some("workspace/main") + ); + } + #[test] fn rpc_method_rule_empty_matches_any() { let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"tools/call"}"#); diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 0df2fe9b8..7d998f1e1 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -1321,6 +1321,7 @@ pub fn evaluate_l7_request( "graphql": request.graphql.clone(), "jsonrpc": request.jsonrpc.as_ref().map(|j| serde_json::json!({ "method": j.method, + "params": j.params, "error": j.error, })), } diff --git a/crates/openshell-sandbox/src/mechanistic_mapper.rs b/crates/openshell-sandbox/src/mechanistic_mapper.rs index 273bdee3b..bbe7b93b8 100644 --- a/crates/openshell-sandbox/src/mechanistic_mapper.rs +++ b/crates/openshell-sandbox/src/mechanistic_mapper.rs @@ -356,6 +356,7 @@ fn build_l7_rules(samples: &HashMap<(String, String), u32>) -> Vec { operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: HashMap::new(), }), }); } diff --git a/crates/openshell-sandbox/src/opa.rs b/crates/openshell-sandbox/src/opa.rs index de6ff5257..ac50340d1 100644 --- a/crates/openshell-sandbox/src/opa.rs +++ b/crates/openshell-sandbox/src/opa.rs @@ -923,6 +923,24 @@ fn resolve_binary_in_container(_policy_path: &str, _entrypoint_pid: u32) -> Opti None } +fn l7_matchers_to_json( + matchers: &std::collections::HashMap, +) -> serde_json::Map { + matchers + .iter() + .map(|(key, matcher)| { + let mut matcher_json = serde_json::json!({}); + if !matcher.glob.is_empty() { + matcher_json["glob"] = matcher.glob.clone().into(); + } + if !matcher.any.is_empty() { + matcher_json["any"] = matcher.any.clone().into(); + } + (key.clone(), matcher_json) + }) + .collect() +} + /// Convert typed proto policy fields to JSON suitable for `engine.add_data_json()`. /// /// The rego rules reference `data.*` directly, so the JSON structure has @@ -1028,29 +1046,18 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St { allow["fields"] = a.fields.clone().into(); } - let query: serde_json::Map = a - .map(|allow| { - allow - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = - matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = - matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect() - }) - .unwrap_or_default(); + let query = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.query) + }); if !query.is_empty() { allow["query"] = query.into(); } + let params = a.map_or_else(serde_json::Map::new, |allow| { + l7_matchers_to_json(&allow.params) + }); + if !params.is_empty() { + allow["params"] = params.into(); + } serde_json::json!({ "allow": allow }) }) .collect(); @@ -1086,23 +1093,17 @@ fn proto_to_opa_data_json(proto: &ProtoSandboxPolicy, entrypoint_pid: u32) -> St if !d.fields.is_empty() { deny["fields"] = d.fields.clone().into(); } - let query: serde_json::Map = d - .query - .iter() - .map(|(key, matcher)| { - let mut matcher_json = serde_json::json!({}); - if !matcher.glob.is_empty() { - matcher_json["glob"] = matcher.glob.clone().into(); - } - if !matcher.any.is_empty() { - matcher_json["any"] = matcher.any.clone().into(); - } - (key.clone(), matcher_json) - }) - .collect(); + if !d.rpc_method.is_empty() { + deny["rpc_method"] = d.rpc_method.clone().into(); + } + let query = l7_matchers_to_json(&d.query); if !query.is_empty() { deny["query"] = query.into(); } + let params = l7_matchers_to_json(&d.params); + if !params.is_empty() { + deny["params"] = params.into(); + } deny }) .collect(); @@ -1951,6 +1952,16 @@ process: } fn l7_jsonrpc_input(host: &str, port: u16, path: &str, rpc_method: &str) -> serde_json::Value { + l7_jsonrpc_input_with_params(host, port, path, rpc_method, serde_json::json!({})) + } + + fn l7_jsonrpc_input_with_params( + host: &str, + port: u16, + path: &str, + rpc_method: &str, + params: serde_json::Value, + ) -> serde_json::Value { serde_json::json!({ "network": { "host": host, "port": port }, "exec": { @@ -1963,7 +1974,8 @@ process: "path": path, "query_params": {}, "jsonrpc": { - "method": rpc_method + "method": rpc_method, + "params": params } } }) @@ -2516,6 +2528,7 @@ network_policies: operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: std::collections::HashMap::new(), }), }], ..Default::default() @@ -2587,6 +2600,7 @@ network_policies: operation_name: String::new(), fields: Vec::new(), rpc_method: "initialize".to_string(), + params: std::collections::HashMap::new(), }), }], ..Default::default() @@ -2623,6 +2637,80 @@ network_policies: assert!(!eval_l7(&engine, &deny_input)); } + #[test] + fn l7_jsonrpc_params_rules_filter_tools_call() { + let data = r#" +network_policies: + jsonrpc_params: + name: jsonrpc_params + endpoints: + - host: mcp.params.test + port: 8000 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: blocked_action + binaries: + - { path: /usr/bin/curl } +"#; + let engine = OpaEngine::from_strings(TEST_POLICY, data).expect("engine from yaml"); + + let read_status = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "read_status"}), + ); + assert!(eval_l7(&engine, &read_status)); + + let submit_report = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "submit_report", + "arguments.scope": "workspace/main" + }), + ); + assert!(eval_l7(&engine, &submit_report)); + + let blocked_without_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({"name": "blocked_action"}), + ); + assert!(!eval_l7(&engine, &blocked_without_args)); + + let blocked_with_args = l7_jsonrpc_input_with_params( + "mcp.params.test", + 8000, + "/mcp", + "tools/call", + serde_json::json!({ + "name": "blocked_action", + "arguments.reason": "test" + }), + ); + assert!(!eval_l7(&engine, &blocked_with_args)); + } + #[test] fn l7_no_request_on_l4_only_endpoint() { // L4-only endpoint should not match L7 allow_request diff --git a/crates/openshell-sandbox/src/policy_local.rs b/crates/openshell-sandbox/src/policy_local.rs index 20536ac2b..aa270d017 100644 --- a/crates/openshell-sandbox/src/policy_local.rs +++ b/crates/openshell-sandbox/src/policy_local.rs @@ -1084,6 +1084,7 @@ fn network_endpoint_from_json( operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: HashMap::new(), }), }) .collect(); @@ -1099,6 +1100,7 @@ fn network_endpoint_from_json( operation_name: String::new(), fields: Vec::new(), rpc_method: String::new(), + params: HashMap::new(), }) .collect(); diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 2e2210f44..ebae4809a 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -8049,6 +8049,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }; @@ -8444,6 +8446,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }]; @@ -8590,6 +8594,8 @@ mod tests { operation_type: String::new(), operation_name: String::new(), fields: Vec::new(), + params: HashMap::default(), + rpc_method: String::new(), }), }], }; diff --git a/proto/sandbox.proto b/proto/sandbox.proto index 9d6ec2824..afe1d3301 100644 --- a/proto/sandbox.proto +++ b/proto/sandbox.proto @@ -165,6 +165,9 @@ message L7DenyRule { repeated string fields = 7; // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". string rpc_method = 8; + // JSON-RPC params matcher map. Dot-separated keys select nested params + // fields, e.g. "arguments.scope". + map params = 9; } // An L7 policy rule (allow-only). @@ -193,6 +196,9 @@ message L7Allow { repeated string fields = 7; // JSON-RPC method name (JSON-RPC): exact name or glob, e.g. "tools/call". string rpc_method = 8; + // JSON-RPC params matcher map. Dot-separated keys select nested params + // fields, e.g. "arguments.scope". + map params = 9; } // Query value matcher for one query parameter key. From 755ec0ffcee0ee3bc1ba25ac73f747087d23e28d Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 11:59:38 -0700 Subject: [PATCH 7/9] feat(l7): support JSON-RPC batch calls Parse JSON-RPC batch arrays into per-call metadata and evaluate each batch item with the existing method and params policy rules. Deny the whole batch when any call is denied. Signed-off-by: Kris Hicks --- crates/openshell-sandbox/src/l7/jsonrpc.rs | 109 ++++++++++++------- crates/openshell-sandbox/src/l7/relay.rs | 117 +++++++++++++++++++-- 2 files changed, 184 insertions(+), 42 deletions(-) diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs index e75ac761f..81c7c62fa 100644 --- a/crates/openshell-sandbox/src/l7/jsonrpc.rs +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -33,19 +33,15 @@ pub(crate) async fn parse_jsonrpc_http_request, - pub params: HashMap, + pub calls: Vec, + pub is_batch: bool, pub error: Option, } -/// Returns true if the parsed request's method matches the given `rpc_method` rule pattern. -/// -/// An empty `rpc_method` pattern matches any method. -pub fn rpc_method_rule_matches(info: &JsonRpcRequestInfo, rpc_method: &str) -> bool { - if rpc_method.is_empty() { - return true; - } - info.method.as_deref() == Some(rpc_method) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcCallInfo { + pub method: String, + pub params: HashMap, } /// Parse a JSON-RPC 2.0 request body and extract the `method` field. @@ -55,25 +51,60 @@ pub fn rpc_method_rule_matches(info: &JsonRpcRequestInfo, rpc_method: &str) -> b pub fn parse_jsonrpc_body(body: &[u8]) -> JsonRpcRequestInfo { let Ok(value) = serde_json::from_slice::(body) else { return JsonRpcRequestInfo { - method: None, - params: HashMap::new(), + calls: Vec::new(), + is_batch: false, error: Some("invalid JSON".to_string()), }; }; - let Some(method) = value.get("method").and_then(|m| m.as_str()) else { + + if let serde_json::Value::Array(items) = value { + if items.is_empty() { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some("empty batch".to_string()), + }; + } + let mut calls = Vec::new(); + for item in &items { + let Some(call) = parse_jsonrpc_call(item) else { + return JsonRpcRequestInfo { + calls: Vec::new(), + is_batch: true, + error: Some("batch item missing or non-string 'method' field".to_string()), + }; + }; + calls.push(call); + } + return JsonRpcRequestInfo { + calls, + is_batch: true, + error: None, + }; + } + + let Some(call) = parse_jsonrpc_call(&value) else { return JsonRpcRequestInfo { - method: None, - params: HashMap::new(), + calls: Vec::new(), + is_batch: false, error: Some("missing or non-string 'method' field".to_string()), }; }; JsonRpcRequestInfo { - method: Some(method.to_string()), + calls: vec![call], + is_batch: false, + error: None, + } +} + +fn parse_jsonrpc_call(value: &serde_json::Value) -> Option { + let method = value.get("method").and_then(|m| m.as_str())?; + Some(JsonRpcCallInfo { + method: method.to_string(), params: value .get("params") .map_or_else(HashMap::new, flatten_jsonrpc_params), - error: None, - } + }) } fn flatten_jsonrpc_params(value: &serde_json::Value) -> HashMap { @@ -115,7 +146,12 @@ mod tests { fn parses_method_from_request_body() { let body = br#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#; let info = parse_jsonrpc_body(body); - assert_eq!(info.method.as_deref(), Some("initialize")); + assert_eq!( + info.calls.first().map(|call| call.method.as_str()), + Some("initialize") + ); + assert_eq!(info.calls.len(), 1); + assert!(!info.is_batch); assert!(info.error.is_none()); } @@ -123,31 +159,32 @@ mod tests { fn flattens_object_params_for_policy_matching() { let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#; let info = parse_jsonrpc_body(body); + let params = &info.calls.first().expect("single request call").params; assert_eq!( - info.params.get("name").map(String::as_str), + params.get("name").map(String::as_str), Some("submit_report") ); assert_eq!( - info.params.get("arguments.scope").map(String::as_str), + params.get("arguments.scope").map(String::as_str), Some("workspace/main") ); } #[test] - fn rpc_method_rule_empty_matches_any() { - let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"tools/call"}"#); - assert!(rpc_method_rule_matches(&info, "")); - } - - #[test] - fn rpc_method_rule_matches_exact_method() { - let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"initialize"}"#); - assert!(rpc_method_rule_matches(&info, "initialize")); - } - - #[test] - fn rpc_method_rule_does_not_match_different_method() { - let info = parse_jsonrpc_body(br#"{"jsonrpc":"2.0","id":1,"method":"tools/call"}"#); - assert!(!rpc_method_rule_matches(&info, "initialize")); + fn parses_valid_batch_without_error() { + let body = br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#; + let info = parse_jsonrpc_body(body); + assert!(info.error.is_none()); + assert!(info.is_batch); + assert_eq!(info.calls.len(), 2); + assert_eq!(info.calls[0].method, "tools/list"); + assert_eq!(info.calls[1].method, "tools/call"); + assert_eq!( + info.calls[1].params.get("name").map(String::as_str), + Some("read_status") + ); } } diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index 7d998f1e1..f79dc7851 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -981,7 +981,11 @@ where SeverityId::Informational, ), }; - let rpc_method = jsonrpc_info.method.as_deref().unwrap_or("-"); + let rpc_method = jsonrpc_info + .calls + .first() + .map(|call| call.method.as_str()) + .unwrap_or("-"); let event = HttpActivityBuilder::new(crate::ocsf_ctx()) .activity(ActivityId::Other) .action(action_id) @@ -1295,6 +1299,33 @@ pub fn evaluate_l7_request( engine: &TunnelPolicyEngine, ctx: &L7EvalContext, request: &L7RequestInfo, +) -> Result<(bool, String)> { + if let Some(jsonrpc) = &request.jsonrpc + && jsonrpc.is_batch + && !jsonrpc.calls.is_empty() + { + for call in &jsonrpc.calls { + let mut item_request = request.clone(); + item_request.jsonrpc = Some(crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: vec![call.clone()], + is_batch: false, + error: None, + }); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + return Ok((false, reason)); + } + } + return Ok((true, String::new())); + } + + evaluate_l7_request_once(engine, ctx, request) +} + +fn evaluate_l7_request_once( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, ) -> Result<(bool, String)> { if engine.is_stale() { return Err(miette!( @@ -1319,11 +1350,14 @@ pub fn evaluate_l7_request( "path": request.target, "query_params": request.query_params.clone(), "graphql": request.graphql.clone(), - "jsonrpc": request.jsonrpc.as_ref().map(|j| serde_json::json!({ - "method": j.method, - "params": j.params, - "error": j.error, - })), + "jsonrpc": request.jsonrpc.as_ref().map(|j| { + let call = if j.is_batch { None } else { j.calls.first() }; + serde_json::json!({ + "method": call.map(|call| call.method.as_str()), + "params": call.map(|call| call.params.clone()).unwrap_or_default(), + "error": j.error, + }) + }), } }); @@ -1966,6 +2000,77 @@ network_policies: assert!(reason.contains("WEBSOCKET_TEXT /ws not permitted")); } + #[test] + fn jsonrpc_batch_evaluates_each_call() { + let data = r#" +network_policies: + jsonrpc_api: + name: jsonrpc_api + endpoints: + - host: api.example.test + port: 443 + protocol: json-rpc + enforcement: enforce + rules: + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/list" + - allow: + method: POST + path: "/mcp" + rpc_method: "tools/call" + params: + name: read_status + deny_rules: + - rpc_method: "tools/call" + params: + name: blocked_action + binaries: + - { path: /usr/bin/node } +"#; + let engine = OpaEngine::from_strings(TEST_POLICY, data).unwrap(); + let tunnel_engine = engine + .clone_engine_for_tunnel(engine.current_generation()) + .unwrap(); + let ctx = L7EvalContext { + host: "api.example.test".into(), + port: 443, + policy_name: "jsonrpc_api".into(), + binary_path: "/usr/bin/node".into(), + ancestors: vec![], + cmdline_paths: vec![], + secret_resolver: None, + activity_tx: None, + dynamic_credentials: None, + token_grant_resolver: None, + }; + let mut request = L7RequestInfo { + action: "POST".into(), + target: "/mcp".into(), + query_params: std::collections::HashMap::new(), + graphql: None, + jsonrpc: Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"read_status"}} + ]"#, + )), + }; + + let (allowed, reason) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(allowed, "{reason}"); + + request.jsonrpc = Some(crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}} + ]"#, + )); + let (allowed, _) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); + assert!(!allowed); + } + #[tokio::test] async fn route_selected_websocket_upgrade_rejects_invalid_accept_without_forwarding_101() { let data = r#" From 6302ec9cf6fa1dcdc3a3c7d1900740145740d06a Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 14:02:40 -0700 Subject: [PATCH 8/9] fix(l7): redact JSON-RPC params in logs Log JSON-RPC endpoint, RPC methods, params SHA-256 digest, and policy version without recording raw params. Use when no params are present. Signed-off-by: Kris Hicks --- crates/openshell-sandbox/src/l7/jsonrpc.rs | 78 +++++++ crates/openshell-sandbox/src/l7/relay.rs | 233 +++++++++++++++++++-- 2 files changed, 293 insertions(+), 18 deletions(-) diff --git a/crates/openshell-sandbox/src/l7/jsonrpc.rs b/crates/openshell-sandbox/src/l7/jsonrpc.rs index 81c7c62fa..563767200 100644 --- a/crates/openshell-sandbox/src/l7/jsonrpc.rs +++ b/crates/openshell-sandbox/src/l7/jsonrpc.rs @@ -4,6 +4,8 @@ //! JSON-RPC 2.0 over HTTP L7 inspection. use miette::Result; +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; use std::collections::HashMap; use tokio::io::{AsyncRead, AsyncWrite}; @@ -44,6 +46,27 @@ pub struct JsonRpcCallInfo { pub params: HashMap, } +impl JsonRpcRequestInfo { + pub(crate) fn params_sha256(&self) -> Option { + if self.is_batch { + if self.calls.is_empty() || self.calls.iter().all(|call| call.params.is_empty()) { + return None; + } + let canonical_params = self + .calls + .iter() + .map(|call| canonical_params_map(&call.params)) + .collect::>(); + return Some(sha256_json(&canonical_params)); + } + + let call = self.calls.first()?; + if call.params.is_empty() { + return None; + } + Some(sha256_json(&canonical_params_map(&call.params))) + } +} /// Parse a JSON-RPC 2.0 request body and extract the `method` field. /// /// Returns an info struct with `method` set on success, or `error` set if the @@ -113,6 +136,18 @@ fn flatten_jsonrpc_params(value: &serde_json::Value) -> HashMap params } +fn canonical_params_map(params: &HashMap) -> BTreeMap { + params + .iter() + .map(|(key, value)| (key.clone(), value.clone())) + .collect() +} + +fn sha256_json(value: &impl serde::Serialize) -> String { + let encoded = serde_json::to_vec(value).expect("canonical JSON-RPC params should serialize"); + hex::encode(Sha256::digest(&encoded)) +} + fn flatten_json_value(prefix: &str, value: &serde_json::Value, out: &mut HashMap) { match value { serde_json::Value::Object(map) => { @@ -187,4 +222,47 @@ mod tests { Some("read_status") ); } + + #[test] + fn params_digest_is_canonical_and_redacted() { + let first = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/main"}}}"#, + ); + let reordered = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"arguments":{"scope":"workspace/main"},"name":"submit_report"}}"#, + ); + let changed = parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit_report","arguments":{"scope":"workspace/other"}}}"#, + ); + + let digest = first.params_sha256().expect("params digest"); + assert_eq!(Some(digest.as_str()), reordered.params_sha256().as_deref()); + assert_ne!(Some(digest.as_str()), changed.params_sha256().as_deref()); + assert_eq!(digest.len(), 64); + assert!(digest.chars().all(|c| c.is_ascii_hexdigit())); + assert!(!digest.contains("workspace/main")); + assert!(!digest.contains("submit_report")); + } + + #[test] + fn batch_params_digest_covers_call_params_without_raw_values() { + let batch = parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}} + ]"#, + ); + let empty_batch = parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"initialize"} + ]"#, + ); + + let digest = batch.params_sha256().expect("batch params digest"); + assert_eq!(digest.len(), 64); + assert!(digest.chars().all(|c| c.is_ascii_hexdigit())); + assert!(!digest.contains("blocked_action")); + assert!(empty_batch.params_sha256().is_none()); + } } diff --git a/crates/openshell-sandbox/src/l7/relay.rs b/crates/openshell-sandbox/src/l7/relay.rs index f79dc7851..dfc2ee389 100644 --- a/crates/openshell-sandbox/src/l7/relay.rs +++ b/crates/openshell-sandbox/src/l7/relay.rs @@ -955,10 +955,12 @@ where .as_deref() .map(|e| format!("JSON-RPC request rejected: {e}")); let force_deny = parse_error_reason.is_some(); - let (allowed, reason) = if let Some(reason) = parse_error_reason { - (false, reason) + let (allowed, reason, jsonrpc_log_info) = if let Some(reason) = parse_error_reason { + (false, reason, jsonrpc_info.clone()) } else { - evaluate_l7_request(engine, ctx, &request_info)? + let evaluation = + evaluate_jsonrpc_l7_request_for_log(engine, ctx, &request_info, &jsonrpc_info)?; + (evaluation.allowed, evaluation.reason, evaluation.log_info) }; if close_if_stale(engine.generation_guard(), ctx) { @@ -981,11 +983,11 @@ where SeverityId::Informational, ), }; - let rpc_method = jsonrpc_info - .calls - .first() - .map(|call| call.method.as_str()) - .unwrap_or("-"); + let endpoint = format!("{}:{}{}", ctx.host, ctx.port, redacted_target); + let params_sha256 = jsonrpc_log_info + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let policy_version = engine.captured_generation(); let event = HttpActivityBuilder::new(crate::ocsf_ctx()) .activity(ActivityId::Other) .action(action_id) @@ -997,9 +999,14 @@ where )) .dst_endpoint(Endpoint::from_domain(&ctx.host, ctx.port)) .firewall_rule(&ctx.policy_name, "l7-jsonrpc") - .message(format!( - "JSONRPC_L7_REQUEST {decision_str} {} {}:{}{} rpc_method={rpc_method} reason={}", - request_info.action, ctx.host, ctx.port, redacted_target, reason, + .message(jsonrpc_log_message( + decision_str, + &request_info.action, + &endpoint, + &jsonrpc_log_info, + ¶ms_sha256, + policy_version, + &reason, )) .build(); ocsf_emit!(event); @@ -1274,6 +1281,38 @@ fn graphql_log_summary(info: &crate::l7::graphql::GraphqlRequestInfo) -> String format!("graphql_ops={}", ops.join(";")) } +fn jsonrpc_log_message( + decision: &str, + http_method: &str, + endpoint: &str, + info: &crate::l7::jsonrpc::JsonRpcRequestInfo, + params_sha256: &str, + policy_version: u64, + reason: &str, +) -> String { + let rpc_methods = jsonrpc_methods_for_log(info); + format!( + "JSONRPC_L7_REQUEST decision={decision} http_method={http_method} endpoint={endpoint} rpc_methods={rpc_methods} params_sha256={params_sha256} policy_version={policy_version} reason={reason}" + ) +} + +fn jsonrpc_methods_for_log(info: &crate::l7::jsonrpc::JsonRpcRequestInfo) -> String { + if info.calls.is_empty() { + return "-".to_string(); + } + info.calls + .iter() + .map(|call| call.method.as_str()) + .collect::>() + .join(",") +} + +struct JsonRpcEvaluation { + allowed: bool, + reason: String, + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo, +} + /// Check if a miette error represents a benign connection close. /// /// TLS handshake EOF, missing `close_notify`, connection resets, and broken @@ -1305,12 +1344,7 @@ pub fn evaluate_l7_request( && !jsonrpc.calls.is_empty() { for call in &jsonrpc.calls { - let mut item_request = request.clone(); - item_request.jsonrpc = Some(crate::l7::jsonrpc::JsonRpcRequestInfo { - calls: vec![call.clone()], - is_batch: false, - error: None, - }); + let item_request = jsonrpc_request_for_call(request, call); let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; if !allowed { return Ok((false, reason)); @@ -1322,6 +1356,66 @@ pub fn evaluate_l7_request( evaluate_l7_request_once(engine, ctx, request) } +fn evaluate_jsonrpc_l7_request_for_log( + engine: &TunnelPolicyEngine, + ctx: &L7EvalContext, + request: &L7RequestInfo, + jsonrpc: &crate::l7::jsonrpc::JsonRpcRequestInfo, +) -> Result { + if jsonrpc.is_batch && !jsonrpc.calls.is_empty() { + let mut denied_calls = Vec::new(); + let mut first_denied_reason = None; + for call in &jsonrpc.calls { + let item_request = jsonrpc_request_for_call(request, call); + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, &item_request)?; + if !allowed { + if first_denied_reason.is_none() { + first_denied_reason = Some(reason); + } + denied_calls.push(call.clone()); + } + } + + if denied_calls.is_empty() { + return Ok(JsonRpcEvaluation { + allowed: true, + reason: String::new(), + log_info: jsonrpc.clone(), + }); + } + + return Ok(JsonRpcEvaluation { + allowed: false, + reason: first_denied_reason.unwrap_or_else(|| "request denied by policy".to_string()), + log_info: crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: denied_calls, + is_batch: true, + error: None, + }, + }); + } + + let (allowed, reason) = evaluate_l7_request_once(engine, ctx, request)?; + Ok(JsonRpcEvaluation { + allowed, + reason, + log_info: jsonrpc.clone(), + }) +} + +fn jsonrpc_request_for_call( + request: &L7RequestInfo, + call: &crate::l7::jsonrpc::JsonRpcCallInfo, +) -> L7RequestInfo { + let mut item_request = request.clone(); + item_request.jsonrpc = Some(crate::l7::jsonrpc::JsonRpcRequestInfo { + calls: vec![call.clone()], + is_batch: false, + error: None, + }); + item_request +} + fn evaluate_l7_request_once( engine: &TunnelPolicyEngine, ctx: &L7EvalContext, @@ -2026,6 +2120,7 @@ network_policies: - rpc_method: "tools/call" params: name: blocked_action + - rpc_method: "tools/delete" binaries: - { path: /usr/bin/node } "#; @@ -2064,11 +2159,113 @@ network_policies: request.jsonrpc = Some(crate::l7::jsonrpc::parse_jsonrpc_body( br#"[ {"jsonrpc":"2.0","id":1,"method":"tools/list"}, - {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}} + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"blocked_action"}}, + {"jsonrpc":"2.0","id":3,"method":"tools/delete","params":{"name":"purge_cache"}} ]"#, )); let (allowed, _) = evaluate_l7_request(&tunnel_engine, &ctx, &request).unwrap(); assert!(!allowed); + + let jsonrpc = request.jsonrpc.as_ref().expect("jsonrpc request"); + let evaluation = + evaluate_jsonrpc_l7_request_for_log(&tunnel_engine, &ctx, &request, jsonrpc).unwrap(); + assert!(!evaluation.allowed); + assert!(evaluation.log_info.is_batch); + assert_eq!( + jsonrpc_methods_for_log(&evaluation.log_info), + "tools/call,tools/delete" + ); + + let full_params_sha256 = jsonrpc.params_sha256().expect("full batch params digest"); + let log_params_sha256 = evaluation + .log_info + .params_sha256() + .expect("logged batch params digest"); + assert_ne!(full_params_sha256, log_params_sha256); + let message = jsonrpc_log_message( + "deny", + "POST", + "api.example.test:443/mcp", + &evaluation.log_info, + &log_params_sha256, + 42, + &evaluation.reason, + ); + assert!(message.contains("rpc_methods=tools/call,tools/delete")); + assert!(message.contains("params_sha256=")); + assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("tools/list")); + assert!(!message.contains("blocked_action")); + assert!(!message.contains("purge_cache")); + } + + #[test] + fn jsonrpc_log_records_digest_not_args() { + let info = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"delete_resource","arguments":{"scope":"secret-scope"}}}"#, + ); + let params_sha256 = info.params_sha256().expect("params digest"); + let message = jsonrpc_log_message( + "deny", + "POST", + "mcp.example.com:443/mcp", + &info, + ¶ms_sha256, + 42, + "request denied by policy", + ); + + assert!(message.contains("endpoint=mcp.example.com:443/mcp")); + assert!(message.contains("rpc_methods=tools/call")); + assert!(message.contains("params_sha256=")); + assert!(!message.contains("params_sha256=sha256:")); + assert!(message.contains("policy_version=42")); + assert!(!message.contains("delete_resource")); + assert!(!message.contains("secret-scope")); + + let batch = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"[ + {"jsonrpc":"2.0","id":1,"method":"tools/list"}, + {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"delete_resource"}} + ]"#, + ); + let batch_params_sha256 = batch.params_sha256().expect("batch params digest"); + let batch_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &batch, + &batch_params_sha256, + 43, + "", + ); + + assert!(batch_message.starts_with("JSONRPC_L7_REQUEST ")); + assert!(batch_message.contains("rpc_methods=tools/list,tools/call")); + assert!(batch_message.contains("params_sha256=")); + assert!(!batch_message.contains("params_sha256=sha256:")); + assert!(batch_message.contains("policy_version=43")); + assert!(!batch_message.contains("rpc_method=")); + assert!(!batch_message.contains("delete_resource")); + + let no_params = crate::l7::jsonrpc::parse_jsonrpc_body( + br#"{"jsonrpc":"2.0","id":1,"method":"initialize"}"#, + ); + let no_params_sha256 = no_params + .params_sha256() + .unwrap_or_else(|| "".to_string()); + let no_params_message = jsonrpc_log_message( + "allow", + "POST", + "mcp.example.com:443/mcp", + &no_params, + &no_params_sha256, + 44, + "", + ); + assert!(no_params_message.contains("rpc_methods=initialize")); + assert!(no_params_message.contains("params_sha256=")); } #[tokio::test] From 8dc2a54f9b99d2aa297ccfd49c102ea10ce982f4 Mon Sep 17 00:00:00 2001 From: Kris Hicks Date: Wed, 10 Jun 2026 13:06:04 -0700 Subject: [PATCH 9/9] docs(policy): document JSON-RPC L7 rules Document JSON-RPC endpoint configuration, rpc_method and params matchers, batch denial behavior, current directionality limits, matcher scope, and the current policy update CLI limitation. Signed-off-by: Kris Hicks --- architecture/sandbox.md | 6 +++ docs/reference/policy-schema.mdx | 78 +++++++++++++++++++++++++++++--- docs/sandboxes/policies.mdx | 54 ++++++++++++++++++++-- 3 files changed, 128 insertions(+), 10 deletions(-) diff --git a/architecture/sandbox.md b/architecture/sandbox.md index e60b727a5..0e2d1c559 100644 --- a/architecture/sandbox.md +++ b/architecture/sandbox.md @@ -49,6 +49,12 @@ paths, such as proxy support files or GPU device paths when a GPU is present. All ordinary agent egress is routed through the sandbox proxy. The proxy identifies the calling binary, checks trust-on-first-use binary identity, rejects unsafe internal destinations, and evaluates the active policy. +For inspected HTTP traffic, the proxy can enforce REST method/path rules, +WebSocket upgrade and text-message rules, GraphQL operation rules, and +JSON-RPC method and params rules on sandbox-to-server request bodies. JSON-RPC +request inspection buffers up to the endpoint `json_rpc.max_body_bytes` limit. +JSON-RPC responses and server-to-client MCP messages on response or SSE streams +are relayed but are not currently parsed for policy enforcement. `https://inference.local` is special. It bypasses OPA network policy and is handled by the inference interception path: diff --git a/docs/reference/policy-schema.mdx b/docs/reference/policy-schema.mdx index 59f72c9f7..fa540e4dd 100644 --- a/docs/reference/policy-schema.mdx +++ b/docs/reference/policy-schema.mdx @@ -155,7 +155,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `host` | string | Yes | Hostname or IP address. Supports a `*` wildcard inside the first DNS label only: `*.example.com`, `**.example.com`, and intra-label patterns like `*-aiplatform.googleapis.com` are accepted; bare `*`/`**`, TLD wildcards (`*.com`), and wildcards outside the first label are rejected at load time. | | `port` | integer | Yes | TCP port number. | | `path` | string | No | Optional HTTP path glob used to select between L7 endpoints that share the same host and port. Empty means all paths. Use this when REST and GraphQL live under the same host, such as `/repos/**` and `/graphql`. | -| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, or `graphql` for GraphQL-over-HTTP operation inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | +| `protocol` | string | No | Set to `rest` for HTTP method/path inspection, `websocket` for RFC 6455 upgrade and client text-message inspection, `graphql` for GraphQL-over-HTTP operation inspection, or `json-rpc` for sandbox-to-server JSON-RPC-over-HTTP method and params inspection. WebSocket endpoints can also use GraphQL operation rules for GraphQL-over-WebSocket traffic. Omit for TCP passthrough. | | `tls` | string | No | TLS handling mode. The proxy auto-detects TLS by peeking the first bytes of each connection and terminates it for inspected HTTPS traffic, so this field is optional in most cases. Set to `skip` to disable auto-detection for edge cases such as client-certificate mTLS or non-standard protocols. The values `terminate` and `passthrough` are deprecated and log a warning; they are still accepted for backward compatibility but have no effect on behavior. | | `enforcement` | string | No | `enforce` actively blocks disallowed requests. `audit` logs violations but allows traffic through. | | `access` | string | No | Access preset. One of `read-only`, `read-write`, or `full`. Mutually exclusive with `rules`. | @@ -168,6 +168,7 @@ Each endpoint defines a reachable destination and optional inspection rules. | `persisted_queries` | string | No | GraphQL hash-only behavior for `protocol: graphql` and GraphQL-over-WebSocket operation policy. Default is `deny`; use `allow_registered` only with `graphql_persisted_queries`. | | `graphql_persisted_queries` | map | No | Trusted GraphQL persisted-query registry keyed by hash or saved-query ID. Values contain `operation_type`, optional `operation_name`, and optional root `fields`. | | `graphql_max_body_bytes` | integer | No | Maximum GraphQL-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | +| `json_rpc` | object | No | JSON-RPC endpoint options. For `protocol: json-rpc`, `json_rpc.max_body_bytes` sets the maximum JSON-RPC-over-HTTP request body bytes buffered for inspection. Defaults to `65536`. | Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placeholder form and whole-token provider-shaped aliases such as `provider-OPENSHELL-RESOLVE-ENV-API_TOKEN` when the referenced environment key exists in the configured provider credentials. @@ -175,11 +176,13 @@ Credential rewrite recognizes the canonical `openshell:resolve:env:KEY` placehol The `access` field accepts one of the following values: -| Value | REST expansion | WebSocket expansion | GraphQL expansion | -|---|---|---|---| -| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | -| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. | -| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | +| Value | REST expansion | WebSocket expansion | GraphQL expansion | JSON-RPC behavior | +|---|---|---|---|---| +| `full` | All methods and paths. | WebSocket upgrade and all inspected client text-message paths. | All operation types. | Allows matching HTTP requests without constraining JSON-RPC methods. | +| `read-only` | `GET`, `HEAD`, `OPTIONS`. | WebSocket upgrade handshake only. | `query` operations. | Expands to HTTP read methods and does not allow typical JSON-RPC `POST` calls. | +| `read-write` | `GET`, `HEAD`, `OPTIONS`, `POST`, `PUT`, `PATCH`. | WebSocket upgrade handshake and client text messages. | `query` and `mutation` operations. | Allows matching HTTP write requests without constraining JSON-RPC methods. | + +For JSON-RPC endpoints, prefer explicit `rules` with `rpc_method` and optional `params` when you need method-level control. #### Allow Rule Objects @@ -274,6 +277,42 @@ rules: Do not combine `method`, `path`, or `query` with `operation_type`, `operation_name`, or `fields` inside the same WebSocket rule. When a WebSocket endpoint has GraphQL operation policy, use GraphQL rules for client messages instead of a raw `WEBSOCKET_TEXT` allow rule. +##### JSON-RPC Allow Rule (`protocol: json-rpc`) + +JSON-RPC allow rules match sandbox-to-server JSON-RPC-over-HTTP request objects by RPC method and optional params. They apply to single JSON-RPC requests and batch requests. For a batch, OpenShell evaluates each call independently. JSON-RPC responses and server-to-client messages on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob, such as `initialize`, `tools/list`, or `tools/*`. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Use dot-separated keys for nested object params, such as `arguments.scope`. Matcher value can be a glob string or an object with `any`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. | + +Example JSON-RPC allow rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main +``` + #### Deny Rule Objects Blocks specific operations on endpoints that otherwise have broad access. Deny rules are evaluated after allow rules and take precedence: if a request matches any deny rule, it is blocked regardless of what the allow rules or access preset permit. @@ -356,6 +395,33 @@ endpoints: operation_name: Admin* ``` +##### JSON-RPC Deny Rule (`protocol: json-rpc`) + +JSON-RPC deny rules use the same field names as JSON-RPC allow rules, but they appear directly under each `deny_rules` entry instead of under an `allow` wrapper. Deny rules take precedence over allow rules. In a batch request, one denied call denies the full batch. + +| Field | Type | Required | Description | +|---|---|---|---| +| `rpc_method` | string | Yes | JSON-RPC method name or glob to deny. | +| `params` | map | No | Params matchers keyed by flattened object-param path. Omit to deny every call matching `rpc_method`. Strings, numbers, and booleans are converted to strings; arrays, `null`, and non-object top-level params do not produce matcher keys. | + +Example JSON-RPC deny rules: + +```yaml showLineNumbers={false} +endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + rules: + - allow: + rpc_method: tools/* + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource +``` + ### Binary Object Identifies an executable that is permitted to use the associated endpoints. diff --git a/docs/sandboxes/policies.mdx b/docs/sandboxes/policies.mdx index 406ed12b8..bae6fa279 100644 --- a/docs/sandboxes/policies.mdx +++ b/docs/sandboxes/policies.mdx @@ -148,7 +148,7 @@ The following steps outline the hot-reload policy update workflow. To inspect a stored sandbox-authored revision instead of the current effective policy, pass `--rev `. -5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, or `rules`. +5. Edit the YAML: add or adjust `network_policies` entries, binaries, `access`, `rules`, or protocol-specific matchers such as GraphQL operation fields and JSON-RPC `rpc_method` / `params` rules. 6. Push the updated policy when you need a full replacement. Exit codes: 0 = loaded, 1 = validation failed, 124 = timeout. @@ -173,7 +173,7 @@ Use `openshell policy update` when you want to merge network policy changes into - remove one endpoint or one named rule without rewriting the rest of the file. - preview a merged result locally with `--dry-run` before you send it to the gateway. -Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. +Use `openshell policy set` instead when you want to replace the full policy, update static sections, or make broader edits that are easier to express in YAML. Use full YAML for GraphQL and JSON-RPC rule shapes. ### Update Commands @@ -210,6 +210,7 @@ This is the practical difference: Current constraints: - `--add-allow` and `--add-deny` work on `protocol: rest` and `protocol: websocket` endpoints. +- GraphQL and JSON-RPC fine-grained rules require full policy YAML applied with `openshell policy set`. - `--add-deny` requires the endpoint to already have an allow base, either an `access` preset or explicit allow `rules`. - `protocol: sql` is not a practical incremental workflow today. OpenShell does not do full SQL parsing, and SQL enforcement is not meaningfully supported yet. @@ -228,7 +229,7 @@ Each segment has a fixed meaning: | `host` | Yes | Destination hostname. | | `port` | Yes | Destination port, `1` through `65535`. | | `access` | No | Access preset for L7 endpoints: `read-only`, `read-write`, or `full`. Incremental updates expand presets into protocol-specific method/path rules for REST and WebSocket endpoints. | -| `protocol` | No | L7 inspection mode: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. | +| `protocol` | No | L7 inspection mode accepted by `openshell policy update`: `rest`, `websocket`, or `sql`. `sql` is audit-only and not a recommended workflow today. Full policy YAML also supports `graphql` and `json-rpc`. | | `enforcement` | No | Enforcement mode for inspected traffic: `enforce` or `audit`. | | `options` | No | Comma-separated endpoint options. Use `websocket-credential-rewrite` with `protocol: websocket` or REST compatibility endpoints that perform a WebSocket upgrade. Use `request-body-credential-rewrite` only with `protocol: rest`. | @@ -548,7 +549,7 @@ For an end-to-end walkthrough that combines this policy with a GitHub credential - { path: /usr/bin/gh } ``` -Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. +Endpoints with `protocol: rest` enable HTTP request inspection and can opt in to supported text request body credential rewrite. Endpoints with `protocol: websocket` validate WebSocket upgrades and inspect client text messages on the upgraded request path. WebSocket endpoints can also classify GraphQL-over-WebSocket operation messages with the same operation rules used by GraphQL-over-HTTP. Endpoints with `protocol: graphql` parse GraphQL-over-HTTP payloads before evaluating rules. Endpoints with `protocol: json-rpc` parse JSON-RPC-over-HTTP request bodies and evaluate `rpc_method` and optional params rules. The endpoint-level `path` field lets these protocols share `api.github.com:443` without treating GraphQL payloads as plain REST `POST /graphql` requests. @@ -579,6 +580,51 @@ REST rules can also constrain query parameter values: `query` matchers are case-sensitive and run on decoded values. If a request has duplicate keys (for example, `tag=a&tag=b`), every value for that key must match the configured glob(s). +### JSON-RPC matching + +JSON-RPC endpoints use `protocol: json-rpc`. The proxy parses sandbox-to-server JSON-RPC-over-HTTP request bodies, evaluates the `method` field against `rpc_method`, and can match object params through dot-separated `params` keys. + +JSON-RPC policy enforcement is directional. It applies to HTTP request bodies sent by the sandboxed process to the configured endpoint. JSON-RPC responses and server-to-client messages carried on response bodies or MCP SSE streams are relayed but are not currently parsed for policy enforcement. + +JSON-RPC endpoint policies currently require full policy YAML applied with `openshell policy set`; the incremental `openshell policy update --add-endpoint` parser does not accept `json-rpc` as a protocol. + +```yaml showLineNumbers={false} + mcp_server: + name: mcp_server + endpoints: + - host: mcp.example.com + port: 443 + path: /mcp + protocol: json-rpc + enforcement: enforce + json_rpc: + max_body_bytes: 131072 + rules: + - allow: + rpc_method: initialize + - allow: + rpc_method: tools/list + - allow: + rpc_method: tools/call + params: + name: read_status + - allow: + rpc_method: tools/call + params: + name: submit_report + arguments.scope: workspace/main + deny_rules: + - rpc_method: tools/call + params: + name: delete_resource + binaries: + - { path: /usr/bin/python3 } +``` + +`json_rpc.max_body_bytes` controls how many JSON-RPC-over-HTTP request body bytes OpenShell buffers for inspection. It defaults to `65536`. + +`params` matchers are case-sensitive and use the same string glob or `{ any: [...] }` matcher syntax as REST query parameters. They match scalar leaf values from object params: strings, numbers, and booleans are converted to strings, and nested JSON object params are flattened with dot-separated keys before matching. Arrays, `null`, and non-object top-level params do not produce matcher keys. This is useful for controls such as matching MCP `tools/call` by `params.name`, but it is not a complete MCP payload policy for rich nested content. For batch requests, OpenShell evaluates each JSON-RPC call independently and denies the whole batch if any call is denied. + ### GraphQL matching GraphQL endpoints use `protocol: graphql`. The proxy parses GraphQL-over-HTTP `GET` and `POST` requests, classifies each operation, and evaluates rules against the operation type, optional operation name, and selected root fields.