Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dropshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "web-programming::http-server"]
async-stream = "0.3.3"
async-trait = "0.1.60"
base64 = "0.20.0"
buf-list = "0.1.3"
bytes = "1"
futures = "0.3.25"
hostname = "0.3.0"
Expand Down
92 changes: 70 additions & 22 deletions dropshot/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

use super::error::HttpError;
use super::http_util::http_extract_path_params;
use super::http_util::http_read_body;
use super::http_util::http_read_body_bytes;
use super::http_util::CONTENT_TYPE_JSON;
use super::http_util::CONTENT_TYPE_OCTET_STREAM;
use super::server::DropshotState;
Expand All @@ -46,13 +46,15 @@ use crate::api_description::ApiEndpointParameterLocation;
use crate::api_description::ApiEndpointResponse;
use crate::api_description::ApiSchemaGenerator;
use crate::api_description::{ApiEndpointBodyContentType, ExtensionMode};
use crate::http_util::http_read_body_buf_list;
use crate::pagination::PaginationParams;
use crate::pagination::PAGINATION_PARAM_SENTINEL;
use crate::router::VariableSet;
use crate::to_map::to_map;
use crate::websocket::WEBSOCKET_PARAM_SENTINEL;

use async_trait::async_trait;
use buf_list::BufList;
use bytes::Bytes;
use futures::lock::Mutex;
use http::HeaderMap;
Expand Down Expand Up @@ -972,7 +974,7 @@ where
{
let server = &rqctx.server;
let mut request = rqctx.request.lock().await;
let body = http_read_body(
let body = http_read_body_bytes(
request.body_mut(),
server.config.request_body_max_bytes,
)
Expand Down Expand Up @@ -1072,6 +1074,9 @@ where
/**
* `UntypedBody` is an extractor for reading in the contents of the HTTP request
* body and making the raw bytes directly available to the consumer.
*
* `UntypedBody` stores its content as a single, contiguous `Bytes`, and is
* recommended for small request bodies. For larger ones, use `ChunkedBody`.
*/
#[derive(Debug)]
pub struct UntypedBody {
Expand Down Expand Up @@ -1110,7 +1115,7 @@ impl Extractor for UntypedBody {
) -> Result<UntypedBody, HttpError> {
let server = &rqctx.server;
let mut request = rqctx.request.lock().await;
let body_bytes = http_read_body(
let body_bytes = http_read_body_bytes(
request.body_mut(),
server.config.request_body_max_bytes,
)
Expand All @@ -1121,25 +1126,68 @@ impl Extractor for UntypedBody {
fn metadata(
_content_type: ApiEndpointBodyContentType,
) -> ExtractorMetadata {
ExtractorMetadata {
parameters: vec![ApiEndpointParameter::new_body(
ApiEndpointBodyContentType::Bytes,
true,
ApiSchemaGenerator::Static {
schema: Box::new(
SchemaObject {
instance_type: Some(InstanceType::String.into()),
format: Some(String::from("binary")),
..Default::default()
}
.into(),
),
dependencies: indexmap::IndexMap::default(),
},
vec![],
)],
extension_mode: ExtensionMode::None,
}
untyped_metadata()
}
}

/// An extractor that makes the raw bytes directly available to the consumer as a
/// [`BufList`].
///
/// `ChunkedBody` stores its content as a [`BufList`], and is recommended for
/// large request bodies.
#[derive(Debug)]
pub struct ChunkedBody {
content: BufList,
}

impl ChunkedBody {
/// Returns the contents as a `BufList`.
pub fn into_inner(self) -> BufList {
self.content
}
}

#[async_trait]
impl Extractor for ChunkedBody {
async fn from_request<Context: ServerContext>(
rqctx: Arc<RequestContext<Context>>,
) -> Result<ChunkedBody, HttpError> {
let server = &rqctx.server;
let mut request = rqctx.request.lock().await;
let content = http_read_body_buf_list(
request.body_mut(),
server.config.request_body_max_bytes,
)
.await?;
Ok(ChunkedBody { content })
}

fn metadata(
_content_type: ApiEndpointBodyContentType,
) -> ExtractorMetadata {
untyped_metadata()
}
}

fn untyped_metadata() -> ExtractorMetadata {
ExtractorMetadata {
parameters: vec![ApiEndpointParameter::new_body(
ApiEndpointBodyContentType::Bytes,
true,
ApiSchemaGenerator::Static {
schema: Box::new(
SchemaObject {
instance_type: Some(InstanceType::String.into()),
format: Some(String::from("binary")),
..Default::default()
}
.into(),
),
dependencies: indexmap::IndexMap::default(),
},
vec![],
)],
extension_mode: ExtensionMode::None,
}
}

Expand Down
86 changes: 81 additions & 5 deletions dropshot/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
* General-purpose HTTP-related facilities
*/

use buf_list::BufList;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use hyper::body::HttpBody;
use serde::de::DeserializeOwned;

Expand All @@ -23,17 +25,91 @@ pub const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
/** MIME type for form/urlencoded data */
pub const CONTENT_TYPE_URL_ENCODED: &str = "application/x-www-form-urlencoded";

/// Reads the rest of the body from the request up to the given number of bytes, as a `Bytes`.
///
/// This is intended for smaller bodies (e.g. a few kilobytes).
///
/// # Errors
///
/// Errors if the body length exceeds the given cap.
pub async fn http_read_body_bytes<T>(
body: &mut T,
cap: usize,
) -> Result<Bytes, HttpError>
where
T: HttpBody<Data = Bytes, Error = hyper::Error> + std::marker::Unpin,
{
http_read_body::<BytesMut, _>(body, cap).await
}

/// Reads the rest of the body from the request up to the given number of bytes, as a `BufList`.
///
/// This is intended for smaller bodies (e.g. a megabyte or larger).
///
/// # Errors
///
/// Errors if the body length exceeds the given cap.
pub async fn http_read_body_buf_list<T>(
body: &mut T,
cap: usize,
) -> Result<BufList, HttpError>
where
T: HttpBody<Data = Bytes, Error = hyper::Error> + std::marker::Unpin,
{
http_read_body::<BufList, _>(body, cap).await
}

trait BufListLike {
type Output;
fn new() -> Self;
fn push_chunk(&mut self, chunk: Bytes);
fn finish(self) -> Self::Output;
}

impl BufListLike for BytesMut {
type Output = Bytes;

fn new() -> Self {
BytesMut::new()
}

fn push_chunk(&mut self, chunk: Bytes) {
self.put(chunk);
}

fn finish(self) -> Self::Output {
self.freeze()
}
}

impl BufListLike for BufList {
type Output = BufList;

fn new() -> Self {
BufList::new()
}

fn push_chunk(&mut self, chunk: Bytes) {
self.push_chunk(chunk);
}

fn finish(self) -> Self::Output {
self
}
}

/**
* Reads the rest of the body from the request up to the given number of bytes.
* If the body fits within the specified cap, a buffer is returned with all the
* bytes read. If not, an error is returned.
*/
pub async fn http_read_body<T>(
async fn http_read_body<B, T>(
body: &mut T,
cap: usize,
) -> Result<Bytes, HttpError>
) -> Result<B::Output, HttpError>
where
T: HttpBody<Data = Bytes, Error = hyper::Error> + std::marker::Unpin,
B: BufListLike,
{
/*
* This looks a lot like the implementation of hyper::body::to_bytes(), but
Expand All @@ -47,7 +123,7 @@ where
* work too?
* TODO do we need to use saturating_add() here?
*/
let mut parts = std::vec::Vec::new();
let mut parts = B::new();
let mut nbytesread: usize = 0;
while let Some(maybebuf) = body.data().await {
let buf = maybebuf?;
Expand All @@ -63,7 +139,7 @@ where
}

nbytesread += bufsize;
parts.put(buf);
parts.push_chunk(buf);
}

/*
Expand All @@ -78,7 +154,7 @@ where
// assert!(body.is_end_stream());
// assert!(body.data().await.is_none());
// assert!(body.trailers().await?.is_none());
Ok(parts.into())
Ok(parts.finish())
}

/**
Expand Down
14 changes: 9 additions & 5 deletions dropshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@
* [query_params: Query<Q>,]
* [path_params: Path<P>,]
* [body_param: TypedBody<J>,]
* [body_param: UntypedBody<J>,]
* [body_param: UntypedBody,]
* [body_param: ChunkedBody,]
* ) -> Result<HttpResponse*, HttpError>
* ```
*
Expand All @@ -237,11 +238,13 @@
* body as JSON (or form/url-encoded) and deserializing it into an instance
* of type `J`. `J` must implement `serde::Deserialize` and `schemars::JsonSchema`.
* * [`UntypedBody`] extracts the raw bytes of the request body.
* * [`ChunkedBody`] extracts the raw bytes of the request body as a list of bytes.
* This is recommended for larger bodies.
*
* If the handler takes a `Query<Q>`, `Path<P>`, `TypedBody<J>`, or
* `UntypedBody`, and the corresponding extraction cannot be completed, the
* request fails with status code 400 and an error message reflecting a
* validation error.
* If the handler takes a `Query<Q>`, `Path<P>`, `TypedBody<J>`,
* `UntypedBody`, or `ChunkedBody, and the corresponding extraction cannot be
* completed, the request fails with status code 400 and an error message
* reflecting a validation error.
*
* As with any serde-deserializable type, you can make fields optional by having
* the corresponding property of the type be an `Option`. Here's an example of
Expand Down Expand Up @@ -637,6 +640,7 @@ pub use error::HttpErrorResponseBody;
pub use handler::http_response_found;
pub use handler::http_response_see_other;
pub use handler::http_response_temporary_redirect;
pub use handler::ChunkedBody;
pub use handler::Extractor;
pub use handler::ExtractorMetadata;
pub use handler::FreeformBody;
Expand Down
13 changes: 13 additions & 0 deletions dropshot/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ pub fn test_setup(
TestContext::new(api, 0 as usize, &config_dropshot, Some(logctx), log)
}

pub fn test_setup_with_large_request_bodies(
test_name: &str,
api: ApiDescription<usize>,
request_body_max_bytes: usize,
) -> TestContext<usize> {
let config_dropshot: ConfigDropshot =
ConfigDropshot { request_body_max_bytes, ..Default::default() };

let logctx = create_log_context(test_name);
let log = logctx.log.new(o!());
TestContext::new(api, 0 as usize, &config_dropshot, Some(logctx), log)
}

pub fn create_log_context(test_name: &str) -> LogContext {
let log_config = ConfigLogging::File {
level: ConfigLoggingLevel::Debug,
Expand Down
3 changes: 2 additions & 1 deletion dropshot/tests/fail/bad_endpoint1.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature:
[query_params: Query<Q>,]
[path_params: Path<P>,]
[body_param: TypedBody<J>,]
[body_param: UntypedBody<J>,]
[body_param: UntypedBody,]
[body_param: ChunkedBody,]
) -> Result<HttpResponse*, HttpError>
--> tests/fail/bad_endpoint1.rs:20:1
|
Expand Down
3 changes: 2 additions & 1 deletion dropshot/tests/fail/bad_endpoint11.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature:
[query_params: Query<Q>,]
[path_params: Path<P>,]
[body_param: TypedBody<J>,]
[body_param: UntypedBody<J>,]
[body_param: UntypedBody,]
[body_param: ChunkedBody,]
) -> Result<HttpResponse*, HttpError>
--> tests/fail/bad_endpoint11.rs:13:1
|
Expand Down
3 changes: 2 additions & 1 deletion dropshot/tests/fail/bad_endpoint13.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ error: Endpoint handlers must have the following signature:
[query_params: Query<Q>,]
[path_params: Path<P>,]
[body_param: TypedBody<J>,]
[body_param: UntypedBody<J>,]
[body_param: UntypedBody,]
[body_param: ChunkedBody,]
) -> Result<HttpResponse*, HttpError>
--> tests/fail/bad_endpoint13.rs:19:1
|
Expand Down
Loading