From 47f50b0f7120fc2d99518a732db5c252dc098466 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:34:18 -0500 Subject: [PATCH 01/12] message: Add constants for protocol string literals Replace repeated string literals with constants: - FD_PLACEHOLDER_KEY for "__jsonrpc_fd__" - FD_INDEX_KEY for "index" - JSONRPC_VERSION for "2.0" This also makes collect_placeholder_indices a static method since it doesn't use self. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- src/message.rs | 29 ++++++++++++++++++----------- src/transport.rs | 4 ++-- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/message.rs b/src/message.rs index 6b7a929..9632f09 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,6 +3,13 @@ use jsonrpsee::types::error::ErrorObject as JsonRpcError; use serde::{Deserialize, Serialize}; use std::os::unix::io::OwnedFd; +/// The JSON key used to identify file descriptor placeholders. +pub const FD_PLACEHOLDER_KEY: &str = "__jsonrpc_fd__"; +/// The JSON key for the file descriptor index within a placeholder. +pub const FD_INDEX_KEY: &str = "index"; +/// The JSON-RPC protocol version. +pub const JSONRPC_VERSION: &str = "2.0"; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileDescriptorPlaceholder { #[serde(rename = "__jsonrpc_fd__")] @@ -55,7 +62,7 @@ pub enum JsonRpcMessage { impl JsonRpcRequest { pub fn new(method: String, params: Option, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), method, params, id, @@ -66,7 +73,7 @@ impl JsonRpcRequest { impl JsonRpcResponse { pub fn success(result: serde_json::Value, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), result: Some(result), error: None, id, @@ -75,7 +82,7 @@ impl JsonRpcResponse { pub fn error(error: JsonRpcError<'static>, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), result: None, error: Some(error), id, @@ -86,7 +93,7 @@ impl JsonRpcResponse { impl JsonRpcNotification { pub fn new(method: String, params: Option) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), method, params, } @@ -148,7 +155,7 @@ impl MessageWithFds { let fd_count = self.file_descriptors.len(); let mut placeholder_indices = Vec::new(); - self.collect_placeholder_indices(value, &mut placeholder_indices); + Self::collect_placeholder_indices(value, &mut placeholder_indices); if placeholder_indices.len() != fd_count { return Err(Error::MismatchedCount { @@ -166,26 +173,26 @@ impl MessageWithFds { Ok(()) } - fn collect_placeholder_indices(&self, value: &serde_json::Value, indices: &mut Vec) { + fn collect_placeholder_indices(value: &serde_json::Value, indices: &mut Vec) { match value { serde_json::Value::Object(map) => { if let ( Some(serde_json::Value::Bool(true)), Some(serde_json::Value::Number(index)), - ) = (map.get("__jsonrpc_fd__"), map.get("index")) + ) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { if let Some(index) = index.as_u64() { indices.push(index as usize); } } else { for v in map.values() { - self.collect_placeholder_indices(v, indices); + Self::collect_placeholder_indices(v, indices); } } } serde_json::Value::Array(arr) => { for v in arr { - self.collect_placeholder_indices(v, indices); + Self::collect_placeholder_indices(v, indices); } } _ => {} @@ -219,7 +226,7 @@ impl MessageWithFds { match value { serde_json::Value::Object(map) => { if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get("__jsonrpc_fd__"), map.get("index")) + (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { *count += 1; } else { @@ -260,7 +267,7 @@ impl MessageWithFds { if let ( Some(serde_json::Value::Bool(true)), Some(serde_json::Value::Number(index)), - ) = (map.get("__jsonrpc_fd__"), map.get("index")) + ) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { if let Some(index) = index.as_u64() { indices.push(index as usize); diff --git a/src/transport.rs b/src/transport.rs index 90a8980..96ad26f 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,5 +1,5 @@ use crate::error::{Error, Result}; -use crate::message::{JsonRpcMessage, MessageWithFds}; +use crate::message::{JsonRpcMessage, MessageWithFds, FD_INDEX_KEY, FD_PLACEHOLDER_KEY}; use rustix::fd::AsFd; use rustix::net::{ RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags, SendAncillaryBuffer, @@ -153,7 +153,7 @@ impl Receiver { match value { serde_json::Value::Object(map) => { if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get("__jsonrpc_fd__"), map.get("index")) + (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { *count += 1; } else { From d70a26dbfb88d2650b967396cba4fb4ce0613ad9 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:34:57 -0500 Subject: [PATCH 02/12] main: Use function reference instead of redundant closure Replace `.map_err(|e| Error::Io(e))` with `.map_err(Error::Io)`. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- src/main.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5cde338..21cee5c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { let mut contents = String::new(); file.read_to_string(&mut contents) - .map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?; + .map_err(ndjson_rpc_fdpass::Error::Io)?; info!("Server read from file: {}", contents.trim()); Ok((Some(Value::String(contents)), Vec::new())) @@ -85,8 +85,7 @@ async fn run_client(socket_path: PathBuf) -> Result<()> { let mut client = Client::connect(&socket_path).await?; // Create a temporary file to send to the server - let mut temp_file = - tempfile::NamedTempFile::new().map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?; + let mut temp_file = tempfile::NamedTempFile::new().map_err(ndjson_rpc_fdpass::Error::Io)?; write!(temp_file, "Hello from client file!").unwrap(); temp_file.flush().unwrap(); From ea10d1a192b412f234149c82347918c90134507f Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:36:53 -0500 Subject: [PATCH 03/12] message: Consolidate count_placeholders into shared function Extract count_fd_placeholders() as a public function and remove the duplicate implementations from MessageWithFds and Receiver. This also fixes double JSON parsing in Receiver::try_parse_message - previously the JSON was parsed once to count placeholders and again to create the message. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- src/message.rs | 53 ++++++++++++++++++++++++++---------------------- src/transport.rs | 46 +++++------------------------------------ 2 files changed, 34 insertions(+), 65 deletions(-) diff --git a/src/message.rs b/src/message.rs index 9632f09..e5fa5cc 100644 --- a/src/message.rs +++ b/src/message.rs @@ -10,6 +10,34 @@ pub const FD_INDEX_KEY: &str = "index"; /// The JSON-RPC protocol version. pub const JSONRPC_VERSION: &str = "2.0"; +/// Count file descriptor placeholders in a JSON value. +pub fn count_fd_placeholders(value: &serde_json::Value) -> usize { + fn count_inner(value: &serde_json::Value, count: &mut usize) { + match value { + serde_json::Value::Object(map) => { + if let (Some(serde_json::Value::Bool(true)), Some(_)) = + (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) + { + *count += 1; + } else { + for v in map.values() { + count_inner(v, count); + } + } + } + serde_json::Value::Array(arr) => { + for v in arr { + count_inner(v, count); + } + } + _ => {} + } + } + let mut count = 0; + count_inner(value, &mut count); + count +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileDescriptorPlaceholder { #[serde(rename = "__jsonrpc_fd__")] @@ -202,8 +230,7 @@ impl MessageWithFds { pub fn from_json_with_fds(json_str: &str, fds: Vec) -> Result { let message_json: serde_json::Value = serde_json::from_str(json_str)?; - let mut placeholder_count = 0; - Self::count_placeholders(&message_json, &mut placeholder_count); + let placeholder_count = count_fd_placeholders(&message_json); if placeholder_count != fds.len() { return Err(Error::MismatchedCount { @@ -222,28 +249,6 @@ impl MessageWithFds { Ok(Self::new(message, fds)) } - fn count_placeholders(value: &serde_json::Value, count: &mut usize) { - match value { - serde_json::Value::Object(map) => { - if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) - { - *count += 1; - } else { - for v in map.values() { - Self::count_placeholders(v, count); - } - } - } - serde_json::Value::Array(arr) => { - for v in arr { - Self::count_placeholders(v, count); - } - } - _ => {} - } - } - fn validate_placeholder_indices( value: &serde_json::Value, expected_count: usize, diff --git a/src/transport.rs b/src/transport.rs index 96ad26f..65f2811 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,5 +1,5 @@ use crate::error::{Error, Result}; -use crate::message::{JsonRpcMessage, MessageWithFds, FD_INDEX_KEY, FD_PLACEHOLDER_KEY}; +use crate::message::{count_fd_placeholders, JsonRpcMessage, MessageWithFds}; use rustix::fd::AsFd; use rustix::net::{ RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags, SendAncillaryBuffer, @@ -113,8 +113,9 @@ impl Receiver { trace!("Parsing message: {}", message_str); - let mut placeholder_count = 0; - Self::count_placeholders_in_str(message_str, &mut placeholder_count)?; + // Parse JSON once and reuse for both counting and message creation + let value: serde_json::Value = serde_json::from_str(message_str)?; + let placeholder_count = count_fd_placeholders(&value); if placeholder_count > self.fd_queue.len() { return Err(Error::MismatchedCount { @@ -127,50 +128,13 @@ impl Receiver { .map(|_| self.fd_queue.pop_front().unwrap()) .collect(); - if placeholder_count == 0 && !fds.is_empty() { - return Err(Error::DanglingFileDescriptors); - } - - let message = self.parse_json_message(message_str)?; + let message = JsonRpcMessage::from_json_value(value)?; Ok(Some(MessageWithFds::new(message, fds))) } else { Ok(None) } } - fn parse_json_message(&self, json_str: &str) -> Result { - let value: serde_json::Value = serde_json::from_str(json_str)?; - JsonRpcMessage::from_json_value(value) - } - - fn count_placeholders_in_str(json_str: &str, count: &mut usize) -> Result<()> { - let value: serde_json::Value = serde_json::from_str(json_str)?; - Self::count_placeholders(&value, count); - Ok(()) - } - - fn count_placeholders(value: &serde_json::Value, count: &mut usize) { - match value { - serde_json::Value::Object(map) => { - if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) - { - *count += 1; - } else { - for v in map.values() { - Self::count_placeholders(v, count); - } - } - } - serde_json::Value::Array(arr) => { - for v in arr { - Self::count_placeholders(v, count); - } - } - _ => {} - } - } - async fn read_more_data(&mut self) -> Result<()> { let fd = Arc::clone(&self.fd); From 84aa57894e8df356155eae53ef90287723e9bc88 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:37:54 -0500 Subject: [PATCH 04/12] build: Move tempfile to dev-dependencies The demo binary was using tempfile, so move it to examples/demo.rs where dev-dependencies are available. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- Cargo.toml | 4 +++- src/main.rs => examples/demo.rs | 0 2 files changed, 3 insertions(+), 1 deletion(-) rename src/main.rs => examples/demo.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index c20519a..970c0c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,5 +15,7 @@ thiserror = "1.0" tokio = { version = "1.40", features = ["full"] } tracing = "0.1" tracing-subscriber = "0.3" -tempfile = "3.0" jsonrpsee = { version = "0.24", features = ["server", "client-core"], default-features = false } + +[dev-dependencies] +tempfile = "3.0" diff --git a/src/main.rs b/examples/demo.rs similarity index 100% rename from src/main.rs rename to examples/demo.rs From 68181f19a1183d573ac433bf8242b1c264b7959f Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:39:00 -0500 Subject: [PATCH 05/12] transport: Extract magic numbers to named constants Add MAX_FDS_PER_MESSAGE and READ_BUFFER_SIZE constants to clarify the meaning of the hardcoded values. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- src/transport.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/transport.rs b/src/transport.rs index 65f2811..f0076f4 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -12,6 +12,11 @@ use std::sync::{Arc, Mutex}; use tokio::net::UnixStream as TokioUnixStream; use tracing::{debug, trace}; +/// Maximum number of file descriptors per message. +const MAX_FDS_PER_MESSAGE: usize = 8; +/// Read buffer size for incoming data. +const READ_BUFFER_SIZE: usize = 4096; + pub struct UnixSocketTransport { fd: OwnedFd, } @@ -68,7 +73,8 @@ impl Sender { // Convert OwnedFd to BorrowedFd for sending let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect(); - let mut buffer: Vec = vec![0u8; rustix::cmsg_space!(ScmRights(8))]; + let mut buffer: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice()); if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) { @@ -140,9 +146,10 @@ impl Receiver { let (bytes_read, data, fds) = tokio::task::spawn_blocking(move || { let sockfd = fd.lock().unwrap(); - let mut data_buffer = [0u8; 4096]; + let mut data_buffer = [0u8; READ_BUFFER_SIZE]; let mut iov = [IoSliceMut::new(&mut data_buffer)]; - let mut cmsg_space: Vec = vec![0u8; rustix::cmsg_space!(ScmRights(8))]; + let mut cmsg_space: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; let mut cmsg_buffer = RecvAncillaryBuffer::new(cmsg_space.as_mut_slice()); let result = rustix::net::recvmsg( From c3dbbf70374c73f4c266429262444a5d3df93260 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:40:59 -0500 Subject: [PATCH 06/12] transport: Return Result from UnixSocketTransport::new The into_std() call can fail if setting blocking mode fails. Return a Result instead of panicking. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- examples/demo.rs | 2 +- src/client.rs | 2 +- src/server.rs | 2 +- src/transport.rs | 8 ++--- tests/integration_tests.rs | 66 +++++++++++++++++++------------------- 5 files changed, 40 insertions(+), 40 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index 21cee5c..f76348c 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -63,7 +63,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { // Accept one connection and handle it if let Ok((stream, _)) = listener.accept().await { - let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream); + let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream)?; let (mut sender, mut receiver) = transport.split(); // Handle messages from this connection diff --git a/src/client.rs b/src/client.rs index a331c84..12f4d58 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,7 +15,7 @@ pub struct Client { impl Client { pub async fn connect>(path: P) -> Result { let stream = UnixStream::connect(path).await?; - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream)?; let (sender, _receiver) = transport.split(); Ok(Self { sender, next_id: 1 }) diff --git a/src/server.rs b/src/server.rs index b625428..83ec03d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -68,7 +68,7 @@ impl Server { } async fn handle_connection(&self, stream: UnixStream) -> Result<()> { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream)?; let (mut sender, mut receiver) = transport.split(); debug!("New connection established"); diff --git a/src/transport.rs b/src/transport.rs index f0076f4..1c32418 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -22,10 +22,10 @@ pub struct UnixSocketTransport { } impl UnixSocketTransport { - pub fn new(stream: TokioUnixStream) -> Self { - // Convert tokio stream to OwnedFd - let fd = stream.into_std().unwrap().into(); - Self { fd } + pub fn new(stream: TokioUnixStream) -> Result { + // Convert tokio stream to OwnedFd. This sets the socket to blocking mode. + let fd = stream.into_std().map_err(Error::Io)?.into(); + Ok(Self { fd }) } pub fn split(self) -> (Sender, Receiver) { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71b25d7..c8c0bfc 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -75,7 +75,7 @@ async fn test_client_server_communication() -> Result<()> { server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -88,7 +88,7 @@ async fn test_client_server_communication() -> Result<()> { // Connect and send request (no race condition) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let request = JsonRpcRequest::new( @@ -154,7 +154,7 @@ async fn test_file_descriptor_passing() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -167,7 +167,7 @@ async fn test_file_descriptor_passing() -> Result<()> { // Connect and send file descriptor (no race condition) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with file descriptor placeholder @@ -253,7 +253,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process multiple messages sequentially @@ -269,7 +269,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { // Connect and send multiple messages with file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Send multiple messages sequentially @@ -349,7 +349,7 @@ async fn test_multiple_fds_single_message() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -362,7 +362,7 @@ async fn test_multiple_fds_single_message() -> Result<()> { // Connect and send message with multiple file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with multiple file descriptor placeholders @@ -443,7 +443,7 @@ async fn test_mixed_messages_with_and_without_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process multiple mixed messages @@ -462,7 +462,7 @@ async fn test_mixed_messages_with_and_without_fds() -> Result<()> { // Connect and send mixed messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // 1. Echo message (no FD) @@ -567,7 +567,7 @@ async fn test_large_number_of_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -580,7 +580,7 @@ async fn test_large_number_of_fds() -> Result<()> { // Connect and send message with many file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with many file descriptor placeholders @@ -654,7 +654,7 @@ async fn test_zero_byte_files_with_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -667,7 +667,7 @@ async fn test_zero_byte_files_with_fds() -> Result<()> { // Connect and send message with empty file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let params = serde_json::json!({ @@ -748,7 +748,7 @@ async fn test_fd_placeholder_index_ordering() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -761,7 +761,7 @@ async fn test_fd_placeholder_index_ordering() -> Result<()> { // Connect and send message with FDs in specific order let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with placeholders in non-sequential order to test protocol @@ -831,7 +831,7 @@ async fn test_rapid_message_bursts() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process burst of messages @@ -847,7 +847,7 @@ async fn test_rapid_message_bursts() -> Result<()> { // Connect and send burst of messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Send 20 messages in rapid succession, some with FDs, some without @@ -948,7 +948,7 @@ async fn test_interleaved_requests_responses_notifications() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process interleaved messages @@ -964,7 +964,7 @@ async fn test_interleaved_requests_responses_notifications() -> Result<()> { // Connect and send interleaved messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // 1. Request with FD @@ -1072,7 +1072,7 @@ async fn test_invalid_json_framing_error() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); // Expect error when receiving invalid JSON @@ -1126,7 +1126,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Expect error when processing message with FD count mismatch @@ -1206,7 +1206,7 @@ async fn test_invalid_placeholder_indices() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Expect error when processing message with invalid placeholder indices @@ -1228,7 +1228,7 @@ async fn test_invalid_placeholder_indices() -> Result<()> { // Connect and send message with invalid placeholder indices (non-dense range) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with invalid placeholder indices (0, 2, 4 instead of 0, 1, 2) @@ -1293,7 +1293,7 @@ async fn test_duplicate_placeholder_indices() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); match receiver.receive().await { @@ -1314,7 +1314,7 @@ async fn test_duplicate_placeholder_indices() -> Result<()> { // Connect and send message with duplicate placeholder indices let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with duplicate placeholder indices @@ -1380,7 +1380,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); match receiver.receive().await { @@ -1401,7 +1401,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { // Connect and send message with FDs but no placeholders let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with NO file descriptor placeholders @@ -1446,7 +1446,7 @@ async fn test_connection_drop_with_pending_fds() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); // Try to receive but client will drop connection @@ -1466,7 +1466,7 @@ async fn test_connection_drop_with_pending_fds() -> Result<()> { // Connect and then immediately drop connection { let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let params = serde_json::json!({ @@ -1540,7 +1540,7 @@ async fn test_large_message_with_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -1553,7 +1553,7 @@ async fn test_large_message_with_fds() -> Result<()> { // Connect and send large message with FD let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with large data and FD placeholder @@ -1591,7 +1591,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); match receiver.receive().await { From 1490ba18b1b41be9fd27c68a742743117efef5e3 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 07:55:41 -0500 Subject: [PATCH 07/12] transport: Use async_io instead of spawn_blocking for socket I/O The previous implementation converted the tokio UnixStream to a blocking OwnedFd and used spawn_blocking for sendmsg/recvmsg calls. This had several issues: - Moved I/O work off the async runtime to the blocking thread pool - Did not integrate with tokio's event loop for readiness notifications - Could exhaust the blocking thread pool under high load Now use TokioUnixStream::async_io() which properly integrates with tokio's reactor. The closure returns WouldBlock when the socket isn't ready, and tokio handles the async waiting. This also simplifies the code by removing the Mutex wrapper since async_io only requires &self. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- src/transport.rs | 154 +++++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 72 deletions(-) diff --git a/src/transport.rs b/src/transport.rs index 1c32418..6c08ec7 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -6,9 +6,10 @@ use rustix::net::{ SendAncillaryMessage, SendFlags, }; use std::collections::VecDeque; -use std::io::{IoSlice, IoSliceMut}; +use std::io::{self, IoSlice, IoSliceMut}; use std::os::unix::io::OwnedFd; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use tokio::io::Interest; use tokio::net::UnixStream as TokioUnixStream; use tracing::{debug, trace}; @@ -18,25 +19,23 @@ const MAX_FDS_PER_MESSAGE: usize = 8; const READ_BUFFER_SIZE: usize = 4096; pub struct UnixSocketTransport { - fd: OwnedFd, + stream: TokioUnixStream, } impl UnixSocketTransport { pub fn new(stream: TokioUnixStream) -> Result { - // Convert tokio stream to OwnedFd. This sets the socket to blocking mode. - let fd = stream.into_std().map_err(Error::Io)?.into(); - Ok(Self { fd }) + Ok(Self { stream }) } pub fn split(self) -> (Sender, Receiver) { - let fd = Arc::new(Mutex::new(self.fd)); + let stream = Arc::new(self.stream); ( Sender { - fd: Arc::clone(&fd), + stream: Arc::clone(&stream), }, Receiver { - fd: Arc::clone(&fd), + stream, buffer: Vec::new(), fd_queue: VecDeque::new(), }, @@ -45,7 +44,7 @@ impl UnixSocketTransport { } pub struct Sender { - fd: Arc>, + stream: Arc, } impl Sender { @@ -59,43 +58,43 @@ impl Sender { message_with_fds.file_descriptors.len() ); - let fd = Arc::clone(&self.fd); let fds = message_with_fds.file_descriptors; - tokio::task::spawn_blocking(move || { - let sockfd = fd.lock().unwrap(); - - if fds.is_empty() { - // No file descriptors to send - use regular send - rustix::net::send(&*sockfd, &data, SendFlags::empty()) - .map_err(|e| Error::SystemCall(format!("send failed: {}", e)))?; - } else { - // Convert OwnedFd to BorrowedFd for sending - let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect(); - - let mut buffer: Vec = - vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; - let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice()); - - if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) { - return Err(Error::SystemCall( - "Failed to add file descriptors to control message".to_string(), - )); + self.stream + .async_io(Interest::WRITABLE, || { + let sockfd = self.stream.as_fd(); + + if fds.is_empty() { + // No file descriptors to send - use regular send + rustix::net::send(sockfd, &data, SendFlags::empty()) + .map_err(|e| to_io_error(e, "send"))?; + } else { + // Convert OwnedFd to BorrowedFd for sending + let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect(); + + let mut buffer: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; + let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice()); + + if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) { + return Err(io::Error::other( + "Failed to add file descriptors to control message", + )); + } + + let iov = [IoSlice::new(&data)]; + rustix::net::sendmsg(sockfd, &iov, &mut control, SendFlags::empty()) + .map_err(|e| to_io_error(e, "sendmsg"))?; } - - let iov = [IoSlice::new(&data)]; - rustix::net::sendmsg(&*sockfd, &iov, &mut control, SendFlags::empty()) - .map_err(|e| Error::SystemCall(format!("sendmsg failed: {}", e)))?; - } - Ok(()) - }) - .await - .map_err(|e| Error::SystemCall(format!("Task join error: {}", e)))? + Ok(()) + }) + .await + .map_err(Error::Io) } } pub struct Receiver { - fd: Arc>, + stream: Arc, buffer: Vec, fd_queue: VecDeque, } @@ -142,45 +141,45 @@ impl Receiver { } async fn read_more_data(&mut self) -> Result<()> { - let fd = Arc::clone(&self.fd); - - let (bytes_read, data, fds) = tokio::task::spawn_blocking(move || { - let sockfd = fd.lock().unwrap(); - let mut data_buffer = [0u8; READ_BUFFER_SIZE]; - let mut iov = [IoSliceMut::new(&mut data_buffer)]; - let mut cmsg_space: Vec = - vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; - let mut cmsg_buffer = RecvAncillaryBuffer::new(cmsg_space.as_mut_slice()); - - let result = rustix::net::recvmsg( - &*sockfd, - &mut iov, - &mut cmsg_buffer, - RecvFlags::CMSG_CLOEXEC, - ) - .map_err(|e| Error::SystemCall(format!("recvmsg failed: {}", e)))?; - - let bytes_read = result.bytes; - let mut fds = Vec::new(); - - // Extract file descriptors from control messages - for msg in cmsg_buffer.drain() { - if let RecvAncillaryMessage::ScmRights(received_fds) = msg { - fds.extend(received_fds); + let mut data_buffer = [0u8; READ_BUFFER_SIZE]; + let mut received_fds: Vec = Vec::new(); + + let bytes_read = self + .stream + .async_io(Interest::READABLE, || { + let sockfd = self.stream.as_fd(); + + let mut iov = [IoSliceMut::new(&mut data_buffer)]; + let mut cmsg_space: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; + let mut cmsg_buffer = RecvAncillaryBuffer::new(cmsg_space.as_mut_slice()); + + let result = rustix::net::recvmsg( + sockfd, + &mut iov, + &mut cmsg_buffer, + RecvFlags::CMSG_CLOEXEC, + ) + .map_err(|e| to_io_error(e, "recvmsg"))?; + + // Extract file descriptors from control messages + for msg in cmsg_buffer.drain() { + if let RecvAncillaryMessage::ScmRights(fds) = msg { + received_fds.extend(fds); + } } - } - Ok::<_, Error>((bytes_read, data_buffer, fds)) - }) - .await - .map_err(|e| Error::SystemCall(format!("Task join error: {}", e)))??; + Ok(result.bytes) + }) + .await + .map_err(Error::Io)?; if bytes_read == 0 { return Err(Error::ConnectionClosed); } - self.buffer.extend_from_slice(&data[..bytes_read]); - self.fd_queue.extend(fds); + self.buffer.extend_from_slice(&data_buffer[..bytes_read]); + self.fd_queue.extend(received_fds); debug!( "Read {} bytes, {} FDs in queue", @@ -190,3 +189,14 @@ impl Receiver { Ok(()) } } + +/// Convert a rustix error to an io::Error, preserving EAGAIN/EWOULDBLOCK for async_io +fn to_io_error(e: rustix::io::Errno, operation: &str) -> io::Error { + // rustix::io::Errno can be converted to io::Error, which preserves the error kind + let io_err: io::Error = e.into(); + if io_err.kind() == io::ErrorKind::WouldBlock { + io_err + } else { + io::Error::new(io_err.kind(), format!("{} failed: {}", operation, io_err)) + } +} From d34e69a4d9fa17c7289814147f1f8c95b10efa6b Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 09:55:03 -0500 Subject: [PATCH 08/12] Rename crate to jsonrpc-fdpass The NDJSON framing is implicit per the JSON-RPC 2.0 transport specification, so there's no need to call it out explicitly in the crate name. Assisted-by: OpenCode (Opus 4.5) Signed-off-by: Colin Walters --- Cargo.toml | 4 ++-- README.md | 6 +++++- examples/demo.rs | 10 ++++----- src/lib.rs | 14 ++++++------ tests/integration_tests.rs | 44 +++++++++++++++++++------------------- 5 files changed, 41 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 970c0c9..23e31fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] -name = "ndjson-rpc-fdpass" +name = "jsonrpc-fdpass" version = "0.1.0" edition = "2021" -description = "NDJSON JSON-RPC 2.0 with File Descriptor Passing implementation" +description = "JSON-RPC 2.0 with Unix file descriptor passing" authors = ["Colin Walters "] license = "MIT OR Apache-2.0" repository = "https://github.com/cgwalters/spec-json-rpc-fdpass" diff --git a/README.md b/README.md index 62d76d5..12f451a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ -# A Specification for NDJSON JSON-RPC 2.0 with File Descriptor Passing (NDJSON-RPC-FD) +# JSON-RPC 2.0 with Unix File Descriptor Passing + +This repository contains both a protocol specification and a Rust implementation +(`jsonrpc-fdpass` crate) for JSON-RPC 2.0 with file descriptor passing over Unix +domain sockets. ## 1. Overview diff --git a/examples/demo.rs b/examples/demo.rs index f76348c..72334e3 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -1,4 +1,4 @@ -use ndjson_rpc_fdpass::{Client, Result, Server}; +use jsonrpc_fdpass::{Client, Result, Server}; use serde_json::Value; use std::fs::File; use std::io::{Read, Write}; @@ -33,7 +33,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { // Register a method that reads from a file descriptor server.register_method("read_file", |_method, _params, fds| { if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -43,7 +43,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { let mut contents = String::new(); file.read_to_string(&mut contents) - .map_err(ndjson_rpc_fdpass::Error::Io)?; + .map_err(jsonrpc_fdpass::Error::Io)?; info!("Server read from file: {}", contents.trim()); Ok((Some(Value::String(contents)), Vec::new())) @@ -63,7 +63,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { // Accept one connection and handle it if let Ok((stream, _)) = listener.accept().await { - let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream)?; + let transport = jsonrpc_fdpass::UnixSocketTransport::new(stream)?; let (mut sender, mut receiver) = transport.split(); // Handle messages from this connection @@ -85,7 +85,7 @@ async fn run_client(socket_path: PathBuf) -> Result<()> { let mut client = Client::connect(&socket_path).await?; // Create a temporary file to send to the server - let mut temp_file = tempfile::NamedTempFile::new().map_err(ndjson_rpc_fdpass::Error::Io)?; + let mut temp_file = tempfile::NamedTempFile::new().map_err(jsonrpc_fdpass::Error::Io)?; write!(temp_file, "Hello from client file!").unwrap(); temp_file.flush().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 446eb5e..27cebc2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ -//! # NDJSON JSON-RPC 2.0 with File Descriptor Passing +//! # JSON-RPC 2.0 with Unix File Descriptor Passing //! -//! This crate provides an implementation of the NDJSON JSON-RPC 2.0 with File Descriptor Passing -//! specification. It enables reliable inter-process communication (IPC) over Unix domain sockets -//! with the ability to pass file descriptors alongside JSON-RPC messages. +//! This crate provides an implementation of JSON-RPC 2.0 with file descriptor passing over Unix +//! domain sockets. It enables reliable inter-process communication (IPC) with the ability to +//! pass file descriptors alongside JSON-RPC messages. //! //! ## Features //! @@ -17,7 +17,7 @@ //! ### Server Example //! //! ```rust,no_run -//! use ndjson_rpc_fdpass::{Server, Result}; +//! use jsonrpc_fdpass::{Server, Result}; //! use std::fs::File; //! use serde_json::Value; //! @@ -33,7 +33,7 @@ //! file.read_to_string(&mut contents).unwrap(); //! Ok((Some(Value::String(contents)), Vec::new())) //! } else { -//! Err(ndjson_rpc_fdpass::Error::InvalidMessage("No FD provided".into())) +//! Err(jsonrpc_fdpass::Error::InvalidMessage("No FD provided".into())) //! } //! }); //! @@ -44,7 +44,7 @@ //! ### Client Example //! //! ```rust,no_run -//! use ndjson_rpc_fdpass::{Client, Result}; +//! use jsonrpc_fdpass::{Client, Result}; //! use std::fs::File; //! use std::os::unix::io::OwnedFd; //! use serde_json::json; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c8c0bfc..edbb052 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,4 +1,4 @@ -use ndjson_rpc_fdpass::{ +use jsonrpc_fdpass::{ JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, MessageWithFds, Result, Server, UnixSocketTransport, }; @@ -83,7 +83,7 @@ async fn test_client_server_communication() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send request (no race condition) @@ -134,7 +134,7 @@ async fn test_file_descriptor_passing() -> Result<()> { server.register_method("read_file", move |_method, _params, fds| { if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -162,7 +162,7 @@ async fn test_file_descriptor_passing() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send file descriptor (no race condition) @@ -224,7 +224,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { *count += 1; if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -264,7 +264,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send multiple messages with file descriptors @@ -357,7 +357,7 @@ async fn test_multiple_fds_single_message() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with multiple file descriptors @@ -457,7 +457,7 @@ async fn test_mixed_messages_with_and_without_fds() -> Result<()> { assert_eq!(message_count, 4); } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send mixed messages @@ -575,7 +575,7 @@ async fn test_large_number_of_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with many file descriptors @@ -662,7 +662,7 @@ async fn test_zero_byte_files_with_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with empty file descriptors @@ -756,7 +756,7 @@ async fn test_fd_placeholder_index_ordering() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with FDs in specific order @@ -842,7 +842,7 @@ async fn test_rapid_message_bursts() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send burst of messages @@ -959,7 +959,7 @@ async fn test_interleaved_requests_responses_notifications() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send interleaved messages @@ -1085,7 +1085,7 @@ async fn test_invalid_json_framing_error() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send invalid JSON @@ -1144,7 +1144,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message that claims 2 FDs but only provides 1 @@ -1223,7 +1223,7 @@ async fn test_invalid_placeholder_indices() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with invalid placeholder indices (non-dense range) @@ -1309,7 +1309,7 @@ async fn test_duplicate_placeholder_indices() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with duplicate placeholder indices @@ -1369,7 +1369,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { // According to spec, this is non-compliant but may be handled // The FDs should be cleaned up if !fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Received FDs but no placeholders in message".to_string(), )); } @@ -1396,7 +1396,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with FDs but no placeholders @@ -1460,7 +1460,7 @@ async fn test_connection_drop_with_pending_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and then immediately drop connection @@ -1548,7 +1548,7 @@ async fn test_large_message_with_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send large message with FD @@ -1604,7 +1604,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with malformed placeholder From baf279d426e34e9f99f07d940fdf6e5a6333e2db Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 10:32:32 -0500 Subject: [PATCH 09/12] transport: Use streaming JSON parsing instead of newline delimiters JSON-RPC doesn't require newline delimiters - JSON is self-delimiting. Replace NDJSON framing with serde_json's StreamDeserializer to parse message boundaries directly from the JSON structure. This makes the protocol a cleaner extension of standard JSON-RPC rather than imposing an unnecessary framing constraint. Assisted-by: OpenCode (Sonnet 4) Signed-off-by: Colin Walters --- src/lib.rs | 14 ++++---- src/message.rs | 2 +- src/transport.rs | 69 +++++++++++++++++++++++++------------- tests/integration_tests.rs | 5 ++- 4 files changed, 56 insertions(+), 34 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 27cebc2..bdc7705 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ //! //! - **JSON-RPC 2.0 compliance**: Full support for requests, responses, and notifications //! - **File descriptor passing**: Pass file descriptors using Unix socket ancillary data -//! - **NDJSON framing**: Newline-delimited JSON for reliable message boundaries +//! - **Streaming JSON parsing**: Self-delimiting JSON messages without newline requirements //! - **Async support**: Built on tokio for high-performance async I/O //! - **Type-safe**: Rust's type system ensures correct message handling //! @@ -71,13 +71,15 @@ //! //! ## Protocol Details //! -//! This implementation follows the NDJSON JSON-RPC with File Descriptor Passing specification: +//! This implementation is a minimal extension to JSON-RPC 2.0 that adds file descriptor +//! passing over Unix domain sockets: //! //! - Uses Unix domain sockets (SOCK_STREAM) -//! - Messages are framed using newline-delimited JSON (NDJSON) -//! - File descriptors are passed using ancillary data via sendmsg(2)/recvmsg(2) -//! - Each sendmsg() call contains exactly one complete NDJSON message -//! - File descriptors are represented in JSON using placeholder objects +//! - Standard JSON-RPC 2.0 message format with no additional framing requirements +//! - JSON objects are self-delimiting; no newline or length-prefix framing is required +//! - File descriptors are passed as ancillary data via sendmsg(2)/recvmsg(2) +//! - Each sendmsg() call contains exactly one complete JSON-RPC message +//! - File descriptors are represented in JSON using placeholder objects (see below) //! //! ### File Descriptor Placeholders //! diff --git a/src/message.rs b/src/message.rs index e5fa5cc..e8f202a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -176,7 +176,7 @@ impl MessageWithFds { self.insert_placeholders(&mut message_json)?; let json_str = serde_json::to_string(&message_json)?; - Ok(format!("{}\n", json_str)) + Ok(json_str) } fn insert_placeholders(&self, value: &mut serde_json::Value) -> Result<()> { diff --git a/src/transport.rs b/src/transport.rs index 6c08ec7..76df2bc 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -111,32 +111,53 @@ impl Receiver { } fn try_parse_message(&mut self) -> Result> { - if let Some(newline_pos) = self.buffer.iter().position(|&b| b == b'\n') { - let message_bytes = self.buffer.drain(..=newline_pos).collect::>(); - let message_str = std::str::from_utf8(&message_bytes[..message_bytes.len() - 1]) - .map_err(|_| Error::FramingError)?; - - trace!("Parsing message: {}", message_str); - - // Parse JSON once and reuse for both counting and message creation - let value: serde_json::Value = serde_json::from_str(message_str)?; - let placeholder_count = count_fd_placeholders(&value); - - if placeholder_count > self.fd_queue.len() { - return Err(Error::MismatchedCount { - expected: placeholder_count, - found: self.fd_queue.len(), - }); - } + if self.buffer.is_empty() { + return Ok(None); + } + + // Use streaming JSON parser to find message boundaries + let mut stream = + serde_json::Deserializer::from_slice(&self.buffer).into_iter::(); + + match stream.next() { + Some(Ok(value)) => { + // Successfully parsed a complete JSON value + let bytes_consumed = stream.byte_offset(); - let fds: Vec = (0..placeholder_count) - .map(|_| self.fd_queue.pop_front().unwrap()) - .collect(); + trace!("Parsed message ({} bytes): {:?}", bytes_consumed, value); - let message = JsonRpcMessage::from_json_value(value)?; - Ok(Some(MessageWithFds::new(message, fds))) - } else { - Ok(None) + // Drain the consumed bytes from the buffer + self.buffer.drain(..bytes_consumed); + + // Count placeholders and extract FDs + let placeholder_count = count_fd_placeholders(&value); + + if placeholder_count > self.fd_queue.len() { + return Err(Error::MismatchedCount { + expected: placeholder_count, + found: self.fd_queue.len(), + }); + } + + let fds: Vec = (0..placeholder_count) + .map(|_| self.fd_queue.pop_front().unwrap()) + .collect(); + + let message = JsonRpcMessage::from_json_value(value)?; + Ok(Some(MessageWithFds::new(message, fds))) + } + Some(Err(e)) if e.is_eof() => { + // Incomplete JSON - need more data + Ok(None) + } + Some(Err(e)) => { + // Actual parse error + Err(Error::Json(e)) + } + None => { + // No more values (shouldn't happen with non-empty buffer, but handle it) + Ok(None) + } } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index edbb052..725499b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -22,7 +22,6 @@ async fn test_basic_message_serialization() -> Result<()> { let serialized = message_with_fds.serialize_with_placeholders()?; assert!(serialized.contains("test_method")); assert!(serialized.contains("test_param")); - assert!(serialized.ends_with('\n')); Ok(()) } @@ -1165,7 +1164,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { "id": 1 }); - let json_str = serde_json::to_string(&json_with_mismatch).unwrap() + "\n"; + let json_str = serde_json::to_string(&json_with_mismatch).unwrap(); // Send the JSON with ancillary data containing only 1 FD (but JSON expects 2) let _fd: OwnedFd = temp_file.into_file().into(); @@ -1626,7 +1625,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { "id": 1 }); - let json_str = serde_json::to_string(&malformed_json).unwrap() + "\n"; + let json_str = serde_json::to_string(&malformed_json).unwrap(); stream.write_all(json_str.as_bytes()).await.unwrap(); stream.flush().await.unwrap(); From a0e9006c6f34955b913f6083a4932a7c8d32ddcd Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 10:44:09 -0500 Subject: [PATCH 10/12] transport: Add pretty-printing support for JSON messages Add Sender::set_pretty() method to enable human-readable JSON output with indentation and newlines. This is useful for debugging and interoperability with tools that expect formatted JSON. Also add tests verifying the streaming JSON parser correctly handles: - Pretty-printed JSON with embedded newlines - Concatenated compact JSON without separators - Mixed formatting styles Assisted-by: OpenCode (Sonnet 4) --- src/message.rs | 14 ++- src/transport.rs | 17 ++- tests/integration_tests.rs | 251 +++++++++++++++++++++++++++++++++++++ 3 files changed, 280 insertions(+), 2 deletions(-) diff --git a/src/message.rs b/src/message.rs index e8f202a..8ce99ef 100644 --- a/src/message.rs +++ b/src/message.rs @@ -172,10 +172,22 @@ impl MessageWithFds { } pub fn serialize_with_placeholders(&self) -> Result { + self.serialize_with_placeholders_impl(false) + } + + pub fn serialize_with_placeholders_pretty(&self) -> Result { + self.serialize_with_placeholders_impl(true) + } + + fn serialize_with_placeholders_impl(&self, pretty: bool) -> Result { let mut message_json = self.message.to_json_value()?; self.insert_placeholders(&mut message_json)?; - let json_str = serde_json::to_string(&message_json)?; + let json_str = if pretty { + serde_json::to_string_pretty(&message_json)? + } else { + serde_json::to_string(&message_json)? + }; Ok(json_str) } diff --git a/src/transport.rs b/src/transport.rs index 76df2bc..4d6c08a 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -33,6 +33,7 @@ impl UnixSocketTransport { ( Sender { stream: Arc::clone(&stream), + pretty: false, }, Receiver { stream, @@ -45,11 +46,25 @@ impl UnixSocketTransport { pub struct Sender { stream: Arc, + pretty: bool, } impl Sender { + /// Enable or disable pretty-printed JSON output. + /// + /// When enabled, messages are serialized with indentation and newlines. + /// This is useful for debugging or when interoperating with tools that + /// expect human-readable JSON. + pub fn set_pretty(&mut self, pretty: bool) { + self.pretty = pretty; + } + pub async fn send(&mut self, message_with_fds: MessageWithFds) -> Result<()> { - let serialized = message_with_fds.serialize_with_placeholders()?; + let serialized = if self.pretty { + message_with_fds.serialize_with_placeholders_pretty()? + } else { + message_with_fds.serialize_with_placeholders()? + }; let data = serialized.into_bytes(); trace!( diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 725499b..18ed99d 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1634,3 +1634,254 @@ async fn test_malformed_placeholder_structure() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_pretty_printed_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_pretty.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process multiple pretty-printed messages + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Connect and send pretty-printed JSON directly + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Send multiple pretty-printed JSON messages (with embedded newlines) + for i in 1..=3 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { + "message": format!("Pretty message {}", i), + "nested": { + "key": "value", + "number": i + } + }, + "id": i + }); + + // Use pretty printing - this includes newlines within the JSON + let pretty_json = serde_json::to_string_pretty(&msg).unwrap(); + assert!( + pretty_json.contains('\n'), + "Pretty JSON should contain newlines" + ); + + stream.write_all(pretty_json.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + } + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_concatenated_compact_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_concat.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + let count = std::sync::Arc::new(std::sync::Mutex::new(0)); + let count_clone = count.clone(); + + server.register_method("echo", move |_method, params, _fds| { + let mut c = count_clone.lock().unwrap(); + *c += 1; + Ok((params, Vec::new())) + }); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process 3 messages + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + + let final_count = *count.lock().unwrap(); + assert_eq!(final_count, 3, "Should have processed 3 messages"); + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Connect and send concatenated compact JSON (no separators) + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Build concatenated JSON without any separators + let mut concatenated = String::new(); + for i in 1..=3 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { "id": i }, + "id": i + }); + // Compact JSON, no trailing newline + concatenated.push_str(&serde_json::to_string(&msg).unwrap()); + } + + // Verify no newlines in the concatenated string + assert!( + !concatenated.contains('\n'), + "Compact JSON should not contain newlines" + ); + + // Send all at once + stream.write_all(concatenated.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + + // Give server time to process + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_mixed_pretty_and_compact_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_mixed_format.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process 4 messages with mixed formatting + for _ in 0..4 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Alternate between pretty and compact formatting + for i in 1..=4 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { "iteration": i }, + "id": i + }); + + let json_str = if i % 2 == 0 { + serde_json::to_string_pretty(&msg).unwrap() + } else { + serde_json::to_string(&msg).unwrap() + }; + + stream.write_all(json_str.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + } + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_sender_pretty_mode() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_sender_pretty.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process messages sent with pretty mode enabled + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, _receiver) = transport.split(); + + // Enable pretty printing via the official API + sender.set_pretty(true); + + for i in 1..=3 { + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(serde_json::json!({ + "message": format!("Pretty mode message {}", i), + "nested": { "key": "value" } + })), + Value::Number(i.into()), + ); + let message = JsonRpcMessage::Request(request); + let message_with_fds = MessageWithFds::new(message, vec![]); + + sender.send(message_with_fds).await?; + } + + // Clean up + server_handle.abort(); + + Ok(()) +} From 72cbca72f5ac1ac6ce4512fcc49b7d3621f25ff4 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 10:56:34 -0500 Subject: [PATCH 11/12] tests: Add jsonrpsee interoperability tests Add integration tests that verify wire-format compatibility with jsonrpsee when file descriptors are not present. Tests cover: - Round-trip message exchange over Unix socket pairs - Notification handling with proper synchronization - Error response formatting per JSON-RPC 2.0 spec - Sequential request handling - Raw JSON parsing from external sources - String and null ID handling - Array params (positional parameters) Also enable async-client feature for jsonrpsee and add async-trait dev dependency. Assisted-by: OpenCode (Sonnet 4) --- Cargo.toml | 3 +- tests/jsonrpsee_interop.rs | 492 +++++++++++++++++++++++++++++++++++++ 2 files changed, 494 insertions(+), 1 deletion(-) create mode 100644 tests/jsonrpsee_interop.rs diff --git a/Cargo.toml b/Cargo.toml index 23e31fa..27b356e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ thiserror = "1.0" tokio = { version = "1.40", features = ["full"] } tracing = "0.1" tracing-subscriber = "0.3" -jsonrpsee = { version = "0.24", features = ["server", "client-core"], default-features = false } +jsonrpsee = { version = "0.24", features = ["server", "client-core", "async-client"], default-features = false } [dev-dependencies] tempfile = "3.0" +async-trait = "0.1" diff --git a/tests/jsonrpsee_interop.rs b/tests/jsonrpsee_interop.rs new file mode 100644 index 0000000..01d2216 --- /dev/null +++ b/tests/jsonrpsee_interop.rs @@ -0,0 +1,492 @@ +//! Integration tests for interoperability with jsonrpsee. +//! +//! These tests verify that our JSON-RPC implementation can exchange messages +//! with jsonrpsee over Unix socket pairs when file descriptors are not present. +//! +//! The tests focus on wire-format compatibility: messages serialized by one +//! implementation should be correctly parsed by the other. + +use jsonrpc_fdpass::{ + JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, MessageWithFds, Result, + UnixSocketTransport, +}; +use serde_json::Value; +use std::io; +use std::os::unix::net::UnixStream as StdUnixStream; +use tokio::net::UnixStream; + +/// Create a connected pair of Unix streams using socketpair. +fn create_socket_pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = StdUnixStream::pair()?; + a.set_nonblocking(true)?; + b.set_nonblocking(true)?; + Ok((UnixStream::from_std(a)?, UnixStream::from_std(b)?)) +} + +/// Test round-trip: our client -> our server, verifying wire format compatibility. +#[tokio::test] +async fn test_wire_format_round_trip() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle one request + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Use our client transport + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + let params = serde_json::json!({ + "string": "hello", + "number": 42, + "array": [1, 2, 3], + "nested": { "key": "value" } + }); + + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(params.clone()), + Value::Number(1.into()), + ); + let message = JsonRpcMessage::Request(request); + sender.send(MessageWithFds::new(message, vec![])).await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(params)); + assert!(resp.error.is_none()); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test that notifications work correctly (no response expected). +#[tokio::test] +async fn test_notification_no_response() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + let received = std::sync::Arc::new(std::sync::Mutex::new(None)); + let received_clone = received.clone(); + + // Channel to signal when server has processed the notification + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); + + // Start our server - note: notifications need register_notification, not register_method + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_notification("notify_me", move |_method, params, _fds| { + *received_clone.lock().unwrap() = params.clone(); + Ok(()) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle one notification + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + // Signal that we're done processing + let _ = done_tx.send(()); + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Send notification (no id) + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, _receiver) = transport.split(); + + let notification = JsonRpcNotification::new( + "notify_me".to_string(), + Some(serde_json::json!({ "event": "test" })), + ); + let message = JsonRpcMessage::Notification(notification); + sender.send(MessageWithFds::new(message, vec![])).await?; + + // Wait for server to signal completion + done_rx.await.expect("server should signal completion"); + + // Verify notification was received + let received_value = received.lock().unwrap().clone(); + assert_eq!(received_value, Some(serde_json::json!({ "event": "test" }))); + + server_handle.abort(); + Ok(()) +} + +/// Test error responses are correctly formatted per JSON-RPC 2.0 spec. +#[tokio::test] +async fn test_error_response_format() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let server = jsonrpc_fdpass::Server::new(); + // Don't register "unknown_method" - it should return method not found + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + let request = JsonRpcRequest::new("unknown_method".to_string(), None, Value::Number(1.into())); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert!(resp.result.is_none()); + assert!(resp.error.is_some()); + let error = resp.error.unwrap(); + assert_eq!(error.code(), -32601); // Method not found + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test that batch requests work (send multiple messages in sequence). +#[tokio::test] +async fn test_sequential_requests() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("add", |_method, params, _fds| { + let params = params.ok_or_else(|| { + jsonrpc_fdpass::Error::InvalidMessage("missing params".to_string()) + })?; + let a = params.get("a").and_then(|v| v.as_i64()).unwrap_or(0); + let b = params.get("b").and_then(|v| v.as_i64()).unwrap_or(0); + Ok((Some(Value::Number((a + b).into())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle multiple requests + for _ in 0..3 { + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send 3 requests and verify responses + for i in 1..=3i64 { + let request = JsonRpcRequest::new( + "add".to_string(), + Some(serde_json::json!({ "a": i, "b": i * 2 })), + Value::Number(i.into()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Number((i * 3).into()))); + assert_eq!(resp.id, Value::Number(i.into())); + } + _ => panic!("Expected response"), + } + } + + server_handle.abort(); + Ok(()) +} + +/// Test that messages serialized in jsonrpsee format can be parsed by our receiver. +/// This simulates receiving messages from a jsonrpsee client by writing raw JSON. +#[tokio::test] +async fn test_parse_jsonrpsee_format_request() -> Result<()> { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let (mut client_stream, server_stream) = create_socket_pair().unwrap(); + + // Spawn server to receive and echo + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("test", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Write raw JSON in jsonrpsee format (what jsonrpsee would send) + // jsonrpsee typically sends compact JSON + let jsonrpsee_request = serde_json::json!({ + "jsonrpc": "2.0", + "method": "test", + "params": { "key": "value" }, + "id": 1 + }); + let request_str = serde_json::to_string(&jsonrpsee_request).unwrap(); + client_stream + .write_all(request_str.as_bytes()) + .await + .unwrap(); + client_stream.flush().await.unwrap(); + + // Read raw response + let mut response_buf = vec![0u8; 4096]; + let n = client_stream.read(&mut response_buf).await.unwrap(); + let response_str = String::from_utf8_lossy(&response_buf[..n]); + + // Parse and verify response + let response: serde_json::Value = serde_json::from_str(&response_str).unwrap(); + assert_eq!( + response.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + response.get("result"), + Some(&serde_json::json!({ "key": "value" })) + ); + assert_eq!(response.get("id"), Some(&Value::Number(1.into()))); + + server_handle.abort(); + Ok(()) +} + +/// Test that our serialized messages can be parsed as valid JSON-RPC 2.0. +#[tokio::test] +async fn test_our_format_is_valid_jsonrpc() -> Result<()> { + // Create various message types and verify they serialize to valid JSON-RPC 2.0 + let request = JsonRpcRequest::new( + "method".to_string(), + Some(serde_json::json!({"param": "value"})), + Value::Number(1.into()), + ); + let msg = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]); + let serialized = msg.serialize_with_placeholders()?; + + // Parse and validate structure + let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert_eq!( + parsed.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + parsed.get("method"), + Some(&Value::String("method".to_string())) + ); + assert!(parsed.get("id").is_some()); + assert!(parsed.get("params").is_some()); + + // Test notification (no id) + let notification = + JsonRpcNotification::new("notify".to_string(), Some(serde_json::json!({"event": 1}))); + let msg = MessageWithFds::new(JsonRpcMessage::Notification(notification), vec![]); + let serialized = msg.serialize_with_placeholders()?; + + let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert_eq!( + parsed.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + parsed.get("method"), + Some(&Value::String("notify".to_string())) + ); + assert!(parsed.get("id").is_none()); // Notifications have no id + + Ok(()) +} + +/// Test string ID handling (JSON-RPC allows string or number IDs). +#[tokio::test] +async fn test_string_id_handling() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Use string ID + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(serde_json::json!({"test": true})), + Value::String("request-uuid-123".to_string()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.id, Value::String("request-uuid-123".to_string())); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test null params handling. +#[tokio::test] +async fn test_null_params() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("no_params", |_method, params, _fds| { + // params should be None + Ok((Some(Value::Bool(params.is_none())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send request with no params + let request = JsonRpcRequest::new("no_params".to_string(), None, Value::Number(1.into())); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Bool(true))); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test array params (positional parameters as per JSON-RPC 2.0 spec). +#[tokio::test] +async fn test_array_params() -> Result<()> { + let (client_stream, server_stream) = create_socket_pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("sum", |_method, params, _fds| { + let params = params.ok_or_else(|| { + jsonrpc_fdpass::Error::InvalidMessage("missing params".to_string()) + })?; + let sum: i64 = params + .as_array() + .map(|arr| arr.iter().filter_map(|v| v.as_i64()).sum()) + .unwrap_or(0); + Ok((Some(Value::Number(sum.into())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send request with array params (positional) + let request = JsonRpcRequest::new( + "sum".to_string(), + Some(serde_json::json!([1, 2, 3, 4, 5])), + Value::Number(1.into()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Number(15.into()))); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} From a6ac3d99b1a2791c6bc32c51a49e1510360dc2c8 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 Jan 2026 11:03:02 -0500 Subject: [PATCH 12/12] tests: Use tokio UnixStream::pair() directly Simplify socket pair creation by using tokio's UnixStream::pair() instead of manually creating std sockets, setting non-blocking mode, and converting to tokio. The tokio method handles all of this internally. --- tests/jsonrpsee_interop.rs | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/tests/jsonrpsee_interop.rs b/tests/jsonrpsee_interop.rs index 01d2216..1c1005f 100644 --- a/tests/jsonrpsee_interop.rs +++ b/tests/jsonrpsee_interop.rs @@ -11,22 +11,12 @@ use jsonrpc_fdpass::{ UnixSocketTransport, }; use serde_json::Value; -use std::io; -use std::os::unix::net::UnixStream as StdUnixStream; use tokio::net::UnixStream; -/// Create a connected pair of Unix streams using socketpair. -fn create_socket_pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = StdUnixStream::pair()?; - a.set_nonblocking(true)?; - b.set_nonblocking(true)?; - Ok((UnixStream::from_std(a)?, UnixStream::from_std(b)?)) -} - /// Test round-trip: our client -> our server, verifying wire format compatibility. #[tokio::test] async fn test_wire_format_round_trip() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); // Start our server let server_handle = tokio::spawn(async move { @@ -79,7 +69,7 @@ async fn test_wire_format_round_trip() -> Result<()> { /// Test that notifications work correctly (no response expected). #[tokio::test] async fn test_notification_no_response() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); let received = std::sync::Arc::new(std::sync::Mutex::new(None)); let received_clone = received.clone(); @@ -134,7 +124,7 @@ async fn test_notification_no_response() -> Result<()> { /// Test error responses are correctly formatted per JSON-RPC 2.0 spec. #[tokio::test] async fn test_error_response_format() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); // Start our server let server_handle = tokio::spawn(async move { @@ -180,7 +170,7 @@ async fn test_error_response_format() -> Result<()> { /// Test that batch requests work (send multiple messages in sequence). #[tokio::test] async fn test_sequential_requests() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); // Start our server let server_handle = tokio::spawn(async move { @@ -244,7 +234,7 @@ async fn test_sequential_requests() -> Result<()> { async fn test_parse_jsonrpsee_format_request() -> Result<()> { use tokio::io::{AsyncReadExt, AsyncWriteExt}; - let (mut client_stream, server_stream) = create_socket_pair().unwrap(); + let (mut client_stream, server_stream) = UnixStream::pair().unwrap(); // Spawn server to receive and echo let server_handle = tokio::spawn(async move { @@ -345,7 +335,7 @@ async fn test_our_format_is_valid_jsonrpc() -> Result<()> { /// Test string ID handling (JSON-RPC allows string or number IDs). #[tokio::test] async fn test_string_id_handling() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); let server_handle = tokio::spawn(async move { let mut server = jsonrpc_fdpass::Server::new(); @@ -392,7 +382,7 @@ async fn test_string_id_handling() -> Result<()> { /// Test null params handling. #[tokio::test] async fn test_null_params() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); let server_handle = tokio::spawn(async move { let mut server = jsonrpc_fdpass::Server::new(); @@ -438,7 +428,7 @@ async fn test_null_params() -> Result<()> { /// Test array params (positional parameters as per JSON-RPC 2.0 spec). #[tokio::test] async fn test_array_params() -> Result<()> { - let (client_stream, server_stream) = create_socket_pair().unwrap(); + let (client_stream, server_stream) = UnixStream::pair().unwrap(); let server_handle = tokio::spawn(async move { let mut server = jsonrpc_fdpass::Server::new();