diff --git a/Cargo.toml b/Cargo.toml index c20519a..27b356e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] -name = "ndjson-rpc-fdpass" +name = "jsonrpc-fdpass" version = "0.1.0" edition = "2021" -description = "NDJSON JSON-RPC 2.0 with File Descriptor Passing implementation" +description = "JSON-RPC 2.0 with Unix file descriptor passing" authors = ["Colin Walters "] license = "MIT OR Apache-2.0" repository = "https://github.com/cgwalters/spec-json-rpc-fdpass" @@ -15,5 +15,8 @@ thiserror = "1.0" tokio = { version = "1.40", features = ["full"] } tracing = "0.1" tracing-subscriber = "0.3" +jsonrpsee = { version = "0.24", features = ["server", "client-core", "async-client"], default-features = false } + +[dev-dependencies] tempfile = "3.0" -jsonrpsee = { version = "0.24", features = ["server", "client-core"], default-features = false } +async-trait = "0.1" diff --git a/README.md b/README.md index 62d76d5..12f451a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ -# A Specification for NDJSON JSON-RPC 2.0 with File Descriptor Passing (NDJSON-RPC-FD) +# JSON-RPC 2.0 with Unix File Descriptor Passing + +This repository contains both a protocol specification and a Rust implementation +(`jsonrpc-fdpass` crate) for JSON-RPC 2.0 with file descriptor passing over Unix +domain sockets. ## 1. Overview diff --git a/src/main.rs b/examples/demo.rs similarity index 89% rename from src/main.rs rename to examples/demo.rs index 5cde338..72334e3 100644 --- a/src/main.rs +++ b/examples/demo.rs @@ -1,4 +1,4 @@ -use ndjson_rpc_fdpass::{Client, Result, Server}; +use jsonrpc_fdpass::{Client, Result, Server}; use serde_json::Value; use std::fs::File; use std::io::{Read, Write}; @@ -33,7 +33,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { // Register a method that reads from a file descriptor server.register_method("read_file", |_method, _params, fds| { if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -43,7 +43,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { let mut contents = String::new(); file.read_to_string(&mut contents) - .map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?; + .map_err(jsonrpc_fdpass::Error::Io)?; info!("Server read from file: {}", contents.trim()); Ok((Some(Value::String(contents)), Vec::new())) @@ -63,7 +63,7 @@ async fn run_server(listener: UnixListener) -> Result<()> { // Accept one connection and handle it if let Ok((stream, _)) = listener.accept().await { - let transport = ndjson_rpc_fdpass::UnixSocketTransport::new(stream); + let transport = jsonrpc_fdpass::UnixSocketTransport::new(stream)?; let (mut sender, mut receiver) = transport.split(); // Handle messages from this connection @@ -85,8 +85,7 @@ async fn run_client(socket_path: PathBuf) -> Result<()> { let mut client = Client::connect(&socket_path).await?; // Create a temporary file to send to the server - let mut temp_file = - tempfile::NamedTempFile::new().map_err(|e| ndjson_rpc_fdpass::Error::Io(e))?; + let mut temp_file = tempfile::NamedTempFile::new().map_err(jsonrpc_fdpass::Error::Io)?; write!(temp_file, "Hello from client file!").unwrap(); temp_file.flush().unwrap(); diff --git a/src/client.rs b/src/client.rs index a331c84..12f4d58 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,7 +15,7 @@ pub struct Client { impl Client { pub async fn connect>(path: P) -> Result { let stream = UnixStream::connect(path).await?; - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream)?; let (sender, _receiver) = transport.split(); Ok(Self { sender, next_id: 1 }) diff --git a/src/lib.rs b/src/lib.rs index 446eb5e..bdc7705 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ -//! # NDJSON JSON-RPC 2.0 with File Descriptor Passing +//! # JSON-RPC 2.0 with Unix File Descriptor Passing //! -//! This crate provides an implementation of the NDJSON JSON-RPC 2.0 with File Descriptor Passing -//! specification. It enables reliable inter-process communication (IPC) over Unix domain sockets -//! with the ability to pass file descriptors alongside JSON-RPC messages. +//! This crate provides an implementation of JSON-RPC 2.0 with file descriptor passing over Unix +//! domain sockets. It enables reliable inter-process communication (IPC) with the ability to +//! pass file descriptors alongside JSON-RPC messages. //! //! ## Features //! //! - **JSON-RPC 2.0 compliance**: Full support for requests, responses, and notifications //! - **File descriptor passing**: Pass file descriptors using Unix socket ancillary data -//! - **NDJSON framing**: Newline-delimited JSON for reliable message boundaries +//! - **Streaming JSON parsing**: Self-delimiting JSON messages without newline requirements //! - **Async support**: Built on tokio for high-performance async I/O //! - **Type-safe**: Rust's type system ensures correct message handling //! @@ -17,7 +17,7 @@ //! ### Server Example //! //! ```rust,no_run -//! use ndjson_rpc_fdpass::{Server, Result}; +//! use jsonrpc_fdpass::{Server, Result}; //! use std::fs::File; //! use serde_json::Value; //! @@ -33,7 +33,7 @@ //! file.read_to_string(&mut contents).unwrap(); //! Ok((Some(Value::String(contents)), Vec::new())) //! } else { -//! Err(ndjson_rpc_fdpass::Error::InvalidMessage("No FD provided".into())) +//! Err(jsonrpc_fdpass::Error::InvalidMessage("No FD provided".into())) //! } //! }); //! @@ -44,7 +44,7 @@ //! ### Client Example //! //! ```rust,no_run -//! use ndjson_rpc_fdpass::{Client, Result}; +//! use jsonrpc_fdpass::{Client, Result}; //! use std::fs::File; //! use std::os::unix::io::OwnedFd; //! use serde_json::json; @@ -71,13 +71,15 @@ //! //! ## Protocol Details //! -//! This implementation follows the NDJSON JSON-RPC with File Descriptor Passing specification: +//! This implementation is a minimal extension to JSON-RPC 2.0 that adds file descriptor +//! passing over Unix domain sockets: //! //! - Uses Unix domain sockets (SOCK_STREAM) -//! - Messages are framed using newline-delimited JSON (NDJSON) -//! - File descriptors are passed using ancillary data via sendmsg(2)/recvmsg(2) -//! - Each sendmsg() call contains exactly one complete NDJSON message -//! - File descriptors are represented in JSON using placeholder objects +//! - Standard JSON-RPC 2.0 message format with no additional framing requirements +//! - JSON objects are self-delimiting; no newline or length-prefix framing is required +//! - File descriptors are passed as ancillary data via sendmsg(2)/recvmsg(2) +//! - Each sendmsg() call contains exactly one complete JSON-RPC message +//! - File descriptors are represented in JSON using placeholder objects (see below) //! //! ### File Descriptor Placeholders //! diff --git a/src/message.rs b/src/message.rs index 6b7a929..8ce99ef 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,6 +3,41 @@ use jsonrpsee::types::error::ErrorObject as JsonRpcError; use serde::{Deserialize, Serialize}; use std::os::unix::io::OwnedFd; +/// The JSON key used to identify file descriptor placeholders. +pub const FD_PLACEHOLDER_KEY: &str = "__jsonrpc_fd__"; +/// The JSON key for the file descriptor index within a placeholder. +pub const FD_INDEX_KEY: &str = "index"; +/// The JSON-RPC protocol version. +pub const JSONRPC_VERSION: &str = "2.0"; + +/// Count file descriptor placeholders in a JSON value. +pub fn count_fd_placeholders(value: &serde_json::Value) -> usize { + fn count_inner(value: &serde_json::Value, count: &mut usize) { + match value { + serde_json::Value::Object(map) => { + if let (Some(serde_json::Value::Bool(true)), Some(_)) = + (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) + { + *count += 1; + } else { + for v in map.values() { + count_inner(v, count); + } + } + } + serde_json::Value::Array(arr) => { + for v in arr { + count_inner(v, count); + } + } + _ => {} + } + } + let mut count = 0; + count_inner(value, &mut count); + count +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileDescriptorPlaceholder { #[serde(rename = "__jsonrpc_fd__")] @@ -55,7 +90,7 @@ pub enum JsonRpcMessage { impl JsonRpcRequest { pub fn new(method: String, params: Option, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), method, params, id, @@ -66,7 +101,7 @@ impl JsonRpcRequest { impl JsonRpcResponse { pub fn success(result: serde_json::Value, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), result: Some(result), error: None, id, @@ -75,7 +110,7 @@ impl JsonRpcResponse { pub fn error(error: JsonRpcError<'static>, id: serde_json::Value) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), result: None, error: Some(error), id, @@ -86,7 +121,7 @@ impl JsonRpcResponse { impl JsonRpcNotification { pub fn new(method: String, params: Option) -> Self { Self { - jsonrpc: "2.0".to_string(), + jsonrpc: JSONRPC_VERSION.to_string(), method, params, } @@ -137,18 +172,30 @@ impl MessageWithFds { } pub fn serialize_with_placeholders(&self) -> Result { + self.serialize_with_placeholders_impl(false) + } + + pub fn serialize_with_placeholders_pretty(&self) -> Result { + self.serialize_with_placeholders_impl(true) + } + + fn serialize_with_placeholders_impl(&self, pretty: bool) -> Result { let mut message_json = self.message.to_json_value()?; self.insert_placeholders(&mut message_json)?; - let json_str = serde_json::to_string(&message_json)?; - Ok(format!("{}\n", json_str)) + let json_str = if pretty { + serde_json::to_string_pretty(&message_json)? + } else { + serde_json::to_string(&message_json)? + }; + Ok(json_str) } fn insert_placeholders(&self, value: &mut serde_json::Value) -> Result<()> { let fd_count = self.file_descriptors.len(); let mut placeholder_indices = Vec::new(); - self.collect_placeholder_indices(value, &mut placeholder_indices); + Self::collect_placeholder_indices(value, &mut placeholder_indices); if placeholder_indices.len() != fd_count { return Err(Error::MismatchedCount { @@ -166,26 +213,26 @@ impl MessageWithFds { Ok(()) } - fn collect_placeholder_indices(&self, value: &serde_json::Value, indices: &mut Vec) { + fn collect_placeholder_indices(value: &serde_json::Value, indices: &mut Vec) { match value { serde_json::Value::Object(map) => { if let ( Some(serde_json::Value::Bool(true)), Some(serde_json::Value::Number(index)), - ) = (map.get("__jsonrpc_fd__"), map.get("index")) + ) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { if let Some(index) = index.as_u64() { indices.push(index as usize); } } else { for v in map.values() { - self.collect_placeholder_indices(v, indices); + Self::collect_placeholder_indices(v, indices); } } } serde_json::Value::Array(arr) => { for v in arr { - self.collect_placeholder_indices(v, indices); + Self::collect_placeholder_indices(v, indices); } } _ => {} @@ -195,8 +242,7 @@ impl MessageWithFds { pub fn from_json_with_fds(json_str: &str, fds: Vec) -> Result { let message_json: serde_json::Value = serde_json::from_str(json_str)?; - let mut placeholder_count = 0; - Self::count_placeholders(&message_json, &mut placeholder_count); + let placeholder_count = count_fd_placeholders(&message_json); if placeholder_count != fds.len() { return Err(Error::MismatchedCount { @@ -215,28 +261,6 @@ impl MessageWithFds { Ok(Self::new(message, fds)) } - fn count_placeholders(value: &serde_json::Value, count: &mut usize) { - match value { - serde_json::Value::Object(map) => { - if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get("__jsonrpc_fd__"), map.get("index")) - { - *count += 1; - } else { - for v in map.values() { - Self::count_placeholders(v, count); - } - } - } - serde_json::Value::Array(arr) => { - for v in arr { - Self::count_placeholders(v, count); - } - } - _ => {} - } - } - fn validate_placeholder_indices( value: &serde_json::Value, expected_count: usize, @@ -260,7 +284,7 @@ impl MessageWithFds { if let ( Some(serde_json::Value::Bool(true)), Some(serde_json::Value::Number(index)), - ) = (map.get("__jsonrpc_fd__"), map.get("index")) + ) = (map.get(FD_PLACEHOLDER_KEY), map.get(FD_INDEX_KEY)) { if let Some(index) = index.as_u64() { indices.push(index as usize); diff --git a/src/server.rs b/src/server.rs index b625428..83ec03d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -68,7 +68,7 @@ impl Server { } async fn handle_connection(&self, stream: UnixStream) -> Result<()> { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream)?; let (mut sender, mut receiver) = transport.split(); debug!("New connection established"); diff --git a/src/transport.rs b/src/transport.rs index 90a8980..4d6c08a 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,37 +1,42 @@ use crate::error::{Error, Result}; -use crate::message::{JsonRpcMessage, MessageWithFds}; +use crate::message::{count_fd_placeholders, JsonRpcMessage, MessageWithFds}; use rustix::fd::AsFd; use rustix::net::{ RecvAncillaryBuffer, RecvAncillaryMessage, RecvFlags, SendAncillaryBuffer, SendAncillaryMessage, SendFlags, }; use std::collections::VecDeque; -use std::io::{IoSlice, IoSliceMut}; +use std::io::{self, IoSlice, IoSliceMut}; use std::os::unix::io::OwnedFd; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use tokio::io::Interest; use tokio::net::UnixStream as TokioUnixStream; use tracing::{debug, trace}; +/// Maximum number of file descriptors per message. +const MAX_FDS_PER_MESSAGE: usize = 8; +/// Read buffer size for incoming data. +const READ_BUFFER_SIZE: usize = 4096; + pub struct UnixSocketTransport { - fd: OwnedFd, + stream: TokioUnixStream, } impl UnixSocketTransport { - pub fn new(stream: TokioUnixStream) -> Self { - // Convert tokio stream to OwnedFd - let fd = stream.into_std().unwrap().into(); - Self { fd } + pub fn new(stream: TokioUnixStream) -> Result { + Ok(Self { stream }) } pub fn split(self) -> (Sender, Receiver) { - let fd = Arc::new(Mutex::new(self.fd)); + let stream = Arc::new(self.stream); ( Sender { - fd: Arc::clone(&fd), + stream: Arc::clone(&stream), + pretty: false, }, Receiver { - fd: Arc::clone(&fd), + stream, buffer: Vec::new(), fd_queue: VecDeque::new(), }, @@ -40,12 +45,26 @@ impl UnixSocketTransport { } pub struct Sender { - fd: Arc>, + stream: Arc, + pretty: bool, } impl Sender { + /// Enable or disable pretty-printed JSON output. + /// + /// When enabled, messages are serialized with indentation and newlines. + /// This is useful for debugging or when interoperating with tools that + /// expect human-readable JSON. + pub fn set_pretty(&mut self, pretty: bool) { + self.pretty = pretty; + } + pub async fn send(&mut self, message_with_fds: MessageWithFds) -> Result<()> { - let serialized = message_with_fds.serialize_with_placeholders()?; + let serialized = if self.pretty { + message_with_fds.serialize_with_placeholders_pretty()? + } else { + message_with_fds.serialize_with_placeholders()? + }; let data = serialized.into_bytes(); trace!( @@ -54,42 +73,43 @@ impl Sender { message_with_fds.file_descriptors.len() ); - let fd = Arc::clone(&self.fd); let fds = message_with_fds.file_descriptors; - tokio::task::spawn_blocking(move || { - let sockfd = fd.lock().unwrap(); + self.stream + .async_io(Interest::WRITABLE, || { + let sockfd = self.stream.as_fd(); - if fds.is_empty() { - // No file descriptors to send - use regular send - rustix::net::send(&*sockfd, &data, SendFlags::empty()) - .map_err(|e| Error::SystemCall(format!("send failed: {}", e)))?; - } else { - // Convert OwnedFd to BorrowedFd for sending - let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect(); + if fds.is_empty() { + // No file descriptors to send - use regular send + rustix::net::send(sockfd, &data, SendFlags::empty()) + .map_err(|e| to_io_error(e, "send"))?; + } else { + // Convert OwnedFd to BorrowedFd for sending + let borrowed_fds: Vec<_> = fds.iter().map(|fd| fd.as_fd()).collect(); - let mut buffer: Vec = vec![0u8; rustix::cmsg_space!(ScmRights(8))]; - let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice()); + let mut buffer: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; + let mut control = SendAncillaryBuffer::new(buffer.as_mut_slice()); - if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) { - return Err(Error::SystemCall( - "Failed to add file descriptors to control message".to_string(), - )); - } + if !control.push(SendAncillaryMessage::ScmRights(&borrowed_fds)) { + return Err(io::Error::other( + "Failed to add file descriptors to control message", + )); + } - let iov = [IoSlice::new(&data)]; - rustix::net::sendmsg(&*sockfd, &iov, &mut control, SendFlags::empty()) - .map_err(|e| Error::SystemCall(format!("sendmsg failed: {}", e)))?; - } - Ok(()) - }) - .await - .map_err(|e| Error::SystemCall(format!("Task join error: {}", e)))? + let iov = [IoSlice::new(&data)]; + rustix::net::sendmsg(sockfd, &iov, &mut control, SendFlags::empty()) + .map_err(|e| to_io_error(e, "sendmsg"))?; + } + Ok(()) + }) + .await + .map_err(Error::Io) } } pub struct Receiver { - fd: Arc>, + stream: Arc, buffer: Vec, fd_queue: VecDeque, } @@ -106,110 +126,96 @@ impl Receiver { } fn try_parse_message(&mut self) -> Result> { - if let Some(newline_pos) = self.buffer.iter().position(|&b| b == b'\n') { - let message_bytes = self.buffer.drain(..=newline_pos).collect::>(); - let message_str = std::str::from_utf8(&message_bytes[..message_bytes.len() - 1]) - .map_err(|_| Error::FramingError)?; - - trace!("Parsing message: {}", message_str); + if self.buffer.is_empty() { + return Ok(None); + } - let mut placeholder_count = 0; - Self::count_placeholders_in_str(message_str, &mut placeholder_count)?; + // Use streaming JSON parser to find message boundaries + let mut stream = + serde_json::Deserializer::from_slice(&self.buffer).into_iter::(); - if placeholder_count > self.fd_queue.len() { - return Err(Error::MismatchedCount { - expected: placeholder_count, - found: self.fd_queue.len(), - }); - } + match stream.next() { + Some(Ok(value)) => { + // Successfully parsed a complete JSON value + let bytes_consumed = stream.byte_offset(); - let fds: Vec = (0..placeholder_count) - .map(|_| self.fd_queue.pop_front().unwrap()) - .collect(); + trace!("Parsed message ({} bytes): {:?}", bytes_consumed, value); - if placeholder_count == 0 && !fds.is_empty() { - return Err(Error::DanglingFileDescriptors); - } + // Drain the consumed bytes from the buffer + self.buffer.drain(..bytes_consumed); - let message = self.parse_json_message(message_str)?; - Ok(Some(MessageWithFds::new(message, fds))) - } else { - Ok(None) - } - } + // Count placeholders and extract FDs + let placeholder_count = count_fd_placeholders(&value); - fn parse_json_message(&self, json_str: &str) -> Result { - let value: serde_json::Value = serde_json::from_str(json_str)?; - JsonRpcMessage::from_json_value(value) - } + if placeholder_count > self.fd_queue.len() { + return Err(Error::MismatchedCount { + expected: placeholder_count, + found: self.fd_queue.len(), + }); + } - fn count_placeholders_in_str(json_str: &str, count: &mut usize) -> Result<()> { - let value: serde_json::Value = serde_json::from_str(json_str)?; - Self::count_placeholders(&value, count); - Ok(()) - } + let fds: Vec = (0..placeholder_count) + .map(|_| self.fd_queue.pop_front().unwrap()) + .collect(); - fn count_placeholders(value: &serde_json::Value, count: &mut usize) { - match value { - serde_json::Value::Object(map) => { - if let (Some(serde_json::Value::Bool(true)), Some(_)) = - (map.get("__jsonrpc_fd__"), map.get("index")) - { - *count += 1; - } else { - for v in map.values() { - Self::count_placeholders(v, count); - } - } + let message = JsonRpcMessage::from_json_value(value)?; + Ok(Some(MessageWithFds::new(message, fds))) } - serde_json::Value::Array(arr) => { - for v in arr { - Self::count_placeholders(v, count); - } + Some(Err(e)) if e.is_eof() => { + // Incomplete JSON - need more data + Ok(None) + } + Some(Err(e)) => { + // Actual parse error + Err(Error::Json(e)) + } + None => { + // No more values (shouldn't happen with non-empty buffer, but handle it) + Ok(None) } - _ => {} } } async fn read_more_data(&mut self) -> Result<()> { - let fd = Arc::clone(&self.fd); - - let (bytes_read, data, fds) = tokio::task::spawn_blocking(move || { - let sockfd = fd.lock().unwrap(); - let mut data_buffer = [0u8; 4096]; - let mut iov = [IoSliceMut::new(&mut data_buffer)]; - let mut cmsg_space: Vec = vec![0u8; rustix::cmsg_space!(ScmRights(8))]; - let mut cmsg_buffer = RecvAncillaryBuffer::new(cmsg_space.as_mut_slice()); - - let result = rustix::net::recvmsg( - &*sockfd, - &mut iov, - &mut cmsg_buffer, - RecvFlags::CMSG_CLOEXEC, - ) - .map_err(|e| Error::SystemCall(format!("recvmsg failed: {}", e)))?; - - let bytes_read = result.bytes; - let mut fds = Vec::new(); - - // Extract file descriptors from control messages - for msg in cmsg_buffer.drain() { - if let RecvAncillaryMessage::ScmRights(received_fds) = msg { - fds.extend(received_fds); + let mut data_buffer = [0u8; READ_BUFFER_SIZE]; + let mut received_fds: Vec = Vec::new(); + + let bytes_read = self + .stream + .async_io(Interest::READABLE, || { + let sockfd = self.stream.as_fd(); + + let mut iov = [IoSliceMut::new(&mut data_buffer)]; + let mut cmsg_space: Vec = + vec![0u8; rustix::cmsg_space!(ScmRights(MAX_FDS_PER_MESSAGE))]; + let mut cmsg_buffer = RecvAncillaryBuffer::new(cmsg_space.as_mut_slice()); + + let result = rustix::net::recvmsg( + sockfd, + &mut iov, + &mut cmsg_buffer, + RecvFlags::CMSG_CLOEXEC, + ) + .map_err(|e| to_io_error(e, "recvmsg"))?; + + // Extract file descriptors from control messages + for msg in cmsg_buffer.drain() { + if let RecvAncillaryMessage::ScmRights(fds) = msg { + received_fds.extend(fds); + } } - } - Ok::<_, Error>((bytes_read, data_buffer, fds)) - }) - .await - .map_err(|e| Error::SystemCall(format!("Task join error: {}", e)))??; + Ok(result.bytes) + }) + .await + .map_err(Error::Io)?; if bytes_read == 0 { return Err(Error::ConnectionClosed); } - self.buffer.extend_from_slice(&data[..bytes_read]); - self.fd_queue.extend(fds); + self.buffer.extend_from_slice(&data_buffer[..bytes_read]); + self.fd_queue.extend(received_fds); debug!( "Read {} bytes, {} FDs in queue", @@ -219,3 +225,14 @@ impl Receiver { Ok(()) } } + +/// Convert a rustix error to an io::Error, preserving EAGAIN/EWOULDBLOCK for async_io +fn to_io_error(e: rustix::io::Errno, operation: &str) -> io::Error { + // rustix::io::Errno can be converted to io::Error, which preserves the error kind + let io_err: io::Error = e.into(); + if io_err.kind() == io::ErrorKind::WouldBlock { + io_err + } else { + io::Error::new(io_err.kind(), format!("{} failed: {}", operation, io_err)) + } +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71b25d7..18ed99d 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,4 +1,4 @@ -use ndjson_rpc_fdpass::{ +use jsonrpc_fdpass::{ JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, MessageWithFds, Result, Server, UnixSocketTransport, }; @@ -22,7 +22,6 @@ async fn test_basic_message_serialization() -> Result<()> { let serialized = message_with_fds.serialize_with_placeholders()?; assert!(serialized.contains("test_method")); assert!(serialized.contains("test_param")); - assert!(serialized.ends_with('\n')); Ok(()) } @@ -75,7 +74,7 @@ async fn test_client_server_communication() -> Result<()> { server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -83,12 +82,12 @@ async fn test_client_server_communication() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send request (no race condition) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let request = JsonRpcRequest::new( @@ -134,7 +133,7 @@ async fn test_file_descriptor_passing() -> Result<()> { server.register_method("read_file", move |_method, _params, fds| { if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -154,7 +153,7 @@ async fn test_file_descriptor_passing() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -162,12 +161,12 @@ async fn test_file_descriptor_passing() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send file descriptor (no race condition) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with file descriptor placeholder @@ -224,7 +223,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { *count += 1; if fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Expected file descriptor".to_string(), )); } @@ -253,7 +252,7 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process multiple messages sequentially @@ -264,12 +263,12 @@ async fn test_multiple_messages_with_fds_sequential() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send multiple messages with file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Send multiple messages sequentially @@ -349,7 +348,7 @@ async fn test_multiple_fds_single_message() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -357,12 +356,12 @@ async fn test_multiple_fds_single_message() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with multiple file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with multiple file descriptor placeholders @@ -443,7 +442,7 @@ async fn test_mixed_messages_with_and_without_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process multiple mixed messages @@ -457,12 +456,12 @@ async fn test_mixed_messages_with_and_without_fds() -> Result<()> { assert_eq!(message_count, 4); } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send mixed messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // 1. Echo message (no FD) @@ -567,7 +566,7 @@ async fn test_large_number_of_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -575,12 +574,12 @@ async fn test_large_number_of_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with many file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with many file descriptor placeholders @@ -654,7 +653,7 @@ async fn test_zero_byte_files_with_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -662,12 +661,12 @@ async fn test_zero_byte_files_with_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with empty file descriptors let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let params = serde_json::json!({ @@ -748,7 +747,7 @@ async fn test_fd_placeholder_index_ordering() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -756,12 +755,12 @@ async fn test_fd_placeholder_index_ordering() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with FDs in specific order let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with placeholders in non-sequential order to test protocol @@ -831,7 +830,7 @@ async fn test_rapid_message_bursts() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process burst of messages @@ -842,12 +841,12 @@ async fn test_rapid_message_bursts() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send burst of messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Send 20 messages in rapid succession, some with FDs, some without @@ -948,7 +947,7 @@ async fn test_interleaved_requests_responses_notifications() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Process interleaved messages @@ -959,12 +958,12 @@ async fn test_interleaved_requests_responses_notifications() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send interleaved messages let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // 1. Request with FD @@ -1072,7 +1071,7 @@ async fn test_invalid_json_framing_error() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); // Expect error when receiving invalid JSON @@ -1085,7 +1084,7 @@ async fn test_invalid_json_framing_error() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send invalid JSON @@ -1126,7 +1125,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Expect error when processing message with FD count mismatch @@ -1144,7 +1143,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message that claims 2 FDs but only provides 1 @@ -1165,7 +1164,7 @@ async fn test_mismatched_fd_count_error() -> Result<()> { "id": 1 }); - let json_str = serde_json::to_string(&json_with_mismatch).unwrap() + "\n"; + let json_str = serde_json::to_string(&json_with_mismatch).unwrap(); // Send the JSON with ancillary data containing only 1 FD (but JSON expects 2) let _fd: OwnedFd = temp_file.into_file().into(); @@ -1206,7 +1205,7 @@ async fn test_invalid_placeholder_indices() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); // Expect error when processing message with invalid placeholder indices @@ -1223,12 +1222,12 @@ async fn test_invalid_placeholder_indices() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with invalid placeholder indices (non-dense range) let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with invalid placeholder indices (0, 2, 4 instead of 0, 1, 2) @@ -1293,7 +1292,7 @@ async fn test_duplicate_placeholder_indices() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); match receiver.receive().await { @@ -1309,12 +1308,12 @@ async fn test_duplicate_placeholder_indices() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with duplicate placeholder indices let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with duplicate placeholder indices @@ -1369,7 +1368,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { // According to spec, this is non-compliant but may be handled // The FDs should be cleaned up if !fds.is_empty() { - return Err(ndjson_rpc_fdpass::Error::InvalidMessage( + return Err(jsonrpc_fdpass::Error::InvalidMessage( "Received FDs but no placeholders in message".to_string(), )); } @@ -1380,7 +1379,7 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); match receiver.receive().await { @@ -1396,12 +1395,12 @@ async fn test_no_placeholders_but_fds_provided() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with FDs but no placeholders let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with NO file descriptor placeholders @@ -1446,7 +1445,7 @@ async fn test_connection_drop_with_pending_fds() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); // Try to receive but client will drop connection @@ -1460,13 +1459,13 @@ async fn test_connection_drop_with_pending_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and then immediately drop connection { let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); let params = serde_json::json!({ @@ -1540,7 +1539,7 @@ async fn test_large_message_with_fds() -> Result<()> { }); if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, mut receiver) = transport.split(); if let Ok(message_with_fds) = receiver.receive().await { @@ -1548,12 +1547,12 @@ async fn test_large_message_with_fds() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send large message with FD let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (mut sender, _receiver) = transport.split(); // Create params with large data and FD placeholder @@ -1591,7 +1590,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { let server_handle = tokio::spawn(async move { if let Ok((stream, _)) = listener.accept().await { - let transport = UnixSocketTransport::new(stream); + let transport = UnixSocketTransport::new(stream).unwrap(); let (_sender, mut receiver) = transport.split(); match receiver.receive().await { @@ -1604,7 +1603,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { } } - Ok::<(), ndjson_rpc_fdpass::Error>(()) + Ok::<(), jsonrpc_fdpass::Error>(()) }); // Connect and send message with malformed placeholder @@ -1626,7 +1625,7 @@ async fn test_malformed_placeholder_structure() -> Result<()> { "id": 1 }); - let json_str = serde_json::to_string(&malformed_json).unwrap() + "\n"; + let json_str = serde_json::to_string(&malformed_json).unwrap(); stream.write_all(json_str.as_bytes()).await.unwrap(); stream.flush().await.unwrap(); @@ -1635,3 +1634,254 @@ async fn test_malformed_placeholder_structure() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_pretty_printed_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_pretty.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process multiple pretty-printed messages + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Connect and send pretty-printed JSON directly + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Send multiple pretty-printed JSON messages (with embedded newlines) + for i in 1..=3 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { + "message": format!("Pretty message {}", i), + "nested": { + "key": "value", + "number": i + } + }, + "id": i + }); + + // Use pretty printing - this includes newlines within the JSON + let pretty_json = serde_json::to_string_pretty(&msg).unwrap(); + assert!( + pretty_json.contains('\n'), + "Pretty JSON should contain newlines" + ); + + stream.write_all(pretty_json.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + } + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_concatenated_compact_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_concat.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + let count = std::sync::Arc::new(std::sync::Mutex::new(0)); + let count_clone = count.clone(); + + server.register_method("echo", move |_method, params, _fds| { + let mut c = count_clone.lock().unwrap(); + *c += 1; + Ok((params, Vec::new())) + }); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process 3 messages + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + + let final_count = *count.lock().unwrap(); + assert_eq!(final_count, 3, "Should have processed 3 messages"); + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Connect and send concatenated compact JSON (no separators) + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Build concatenated JSON without any separators + let mut concatenated = String::new(); + for i in 1..=3 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { "id": i }, + "id": i + }); + // Compact JSON, no trailing newline + concatenated.push_str(&serde_json::to_string(&msg).unwrap()); + } + + // Verify no newlines in the concatenated string + assert!( + !concatenated.contains('\n'), + "Compact JSON should not contain newlines" + ); + + // Send all at once + stream.write_all(concatenated.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + + // Give server time to process + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_mixed_pretty_and_compact_json() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_mixed_format.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process 4 messages with mixed formatting + for _ in 0..4 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + + use tokio::io::AsyncWriteExt; + let mut stream = stream; + + // Alternate between pretty and compact formatting + for i in 1..=4 { + let msg = serde_json::json!({ + "jsonrpc": "2.0", + "method": "echo", + "params": { "iteration": i }, + "id": i + }); + + let json_str = if i % 2 == 0 { + serde_json::to_string_pretty(&msg).unwrap() + } else { + serde_json::to_string(&msg).unwrap() + }; + + stream.write_all(json_str.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + } + + // Clean up + server_handle.abort(); + + Ok(()) +} + +#[tokio::test] +async fn test_sender_pretty_mode() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let socket_path = temp_dir.path().join("test_sender_pretty.sock"); + + let listener = tokio::net::UnixListener::bind(&socket_path).unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = Server::new(); + + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + if let Ok((stream, _)) = listener.accept().await { + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Process messages sent with pretty mode enabled + for _ in 0..3 { + if let Ok(message_with_fds) = receiver.receive().await { + let _ = server.process_message(message_with_fds, &mut sender).await; + } + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let stream = tokio::net::UnixStream::connect(&socket_path).await.unwrap(); + let transport = UnixSocketTransport::new(stream).unwrap(); + let (mut sender, _receiver) = transport.split(); + + // Enable pretty printing via the official API + sender.set_pretty(true); + + for i in 1..=3 { + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(serde_json::json!({ + "message": format!("Pretty mode message {}", i), + "nested": { "key": "value" } + })), + Value::Number(i.into()), + ); + let message = JsonRpcMessage::Request(request); + let message_with_fds = MessageWithFds::new(message, vec![]); + + sender.send(message_with_fds).await?; + } + + // Clean up + server_handle.abort(); + + Ok(()) +} diff --git a/tests/jsonrpsee_interop.rs b/tests/jsonrpsee_interop.rs new file mode 100644 index 0000000..1c1005f --- /dev/null +++ b/tests/jsonrpsee_interop.rs @@ -0,0 +1,482 @@ +//! Integration tests for interoperability with jsonrpsee. +//! +//! These tests verify that our JSON-RPC implementation can exchange messages +//! with jsonrpsee over Unix socket pairs when file descriptors are not present. +//! +//! The tests focus on wire-format compatibility: messages serialized by one +//! implementation should be correctly parsed by the other. + +use jsonrpc_fdpass::{ + JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, MessageWithFds, Result, + UnixSocketTransport, +}; +use serde_json::Value; +use tokio::net::UnixStream; + +/// Test round-trip: our client -> our server, verifying wire format compatibility. +#[tokio::test] +async fn test_wire_format_round_trip() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle one request + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Use our client transport + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + let params = serde_json::json!({ + "string": "hello", + "number": 42, + "array": [1, 2, 3], + "nested": { "key": "value" } + }); + + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(params.clone()), + Value::Number(1.into()), + ); + let message = JsonRpcMessage::Request(request); + sender.send(MessageWithFds::new(message, vec![])).await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(params)); + assert!(resp.error.is_none()); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test that notifications work correctly (no response expected). +#[tokio::test] +async fn test_notification_no_response() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + let received = std::sync::Arc::new(std::sync::Mutex::new(None)); + let received_clone = received.clone(); + + // Channel to signal when server has processed the notification + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); + + // Start our server - note: notifications need register_notification, not register_method + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_notification("notify_me", move |_method, params, _fds| { + *received_clone.lock().unwrap() = params.clone(); + Ok(()) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle one notification + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + // Signal that we're done processing + let _ = done_tx.send(()); + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Send notification (no id) + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, _receiver) = transport.split(); + + let notification = JsonRpcNotification::new( + "notify_me".to_string(), + Some(serde_json::json!({ "event": "test" })), + ); + let message = JsonRpcMessage::Notification(notification); + sender.send(MessageWithFds::new(message, vec![])).await?; + + // Wait for server to signal completion + done_rx.await.expect("server should signal completion"); + + // Verify notification was received + let received_value = received.lock().unwrap().clone(); + assert_eq!(received_value, Some(serde_json::json!({ "event": "test" }))); + + server_handle.abort(); + Ok(()) +} + +/// Test error responses are correctly formatted per JSON-RPC 2.0 spec. +#[tokio::test] +async fn test_error_response_format() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let server = jsonrpc_fdpass::Server::new(); + // Don't register "unknown_method" - it should return method not found + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + let request = JsonRpcRequest::new("unknown_method".to_string(), None, Value::Number(1.into())); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert!(resp.result.is_none()); + assert!(resp.error.is_some()); + let error = resp.error.unwrap(); + assert_eq!(error.code(), -32601); // Method not found + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test that batch requests work (send multiple messages in sequence). +#[tokio::test] +async fn test_sequential_requests() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + // Start our server + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("add", |_method, params, _fds| { + let params = params.ok_or_else(|| { + jsonrpc_fdpass::Error::InvalidMessage("missing params".to_string()) + })?; + let a = params.get("a").and_then(|v| v.as_i64()).unwrap_or(0); + let b = params.get("b").and_then(|v| v.as_i64()).unwrap_or(0); + Ok((Some(Value::Number((a + b).into())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Handle multiple requests + for _ in 0..3 { + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send 3 requests and verify responses + for i in 1..=3i64 { + let request = JsonRpcRequest::new( + "add".to_string(), + Some(serde_json::json!({ "a": i, "b": i * 2 })), + Value::Number(i.into()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Number((i * 3).into()))); + assert_eq!(resp.id, Value::Number(i.into())); + } + _ => panic!("Expected response"), + } + } + + server_handle.abort(); + Ok(()) +} + +/// Test that messages serialized in jsonrpsee format can be parsed by our receiver. +/// This simulates receiving messages from a jsonrpsee client by writing raw JSON. +#[tokio::test] +async fn test_parse_jsonrpsee_format_request() -> Result<()> { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let (mut client_stream, server_stream) = UnixStream::pair().unwrap(); + + // Spawn server to receive and echo + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("test", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + // Write raw JSON in jsonrpsee format (what jsonrpsee would send) + // jsonrpsee typically sends compact JSON + let jsonrpsee_request = serde_json::json!({ + "jsonrpc": "2.0", + "method": "test", + "params": { "key": "value" }, + "id": 1 + }); + let request_str = serde_json::to_string(&jsonrpsee_request).unwrap(); + client_stream + .write_all(request_str.as_bytes()) + .await + .unwrap(); + client_stream.flush().await.unwrap(); + + // Read raw response + let mut response_buf = vec![0u8; 4096]; + let n = client_stream.read(&mut response_buf).await.unwrap(); + let response_str = String::from_utf8_lossy(&response_buf[..n]); + + // Parse and verify response + let response: serde_json::Value = serde_json::from_str(&response_str).unwrap(); + assert_eq!( + response.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + response.get("result"), + Some(&serde_json::json!({ "key": "value" })) + ); + assert_eq!(response.get("id"), Some(&Value::Number(1.into()))); + + server_handle.abort(); + Ok(()) +} + +/// Test that our serialized messages can be parsed as valid JSON-RPC 2.0. +#[tokio::test] +async fn test_our_format_is_valid_jsonrpc() -> Result<()> { + // Create various message types and verify they serialize to valid JSON-RPC 2.0 + let request = JsonRpcRequest::new( + "method".to_string(), + Some(serde_json::json!({"param": "value"})), + Value::Number(1.into()), + ); + let msg = MessageWithFds::new(JsonRpcMessage::Request(request), vec![]); + let serialized = msg.serialize_with_placeholders()?; + + // Parse and validate structure + let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert_eq!( + parsed.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + parsed.get("method"), + Some(&Value::String("method".to_string())) + ); + assert!(parsed.get("id").is_some()); + assert!(parsed.get("params").is_some()); + + // Test notification (no id) + let notification = + JsonRpcNotification::new("notify".to_string(), Some(serde_json::json!({"event": 1}))); + let msg = MessageWithFds::new(JsonRpcMessage::Notification(notification), vec![]); + let serialized = msg.serialize_with_placeholders()?; + + let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert_eq!( + parsed.get("jsonrpc"), + Some(&Value::String("2.0".to_string())) + ); + assert_eq!( + parsed.get("method"), + Some(&Value::String("notify".to_string())) + ); + assert!(parsed.get("id").is_none()); // Notifications have no id + + Ok(()) +} + +/// Test string ID handling (JSON-RPC allows string or number IDs). +#[tokio::test] +async fn test_string_id_handling() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("echo", |_method, params, _fds| Ok((params, Vec::new()))); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Use string ID + let request = JsonRpcRequest::new( + "echo".to_string(), + Some(serde_json::json!({"test": true})), + Value::String("request-uuid-123".to_string()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.id, Value::String("request-uuid-123".to_string())); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test null params handling. +#[tokio::test] +async fn test_null_params() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("no_params", |_method, params, _fds| { + // params should be None + Ok((Some(Value::Bool(params.is_none())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send request with no params + let request = JsonRpcRequest::new("no_params".to_string(), None, Value::Number(1.into())); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Bool(true))); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +} + +/// Test array params (positional parameters as per JSON-RPC 2.0 spec). +#[tokio::test] +async fn test_array_params() -> Result<()> { + let (client_stream, server_stream) = UnixStream::pair().unwrap(); + + let server_handle = tokio::spawn(async move { + let mut server = jsonrpc_fdpass::Server::new(); + server.register_method("sum", |_method, params, _fds| { + let params = params.ok_or_else(|| { + jsonrpc_fdpass::Error::InvalidMessage("missing params".to_string()) + })?; + let sum: i64 = params + .as_array() + .map(|arr| arr.iter().filter_map(|v| v.as_i64()).sum()) + .unwrap_or(0); + Ok((Some(Value::Number(sum.into())), Vec::new())) + }); + + let transport = UnixSocketTransport::new(server_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + if let Ok(msg) = receiver.receive().await { + let _ = server.process_message(msg, &mut sender).await; + } + + Ok::<(), jsonrpc_fdpass::Error>(()) + }); + + let transport = UnixSocketTransport::new(client_stream).unwrap(); + let (mut sender, mut receiver) = transport.split(); + + // Send request with array params (positional) + let request = JsonRpcRequest::new( + "sum".to_string(), + Some(serde_json::json!([1, 2, 3, 4, 5])), + Value::Number(1.into()), + ); + sender + .send(MessageWithFds::new( + JsonRpcMessage::Request(request), + vec![], + )) + .await?; + + let response = receiver.receive().await?; + match response.message { + JsonRpcMessage::Response(resp) => { + assert_eq!(resp.result, Some(Value::Number(15.into()))); + } + _ => panic!("Expected response"), + } + + server_handle.abort(); + Ok(()) +}