diff --git a/Cargo.lock b/Cargo.lock index c9f6c4699..ff8f7b1d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,15 @@ dependencies = [ "byte-tools", ] +[[package]] +name = "buf-list" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb213ab6aa87733e74428d8a33f0bb93181810b0ae09ae96b8cc1521d265283" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.9.1" @@ -285,6 +294,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.20.0", + "buf-list", "bytes", "chrono", "dropshot_endpoint", diff --git a/dropshot/Cargo.toml b/dropshot/Cargo.toml index 0028e4d66..8f89e4be8 100644 --- a/dropshot/Cargo.toml +++ b/dropshot/Cargo.toml @@ -14,6 +14,7 @@ categories = ["network-programming", "web-programming::http-server"] async-stream = "0.3.3" async-trait = "0.1.60" base64 = "0.20.0" +buf-list = "0.1.3" bytes = "1" futures = "0.3.25" hostname = "0.3.0" diff --git a/dropshot/src/handler.rs b/dropshot/src/handler.rs index c05abc3ab..de072c4f1 100644 --- a/dropshot/src/handler.rs +++ b/dropshot/src/handler.rs @@ -35,7 +35,7 @@ use super::error::HttpError; use super::http_util::http_extract_path_params; -use super::http_util::http_read_body; +use super::http_util::http_read_body_bytes; use super::http_util::CONTENT_TYPE_JSON; use super::http_util::CONTENT_TYPE_OCTET_STREAM; use super::server::DropshotState; @@ -46,6 +46,7 @@ use crate::api_description::ApiEndpointParameterLocation; use crate::api_description::ApiEndpointResponse; use crate::api_description::ApiSchemaGenerator; use crate::api_description::{ApiEndpointBodyContentType, ExtensionMode}; +use crate::http_util::http_read_body_buf_list; use crate::pagination::PaginationParams; use crate::pagination::PAGINATION_PARAM_SENTINEL; use crate::router::VariableSet; @@ -53,6 +54,7 @@ use crate::to_map::to_map; use crate::websocket::WEBSOCKET_PARAM_SENTINEL; use async_trait::async_trait; +use buf_list::BufList; use bytes::Bytes; use 
futures::lock::Mutex; use http::HeaderMap; @@ -972,7 +974,7 @@ where { let server = &rqctx.server; let mut request = rqctx.request.lock().await; - let body = http_read_body( + let body = http_read_body_bytes( request.body_mut(), server.config.request_body_max_bytes, ) @@ -1072,6 +1074,9 @@ where /** * `UntypedBody` is an extractor for reading in the contents of the HTTP request * body and making the raw bytes directly available to the consumer. + * + * `UntypedBody` stores its content as a single, contiguous `Bytes`, and is + * recommended for small request bodies. For larger ones, use `ChunkedBody`. */ #[derive(Debug)] pub struct UntypedBody { @@ -1110,7 +1115,7 @@ impl Extractor for UntypedBody { ) -> Result { let server = &rqctx.server; let mut request = rqctx.request.lock().await; - let body_bytes = http_read_body( + let body_bytes = http_read_body_bytes( request.body_mut(), server.config.request_body_max_bytes, ) @@ -1121,25 +1126,68 @@ impl Extractor for UntypedBody { fn metadata( _content_type: ApiEndpointBodyContentType, ) -> ExtractorMetadata { - ExtractorMetadata { - parameters: vec![ApiEndpointParameter::new_body( - ApiEndpointBodyContentType::Bytes, - true, - ApiSchemaGenerator::Static { - schema: Box::new( - SchemaObject { - instance_type: Some(InstanceType::String.into()), - format: Some(String::from("binary")), - ..Default::default() - } - .into(), - ), - dependencies: indexmap::IndexMap::default(), - }, - vec![], - )], - extension_mode: ExtensionMode::None, - } + untyped_metadata() + } +} + +/// An extractor that makes the raw bytes directly available to the consumer as a +/// [`BufList`]. +/// +/// `ChunkedBody` stores its content as a [`BufList`], and is recommended for +/// large request bodies. +#[derive(Debug)] +pub struct ChunkedBody { + content: BufList, +} + +impl ChunkedBody { + /// Returns the contents as a `BufList`. 
+ pub fn into_inner(self) -> BufList { + self.content + } +} + +#[async_trait] +impl Extractor for ChunkedBody { + async fn from_request( + rqctx: Arc>, + ) -> Result { + let server = &rqctx.server; + let mut request = rqctx.request.lock().await; + let content = http_read_body_buf_list( + request.body_mut(), + server.config.request_body_max_bytes, + ) + .await?; + Ok(ChunkedBody { content }) + } + + fn metadata( + _content_type: ApiEndpointBodyContentType, + ) -> ExtractorMetadata { + untyped_metadata() + } +} + +fn untyped_metadata() -> ExtractorMetadata { + ExtractorMetadata { + parameters: vec![ApiEndpointParameter::new_body( + ApiEndpointBodyContentType::Bytes, + true, + ApiSchemaGenerator::Static { + schema: Box::new( + SchemaObject { + instance_type: Some(InstanceType::String.into()), + format: Some(String::from("binary")), + ..Default::default() + } + .into(), + ), + dependencies: indexmap::IndexMap::default(), + }, + vec![], + )], + extension_mode: ExtensionMode::None, } } diff --git a/dropshot/src/http_util.rs b/dropshot/src/http_util.rs index c043ecb5a..6e4b186fc 100644 --- a/dropshot/src/http_util.rs +++ b/dropshot/src/http_util.rs @@ -3,8 +3,10 @@ * General-purpose HTTP-related facilities */ +use buf_list::BufList; use bytes::BufMut; use bytes::Bytes; +use bytes::BytesMut; use hyper::body::HttpBody; use serde::de::DeserializeOwned; @@ -23,17 +25,91 @@ pub const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson"; /** MIME type for form/urlencoded data */ pub const CONTENT_TYPE_URL_ENCODED: &str = "application/x-www-form-urlencoded"; +/// Reads the rest of the body from the request up to the given number of bytes, as a `Bytes`. +/// +/// This is intended for smaller bodies (e.g. a few kilobytes). +/// +/// # Errors +/// +/// Errors if the body length exceeds the given cap. 
+pub async fn http_read_body_bytes( + body: &mut T, + cap: usize, +) -> Result +where + T: HttpBody + std::marker::Unpin, +{ + http_read_body::(body, cap).await +} + +/// Reads the rest of the body from the request up to the given number of bytes, as a `BufList`. +/// +/// This is intended for larger bodies (e.g. a megabyte or larger). +/// +/// # Errors +/// +/// Errors if the body length exceeds the given cap. +pub async fn http_read_body_buf_list( + body: &mut T, + cap: usize, +) -> Result +where + T: HttpBody + std::marker::Unpin, +{ + http_read_body::(body, cap).await +} + +trait BufListLike { + type Output; + fn new() -> Self; + fn push_chunk(&mut self, chunk: Bytes); + fn finish(self) -> Self::Output; +} + +impl BufListLike for BytesMut { + type Output = Bytes; + + fn new() -> Self { + BytesMut::new() + } + + fn push_chunk(&mut self, chunk: Bytes) { + self.put(chunk); + } + + fn finish(self) -> Self::Output { + self.freeze() + } +} + +impl BufListLike for BufList { + type Output = BufList; + + fn new() -> Self { + BufList::new() + } + + fn push_chunk(&mut self, chunk: Bytes) { + self.push_chunk(chunk); + } + + fn finish(self) -> Self::Output { + self + } +} + /** * Reads the rest of the body from the request up to the given number of bytes. * If the body fits within the specified cap, a buffer is returned with all the * bytes read. If not, an error is returned. */ -pub async fn http_read_body( +async fn http_read_body( body: &mut T, cap: usize, -) -> Result +) -> Result where T: HttpBody + std::marker::Unpin, + B: BufListLike, { /* * This looks a lot like the implementation of hyper::body::to_bytes(), but @@ -47,7 +123,7 @@ where * work too? * TODO do we need to use saturating_add() here?
*/ - let mut parts = std::vec::Vec::new(); + let mut parts = B::new(); let mut nbytesread: usize = 0; while let Some(maybebuf) = body.data().await { let buf = maybebuf?; @@ -63,7 +139,7 @@ where } nbytesread += bufsize; - parts.put(buf); + parts.push_chunk(buf); } /* @@ -78,7 +154,7 @@ where // assert!(body.is_end_stream()); // assert!(body.data().await.is_none()); // assert!(body.trailers().await?.is_none()); - Ok(parts.into()) + Ok(parts.finish()) } /** diff --git a/dropshot/src/lib.rs b/dropshot/src/lib.rs index 12fd1fb62..e5ed40af5 100644 --- a/dropshot/src/lib.rs +++ b/dropshot/src/lib.rs @@ -214,7 +214,8 @@ * [query_params: Query,] * [path_params: Path

,] * [body_param: TypedBody,] - * [body_param: UntypedBody,] + * [body_param: UntypedBody,] + * [body_param: ChunkedBody,] * ) -> Result * ``` * @@ -237,11 +238,13 @@ * body as JSON (or form/url-encoded) and deserializing it into an instance * of type `J`. `J` must implement `serde::Deserialize` and `schemars::JsonSchema`. * * [`UntypedBody`] extracts the raw bytes of the request body. + * * [`ChunkedBody`] extracts the raw bytes of the request body as a list of byte chunks. + * This is recommended for larger bodies. * - * If the handler takes a `Query`, `Path

`, `TypedBody`, or - * `UntypedBody`, and the corresponding extraction cannot be completed, the - * request fails with status code 400 and an error message reflecting a - * validation error. + * If the handler takes a `Query`, `Path

`, `TypedBody`, + * `UntypedBody`, or `ChunkedBody`, and the corresponding extraction cannot be + * completed, the request fails with status code 400 and an error message + * reflecting a validation error. * * As with any serde-deserializable type, you can make fields optional by having * the corresponding property of the type be an `Option`. Here's an example of @@ -637,6 +640,7 @@ pub use error::HttpErrorResponseBody; pub use handler::http_response_found; pub use handler::http_response_see_other; pub use handler::http_response_temporary_redirect; +pub use handler::ChunkedBody; pub use handler::Extractor; pub use handler::ExtractorMetadata; pub use handler::FreeformBody; diff --git a/dropshot/tests/common/mod.rs b/dropshot/tests/common/mod.rs index 93e8951d8..bda67bf8a 100644 --- a/dropshot/tests/common/mod.rs +++ b/dropshot/tests/common/mod.rs @@ -34,6 +34,19 @@ pub fn test_setup( TestContext::new(api, 0 as usize, &config_dropshot, Some(logctx), log) } +pub fn test_setup_with_large_request_bodies( + test_name: &str, + api: ApiDescription, + request_body_max_bytes: usize, +) -> TestContext { + let config_dropshot: ConfigDropshot = + ConfigDropshot { request_body_max_bytes, ..Default::default() }; + + let logctx = create_log_context(test_name); + let log = logctx.log.new(o!()); + TestContext::new(api, 0 as usize, &config_dropshot, Some(logctx), log) +} + pub fn create_log_context(test_name: &str) -> LogContext { let log_config = ConfigLogging::File { level: ConfigLoggingLevel::Debug, diff --git a/dropshot/tests/fail/bad_endpoint1.stderr b/dropshot/tests/fail/bad_endpoint1.stderr index 7d1989a1f..ee7244430 100644 --- a/dropshot/tests/fail/bad_endpoint1.stderr +++ b/dropshot/tests/fail/bad_endpoint1.stderr @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result --> tests/fail/bad_endpoint1.rs:20:1 | diff --git a/dropshot/tests/fail/bad_endpoint11.stderr b/dropshot/tests/fail/bad_endpoint11.stderr index 1d4f19d8d..6b866ec6b 100644 --- a/dropshot/tests/fail/bad_endpoint11.stderr +++ b/dropshot/tests/fail/bad_endpoint11.stderr @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result --> tests/fail/bad_endpoint11.rs:13:1 | diff --git a/dropshot/tests/fail/bad_endpoint13.stderr b/dropshot/tests/fail/bad_endpoint13.stderr index 1559b41d3..865171c13 100644 --- a/dropshot/tests/fail/bad_endpoint13.stderr +++ b/dropshot/tests/fail/bad_endpoint13.stderr @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result --> tests/fail/bad_endpoint13.rs:19:1 | diff --git a/dropshot/tests/fail/bad_endpoint2.stderr b/dropshot/tests/fail/bad_endpoint2.stderr index c71207619..a41e6a470 100644 --- a/dropshot/tests/fail/bad_endpoint2.stderr +++ b/dropshot/tests/fail/bad_endpoint2.stderr @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result --> tests/fail/bad_endpoint2.rs:13:1 | diff --git a/dropshot/tests/fail/bad_endpoint3.stderr b/dropshot/tests/fail/bad_endpoint3.stderr index 1c0a1ce47..7961d51b9 100644 --- a/dropshot/tests/fail/bad_endpoint3.stderr +++ b/dropshot/tests/fail/bad_endpoint3.stderr @@ -9,11 +9,11 @@ error[E0277]: the trait bound `String: Extractor` is not satisfied (T1, T2) (T1, T2, T3) (T1,) + ChunkedBody TypedBody UntypedBody WebsocketUpgrade - dropshot::Path - dropshot::Query + and $N others note: required by a bound in `need_extractor` --> tests/fail/bad_endpoint3.rs:11:1 | diff --git a/dropshot/tests/fail/bad_endpoint8.stderr b/dropshot/tests/fail/bad_endpoint8.stderr index dc6067086..e022b38d3 100644 --- a/dropshot/tests/fail/bad_endpoint8.stderr +++ b/dropshot/tests/fail/bad_endpoint8.stderr @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result --> tests/fail/bad_endpoint8.rs:20:1 | diff --git a/dropshot/tests/test_demo.rs b/dropshot/tests/test_demo.rs index e0ed378f7..0b727f776 100644 --- a/dropshot/tests/test_demo.rs +++ b/dropshot/tests/test_demo.rs @@ -26,6 +26,7 @@ use dropshot::test_util::read_string; use dropshot::test_util::TEST_HEADER_1; use dropshot::test_util::TEST_HEADER_2; use dropshot::ApiDescription; +use dropshot::ChunkedBody; use dropshot::HttpError; use dropshot::HttpResponseDeleted; use dropshot::HttpResponseFound; @@ -72,6 +73,7 @@ fn demo_api() -> ApiDescription { api.register(demo_handler_path_param_uuid).unwrap(); api.register(demo_handler_path_param_u32).unwrap(); api.register(demo_handler_untyped_body).unwrap(); + api.register(demo_handler_chunked_body).unwrap(); api.register(demo_handler_delete).unwrap(); api.register(demo_handler_headers).unwrap(); api.register(demo_handler_302_bogus).unwrap(); @@ -704,6 +706,95 @@ async fn test_untyped_body() { testctx.teardown().await; } +#[tokio::test] +async fn test_chunked_body() { + const MAX_BYTES: usize = 1 * 1024 * 1024; + + let api = demo_api(); + let testctx = common::test_setup_with_large_request_bodies( + "test_chunked_body", + api, + MAX_BYTES, + ); + let client = &testctx.client_testctx; + + /* Success case: large body that's still under the limit. */ + let big_body = vec![0u8; MAX_BYTES]; + let mut response = client + .make_request_with_body( + Method::PUT, + "/testing/chunked_body", + big_body.into(), + StatusCode::OK, + ) + .await + .unwrap(); + let json: DemoChunked = read_json(&mut response).await; + println!("for big body, got: {:?}", json); + assert_eq!(json.nbytes, MAX_BYTES); + + /* Error case: body too large. 
*/ + let big_body = vec![0u8; MAX_BYTES + 1]; + let error = client + .make_request_with_body( + Method::PUT, + "/testing/chunked_body", + big_body.into(), + StatusCode::BAD_REQUEST, + ) + .await + .unwrap_err(); + assert_eq!( + error.message, + format!("request body exceeded maximum size of {MAX_BYTES} bytes") + ); + + /* Success case: invalid UTF-8 (chunked_body doesn't do any parsing). */ + let bad_body = vec![0x80u8; 1]; + let mut response = client + .make_request_with_body( + Method::PUT, + "/testing/chunked_body", + bad_body.into(), + StatusCode::OK, + ) + .await + .unwrap(); + let json: DemoChunked = read_json(&mut response).await; + assert_eq!(json.nbytes, 1); + + /* Success case: empty body */ + let mut response = client + .make_request_with_body( + Method::PUT, + "/testing/untyped_body?parse_str=true", + "".into(), + StatusCode::OK, + ) + .await + .unwrap(); + let json: DemoUntyped = read_json(&mut response).await; + assert_eq!(json.nbytes, 0); + assert_eq!(json.as_utf8, Some(String::from(""))); + + /* Success case: non-empty content */ + let body: Vec = Vec::from(&b"t\xce\xbcv"[..]); + let mut response = client + .make_request_with_body( + Method::PUT, + "/testing/untyped_body?parse_str=true", + body.into(), + StatusCode::OK, + ) + .await + .unwrap(); + let json: DemoUntyped = read_json(&mut response).await; + assert_eq!(json.nbytes, 4); + assert_eq!(json.as_utf8, Some(String::from("tμv"))); + + testctx.teardown().await; +} + /* * Test delete request */ @@ -1029,6 +1120,27 @@ async fn demo_handler_untyped_body( Ok(HttpResponseOk(DemoUntyped { nbytes, as_utf8 })) } +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct DemoChunked { + pub nbytes: usize, + pub nchunks: usize, +} + +#[endpoint { + method = PUT, + path = "/testing/chunked_body" +}] +async fn demo_handler_chunked_body( + _rqctx: Arc>, + body: ChunkedBody, +) -> Result, HttpError> { + let buf_list = body.into_inner(); + let nbytes = buf_list.num_bytes(); + let nchunks = 
buf_list.num_chunks(); + + Ok(HttpResponseOk(DemoChunked { nbytes, nchunks })) +} + #[derive(Deserialize, Serialize, JsonSchema)] pub struct DemoPathImpossible { pub test1: String, diff --git a/dropshot/tests/test_openapi.json b/dropshot/tests/test_openapi.json index 510b99610..92ba16a45 100644 --- a/dropshot/tests/test_openapi.json +++ b/dropshot/tests/test_openapi.json @@ -35,6 +35,36 @@ } } }, + "/datagoeshere-chunked": { + "put": { + "tags": [ + "it" + ], + "operationId": "handler7a", + "requestBody": { + "content": { + "application/octet-stream": { + "schema": { + "type": "string", + "format": "binary" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/dup1": { "get": { "tags": [ diff --git a/dropshot/tests/test_openapi.rs b/dropshot/tests/test_openapi.rs index 6e581f257..14c117bc9 100644 --- a/dropshot/tests/test_openapi.rs +++ b/dropshot/tests/test_openapi.rs @@ -2,10 +2,10 @@ use dropshot::{ endpoint, http_response_found, http_response_see_other, - http_response_temporary_redirect, ApiDescription, FreeformBody, HttpError, - HttpResponseAccepted, HttpResponseCreated, HttpResponseDeleted, - HttpResponseFound, HttpResponseHeaders, HttpResponseOk, - HttpResponseSeeOther, HttpResponseTemporaryRedirect, + http_response_temporary_redirect, ApiDescription, ChunkedBody, + FreeformBody, HttpError, HttpResponseAccepted, HttpResponseCreated, + HttpResponseDeleted, HttpResponseFound, HttpResponseHeaders, + HttpResponseOk, HttpResponseSeeOther, HttpResponseTemporaryRedirect, HttpResponseUpdatedNoContent, PaginationParams, Path, Query, RequestContext, ResultsPage, TagConfig, TagDetails, TypedBody, UntypedBody, }; @@ -172,6 +172,18 @@ async fn handler7( unimplemented!(); } +#[endpoint { + method = PUT, + path = "/datagoeshere-chunked", + tags = ["it"], +}] +async fn handler7a( + _rqctx: Arc>, 
+ _dump: ChunkedBody, +) -> Result { + unimplemented!(); +} + /* * Test that we do not generate duplicate type definitions when the same type is * returned by two different handler functions. @@ -483,6 +495,7 @@ fn make_api( api.register(handler5)?; api.register(handler6)?; api.register(handler7)?; + api.register(handler7a)?; api.register(handler8)?; api.register(handler9)?; api.register(handler10)?; diff --git a/dropshot/tests/test_openapi_fuller.json b/dropshot/tests/test_openapi_fuller.json index e9aaef1bb..cf73a3ae5 100644 --- a/dropshot/tests/test_openapi_fuller.json +++ b/dropshot/tests/test_openapi_fuller.json @@ -43,6 +43,36 @@ } } }, + "/datagoeshere-chunked": { + "put": { + "tags": [ + "it" + ], + "operationId": "handler7a", + "requestBody": { + "content": { + "application/octet-stream": { + "schema": { + "type": "string", + "format": "binary" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/dup1": { "get": { "tags": [ diff --git a/dropshot_endpoint/src/lib.rs b/dropshot_endpoint/src/lib.rs index 5ee6709fa..1b9af4d86 100644 --- a/dropshot_endpoint/src/lib.rs +++ b/dropshot_endpoint/src/lib.rs @@ -87,7 +87,8 @@ const USAGE: &str = "Endpoint handlers must have the following signature: [query_params: Query,] [path_params: Path

,] [body_param: TypedBody,] - [body_param: UntypedBody,] + [body_param: UntypedBody,] + [body_param: ChunkedBody,] ) -> Result"; /// This attribute transforms a handler function into a Dropshot endpoint