Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions ipfs-api-backend-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ with-builder = ["ipfs-api-prelude/with-builder"]

[dependencies]
actix-http = "3"
actix-multipart-rfc7578 = "0.10"
actix-multipart-rfc7578 = "0.11"
actix-tls = "3"
awc = "3"
async-trait = "0.1"
bytes = "1"
futures = "0.3"
http = "0.2"
http = "1"
http_02 = { version = "0.2", package = "http" } # pending https://github.com/actix/actix-web/issues/3384
ipfs-api-prelude = { version = "0.6", path = "../ipfs-api-prelude" }
thiserror = "1"
thiserror = "2"
30 changes: 24 additions & 6 deletions ipfs-api-backend-actix/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use http::{
};
use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
use multipart::client::multipart;
use std::time::Duration;
use std::{borrow::Borrow, time::Duration};

const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90);

Expand Down Expand Up @@ -66,6 +66,14 @@ impl ActixBackend {
}
}

// Pending until https://github.com/actix/actix-web/issues/3384
fn to_http_0_2(method: http::Method) -> http_02::Method {
match method {
http::Method::POST => http_02::Method::POST,
_ => todo!("Not used by codebase"),
}
}

#[async_trait(?Send)]
impl Backend for ActixBackend {
type HttpRequest = awc::SendClientRequest;
Expand All @@ -91,7 +99,9 @@ impl Backend for ActixBackend {
Req: ApiRequest,
{
let url = req.absolute_url(&self.base)?;
let req = self.client.request(Req::METHOD, url);
let req = self
.client
.request(to_http_0_2(Req::METHOD), url.to_string());
let req = if let Some((username, password)) = &self.credentials {
req.basic_auth(username, password)
} else {
Expand All @@ -107,8 +117,11 @@ impl Backend for ActixBackend {
Ok(req)
}

fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
res.headers().get(key)
fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<impl Borrow<HeaderValue>> {
// mapping is needed until https://github.com/actix/actix-web/issues/3384 is done
res.headers()
.get(key.as_str())
.and_then(|v| HeaderValue::from_bytes(v.clone().as_bytes()).ok())
}

async fn request_raw<Req>(
Expand All @@ -125,7 +138,12 @@ impl Backend for ActixBackend {
let body = res.body().await?;

// FIXME: Actix compat with bytes 1.0
Ok((status, body))
Ok((
// mapping is needed until https://github.com/actix/actix-web/issues/3384 is done
StatusCode::from_u16(status.as_u16())
.expect("failed mapping http 0.2 to http 1.0 status code"),
body,
))
}

fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
Expand All @@ -146,7 +164,7 @@ impl Backend for ActixBackend {
.err_into()
.map_ok(move |mut res| {
match res.status() {
StatusCode::OK => process(res).right_stream(),
http_02::StatusCode::OK => process(res).right_stream(),
// If the server responded with an error status code, the body
// still needs to be read so an error can be built. This block will
// read the entire body stream, then immediately return an error.
Expand Down
16 changes: 9 additions & 7 deletions ipfs-api-backend-hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ with-send-sync = ["ipfs-api-prelude/with-send-sync"]

[dependencies]
async-trait = "0.1"
base64 = "0.13"
base64 = "0.22"
bytes = "1"
futures = "0.3"
http = "0.2"
hyper = { version = "0.14", features = ["http1", "http2", "client", "tcp"] }
hyper-multipart-rfc7578 = "0.8"
hyper-rustls = { version = "0.23", features = ["rustls-native-certs"], optional = true }
hyper-tls = { version = "0.5", optional = true }
http = "1"
http-body-util = "0.1"
hyper = { version = "1.8", features = ["http1", "http2", "client"] }
hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-legacy"] }
hyper-multipart-rfc7578 = "0.9"
hyper-rustls = { version = "0.27", features = ["rustls-native-certs", "http2"], optional = true }
hyper-tls = { version = "0.6", optional = true }
ipfs-api-prelude = { version = "0.6", path = "../ipfs-api-prelude" }
thiserror = "1"
thiserror = "2"
58 changes: 39 additions & 19 deletions ipfs-api-backend-hyper/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,30 @@
// copied, modified, or distributed except according to those terms.
//

use std::borrow::Borrow;

use crate::error::Error;
use async_trait::async_trait;
use base64::Engine as _;
use bytes::Bytes;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use http::{
header::{HeaderName, HeaderValue},
uri::Scheme,
StatusCode, Uri,
};
use hyper::{
body,
client::{self, connect::Connect, Builder, HttpConnector},
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::body;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::{
client::legacy::{self as client, connect::Connect},
rt::TokioExecutor,
};
use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
use multipart::client::multipart;

type RequestBody = BoxBody<body::Bytes, hyper_multipart_rfc7578::client::Error>;

macro_rules! impl_default {
($http_connector:path) => {
impl_default!($http_connector, <$http_connector>::new());
Expand All @@ -33,7 +41,7 @@ macro_rules! impl_default {
C: Connect + Clone + Send + Sync + 'static,
{
base: Uri,
client: client::Client<C, hyper::Body>,
client: client::Client<C, RequestBody>,

/// Username and password
credentials: Option<(String, String)>,
Expand All @@ -52,7 +60,7 @@ macro_rules! impl_default {

impl TryFromUri for HyperBackend<$http_connector> {
fn build_with_base_uri(base: Uri) -> Self {
let client = Builder::default().build($constructor);
let client = client::Builder::new(TokioExecutor::new()).build($constructor);

HyperBackend {
base,
Expand Down Expand Up @@ -83,8 +91,9 @@ impl_default!(
hyper_rustls::HttpsConnector<HttpConnector>,
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("Could not start native TLS roots")
.https_or_http()
.enable_http1()
.enable_all_versions()
.build()
);

Expand All @@ -104,7 +113,7 @@ impl<C: Connect + Clone + Send + Sync + 'static> HyperBackend<C> {
fn basic_authorization(&self) -> Option<String> {
self.credentials.as_ref().map(|(username, password)| {
let credentials = format!("{}:{}", username, password);
let encoded = base64::encode(credentials);
let encoded = base64::prelude::BASE64_STANDARD.encode(credentials);

format!("Basic {}", encoded)
})
Expand All @@ -117,9 +126,9 @@ impl<C> Backend for HyperBackend<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
type HttpRequest = http::Request<hyper::Body>;
type HttpRequest = http::Request<RequestBody>;

type HttpResponse = http::Response<hyper::Body>;
type HttpResponse = http::Response<hyper::body::Incoming>;

type Error = Error;

Expand Down Expand Up @@ -149,17 +158,21 @@ where
} else {
builder
};

let req = if let Some(form) = form {
form.set_body_convert::<hyper::Body, multipart::Body>(builder)
form.set_body::<multipart::Body>(builder)?
.map(|body| body.boxed())
} else {
builder.body(hyper::Body::empty())
}?;
builder.body(
http_body_util::Empty::new()
.map_err(|never| match never {})
.boxed(),
)?
};

Ok(req)
}

fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<impl Borrow<HeaderValue>> {
res.headers().get(key)
}

Expand All @@ -174,13 +187,19 @@ where
let req = self.build_base_request(req, form)?;
let res = self.client.request(req).await?;
let status = res.status();
let body = body::to_bytes(res.into_body()).await?;
let body = res.into_body().collect().await?.to_bytes();

Ok((status, body))
}

fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
Box::new(res.into_body().err_into())
Box::new(
res.into_body()
.collect()
.into_stream()
.map_ok(|body| body.to_bytes())
.err_into(),
)
}

fn request_stream<Res, F>(
Expand All @@ -202,10 +221,11 @@ where
// still needs to be read so an error can be built. This block will
// read the entire body stream, then immediately return an error.
//
_ => body::to_bytes(res.into_body())
.boxed()
_ => res
.into_body()
.collect()
.map(|maybe_body| match maybe_body {
Ok(body) => Err(Self::process_error_from_body(body)),
Ok(body) => Err(Self::process_error_from_body(body.to_bytes())),
Err(e) => Err(e.into()),
})
.into_stream()
Expand Down
6 changes: 6 additions & 0 deletions ipfs-api-backend-hyper/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ pub enum Error {
#[error("hyper client error `{0}`")]
Client(#[from] hyper::Error),

#[error("hyper client error `{0}`")]
LegacyClient(#[from] hyper_util::client::legacy::Error),

#[error("multipart parsing error `{0}`")]
MultipartParse(#[from] multipart::client::Error),

#[error("http error `{0}`")]
Http(#[from] http::Error),

Expand Down
6 changes: 3 additions & 3 deletions ipfs-api-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ maintenance = { status = "passively-maintained" }

[features]
default = ["with-hyper"]
with-hyper = ["ipfs-api-backend-hyper", "tokio/macros", "tokio/rt-multi-thread"]
with-hyper = ["ipfs-api-backend-hyper", "tokio/macros", "tokio/rt-multi-thread", "ipfs-api-backend-hyper/with-builder"]
with-hyper-tls = ["with-hyper", "ipfs-api-backend-hyper/with-hyper-tls"]
with-hyper-rustls = ["with-hyper", "ipfs-api-backend-hyper/with-hyper-rustls"]
with-actix = ["ipfs-api-backend-actix", "actix-rt"]
with-actix = ["ipfs-api-backend-actix", "actix-rt", "ipfs-api-backend-actix/with-builder"]

[dependencies]
actix-rt = { version = "2.6", optional = true }
futures = "0.3"
ipfs-api-backend-actix = { version = "0.7", path = "../ipfs-api-backend-actix", optional = true }
ipfs-api-backend-hyper = { version = "0.6", path = "../ipfs-api-backend-hyper", optional = true }
tar = "0.4"
thiserror = "1"
thiserror = "2"
tokio = { version = "1", features = ["time"] }
tokio-stream = { version = "0.1", features = ["time"] }
tracing-subscriber = { version = "0.3", features = ["fmt"] }
12 changes: 6 additions & 6 deletions ipfs-api-prelude/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ with-send-sync = []
async-trait = "0.1"
bytes = "1"
cfg-if = "1"
common-multipart-rfc7578 = "0.6"
dirs = "4"
common-multipart-rfc7578 = "0.7"
dirs = "6"
futures = "0.3"
http = "0.2"
multiaddr = "0.17"
http = "1"
multiaddr = "0.18"
multibase = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_urlencoded = "0.7"
thiserror = "1"
thiserror = "2"
tokio = "1"
tokio-util = { version = "0.7", features = ["codec"] }
tracing = "0.1"
typed-builder = { version = "0.10", optional = true }
typed-builder = { version = "0.23", optional = true }
walkdir = "2.3"

[dev-dependencies]
Expand Down
12 changes: 8 additions & 4 deletions ipfs-api-prelude/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use http::{
StatusCode,
};
use serde::Deserialize;
use std::fmt::{Debug, Display};
use std::{
borrow::Borrow,
fmt::{Debug, Display},
};
use tokio_util::codec::{Decoder, FramedRead};

cfg_if::cfg_if! {
Expand Down Expand Up @@ -64,7 +67,8 @@ pub trait Backend {

/// Get the value of a header from an HTTP response.
///
fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue>;
// TODO: replace `impl Borrow` with `&` after https://github.com/actix/actix-web/issues/3384 is done
fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<impl Borrow<HeaderValue>>;

/// Generates a request, and returns the unprocessed response future.
///
Expand Down Expand Up @@ -214,11 +218,11 @@ pub trait Backend {
// is used to indicate that there was an error while streaming
// data with Ipfs.
//
if trailer == X_STREAM_ERROR_KEY {
if trailer.borrow() == X_STREAM_ERROR_KEY {
true
} else {
let err = crate::Error::UnrecognizedTrailerHeader(
String::from_utf8_lossy(trailer.as_ref()).into(),
String::from_utf8_lossy(trailer.borrow().as_bytes()).into(),
);

// There was an unrecognized trailer value. If that is the case,
Expand Down
8 changes: 4 additions & 4 deletions ipfs-api-prelude/src/global_opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use common_multipart_rfc7578::client::multipart;
use serde::{Serialize, Serializer};
use std::time::Duration;
use std::{borrow::Borrow, time::Duration};

/// Options valid on any IPFS Api request
///
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<Back: Backend> BackendWithGlobalOptions<Back> {
}
}

fn combine<Req>(&self, req: Req) -> OptCombiner<Req>
fn combine<Req>(&'_ self, req: Req) -> OptCombiner<'_, Req>
where
Req: ApiRequest,
{
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<Back: Backend + Send + Sync> Backend for BackendWithGlobalOptions<Back> {
fn get_header(
res: &Self::HttpResponse,
key: http::header::HeaderName,
) -> Option<&http::HeaderValue> {
) -> Option<impl Borrow<http::HeaderValue>> {
Back::get_header(res, key)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ impl<Back: Backend> Backend for BackendWithGlobalOptions<Back> {
fn get_header(
res: &Self::HttpResponse,
key: http::header::HeaderName,
) -> Option<&http::HeaderValue> {
) -> Option<impl Borrow<http::HeaderValue>> {
Back::get_header(res, key)
}

Expand Down
Loading