Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 392 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ tracing = "0.1"
validator = { version = "0.20", features = ["derive"] }
walkdir = { version = "2" }
worker = { version = "0.6", features = ["http"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
16 changes: 9 additions & 7 deletions crates/anyedge-adapter-axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ license = { workspace = true }

[features]
default = ["axum"]
axum = ["dep:axum", "dep:tokio", "dep:tower", "dep:futures-util"]
axum = ["dep:axum", "dep:tokio", "dep:tower", "dep:futures-util", "dep:reqwest"]
cli = ["dep:anyedge-adapter", "anyedge-adapter/cli", "dep:ctor", "dep:toml", "dep:walkdir"]

[dependencies]
anyhow = { workspace = true }
anyedge-core = { path = "../anyedge-core" }
anyedge-adapter = { path = "../anyedge-adapter", optional = true, features = ["cli"] }
anyedge-core = { path = "../anyedge-core" }
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true, optional = true }
bytes = { workspace = true }
ctor = { workspace = true, optional = true }
futures = { workspace = true }
futures-util = { workspace = true, optional = true }
http = { workspace = true }
log = { workspace = true }
reqwest = { workspace = true, optional = true }
simple_logger = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
toml = { workspace = true, optional = true }
tower = { workspace = true, optional = true }
tracing = { workspace = true }
toml = { workspace = true, optional = true }
walkdir = { workspace = true, optional = true }
log = { workspace = true }
simple_logger = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
axum = { workspace = true, features = ["macros"] }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
52 changes: 52 additions & 0 deletions crates/anyedge-adapter-axum/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::net::SocketAddr;

use anyedge_core::http::Request;

/// Axum-specific context data attached to each request.
#[derive(Clone, Debug)]
pub struct AxumRequestContext {
pub remote_addr: Option<SocketAddr>,
}

impl AxumRequestContext {
pub fn insert(request: &mut Request, context: AxumRequestContext) {
request.extensions_mut().insert(context);
}

pub fn get(request: &Request) -> Option<&AxumRequestContext> {
request.extensions().get::<AxumRequestContext>()
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyedge_core::body::Body;
use anyedge_core::http::request_builder;

#[test]
fn inserts_and_reads_context() {
let mut request = request_builder()
.uri("https://example.com")
.body(Body::empty())
.expect("request");

let context = AxumRequestContext {
remote_addr: Some("127.0.0.1:3000".parse().unwrap()),
};
AxumRequestContext::insert(&mut request, context.clone());

let retrieved = AxumRequestContext::get(&request).expect("context present");
assert_eq!(retrieved.remote_addr, context.remote_addr);
}

#[test]
fn missing_context_returns_none() {
let request = request_builder()
.uri("https://example.com")
.body(Body::empty())
.expect("request");

assert!(AxumRequestContext::get(&request).is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use anyedge_core::router::RouterService;
use log::LevelFilter;
use simple_logger::SimpleLogger;

use super::service::AnyEdgeAxumService;
use crate::service::AnyEdgeAxumService;

/// Configuration used when running the dev server embedding AnyEdge into Axum.
#[derive(Clone)]
Expand Down Expand Up @@ -75,6 +75,7 @@ impl AxumDevServer {
let mut svc = service.clone();
async move { svc.call(req).await }
}));
let make_service = router.into_make_service_with_connect_info::<SocketAddr>();

let shutdown = if config.enable_ctrl_c {
Some(async {
Expand All @@ -84,7 +85,7 @@ impl AxumDevServer {
None
};

let server = axum::serve(listener, router.into_make_service());
let server = axum::serve(listener, make_service);
if let Some(shutdown) = shutdown {
let server = server.with_graceful_shutdown(shutdown);
server.await.context("axum server error")?;
Expand Down
26 changes: 23 additions & 3 deletions crates/anyedge-adapter-axum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
//! Axum adapter for AnyEdge routers and applications.

#[cfg(feature = "axum")]
mod server;

mod context;
#[cfg(feature = "axum")]
mod dev_server;
#[cfg(feature = "axum")]
mod proxy;
#[cfg(feature = "axum")]
pub use server::{run_app, AnyEdgeAxumService, AxumDevServer, AxumDevServerConfig};
mod request;
#[cfg(feature = "axum")]
mod response;
#[cfg(feature = "axum")]
mod service;

#[cfg(feature = "cli")]
pub mod cli;

#[cfg(feature = "axum")]
pub use context::AxumRequestContext;
#[cfg(feature = "axum")]
pub use dev_server::{run_app, AxumDevServer, AxumDevServerConfig};
#[cfg(feature = "axum")]
pub use proxy::AxumProxyClient;
#[cfg(feature = "axum")]
pub use request::into_core_request;
#[cfg(feature = "axum")]
pub use response::into_axum_response;
#[cfg(feature = "axum")]
pub use service::AnyEdgeAxumService;
88 changes: 88 additions & 0 deletions crates/anyedge-adapter-axum/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::time::Duration;

use anyedge_core::body::Body;
use anyedge_core::error::EdgeError;
use anyedge_core::http::{HeaderName, HeaderValue, Method, StatusCode};
use anyedge_core::proxy::{ProxyClient, ProxyRequest, ProxyResponse};
use async_trait::async_trait;
use futures_util::StreamExt;
use reqwest::{header, Client};

pub struct AxumProxyClient {
client: Client,
}

impl Default for AxumProxyClient {
fn default() -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("reqwest client");
Self { client }
}
}

#[async_trait(?Send)]
impl ProxyClient for AxumProxyClient {
async fn send(&self, request: ProxyRequest) -> Result<ProxyResponse, EdgeError> {
let (method, uri, headers, body, _extensions) = request.into_parts();
let reqwest_method = reqwest_method(&method)?;
let mut builder = self.client.request(reqwest_method, uri.to_string());

for (name, value) in headers.iter() {
let header_name = header::HeaderName::from_bytes(name.as_str().as_bytes())
.map_err(EdgeError::internal)?;
let header_value =
header::HeaderValue::from_bytes(value.as_bytes()).map_err(EdgeError::internal)?;
builder = builder.header(header_name, header_value);
}

builder = match body {
Body::Once(bytes) => builder.body(bytes.to_vec()),
Body::Stream(mut stream) => {
let mut buf = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(EdgeError::internal)?;
buf.extend_from_slice(&chunk);
}
builder.body(buf)
}
};

let response = builder.send().await.map_err(EdgeError::internal)?;
let status =
StatusCode::from_u16(response.status().as_u16()).map_err(EdgeError::internal)?;
let mut proxy_response = ProxyResponse::new(status, Body::empty());

for (name, value) in response.headers().iter() {
let header_name =
HeaderName::from_bytes(name.as_str().as_bytes()).map_err(EdgeError::internal)?;
let header_value =
HeaderValue::from_bytes(value.as_bytes()).map_err(EdgeError::internal)?;
proxy_response
.headers_mut()
.insert(header_name, header_value);
}

let bytes = response.bytes().await.map_err(EdgeError::internal)?;
*proxy_response.body_mut() = Body::from(bytes.to_vec());

Ok(proxy_response)
}
}

fn reqwest_method(method: &Method) -> Result<reqwest::Method, EdgeError> {
reqwest::Method::from_bytes(method.as_str().as_bytes()).map_err(EdgeError::internal)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn converts_method_to_reqwest() {
let method = Method::POST;
let req = reqwest_method(&method).expect("reqwest method");
assert_eq!(req, reqwest::Method::POST);
}
}
90 changes: 90 additions & 0 deletions crates/anyedge-adapter-axum/src/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::net::SocketAddr;

use anyedge_core::body::Body;
use anyedge_core::http::Request as CoreRequest;
use anyedge_core::proxy::ProxyHandle;
use axum::body::Body as AxumBody;
use axum::extract::connect_info::ConnectInfo;
use axum::http::Request;

use crate::context::AxumRequestContext;
use crate::proxy::AxumProxyClient;

/// Convert an Axum/Hyper request into an AnyEdge core request while preserving streaming bodies
/// and exposing connection metadata through `AxumRequestContext`.
pub fn into_core_request(request: Request<AxumBody>) -> CoreRequest {
let (parts, body) = request.into_parts();
let stream = body.into_data_stream();
let body = Body::from_stream(stream);
let mut core_request = CoreRequest::from_parts(parts, body);

if let Some(remote_addr) = core_request
.extensions()
.get::<ConnectInfo<SocketAddr>>()
.map(|ConnectInfo(addr)| *addr)
{
core_request
.extensions_mut()
.remove::<ConnectInfo<SocketAddr>>();
AxumRequestContext::insert(
&mut core_request,
AxumRequestContext {
remote_addr: Some(remote_addr),
},
);
}

core_request
.extensions_mut()
.insert(ProxyHandle::with_client(AxumProxyClient::default()));

core_request
}

#[cfg(test)]
mod tests {
use super::*;
use anyedge_core::body::Body;
use anyedge_core::http::Method;

#[test]
fn converts_request_and_records_connect_info() {
let mut request = Request::builder()
.method(Method::POST)
.uri("/demo")
.header("x-test", "1")
.body(AxumBody::from("payload"))
.expect("request");
request
.extensions_mut()
.insert(ConnectInfo::<SocketAddr>("127.0.0.1:4000".parse().unwrap()));

let core_request = into_core_request(request);
assert_eq!(core_request.method(), &Method::POST);
assert_eq!(core_request.uri().path(), "/demo");
assert_eq!(core_request.headers()["x-test"], "1");
match core_request.body() {
Body::Stream(_) => {} // streaming bodies stay streaming
Body::Once(_) => panic!("body should remain streaming"),
}

let context = AxumRequestContext::get(&core_request).expect("context");
assert_eq!(context.remote_addr, Some("127.0.0.1:4000".parse().unwrap()));
assert!(core_request
.extensions()
.get::<ConnectInfo<SocketAddr>>()
.is_none());
}

#[test]
fn missing_connect_info_is_handled_gracefully() {
let request = Request::builder()
.method(Method::GET)
.uri("/demo")
.body(AxumBody::empty())
.expect("request");

let core_request = into_core_request(request);
assert!(AxumRequestContext::get(&core_request).is_none());
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
use axum::body::Body as AxumBody;
use axum::http::{Request, Response, StatusCode};
use axum::http::{Response, StatusCode};
use futures::executor::block_on;
use futures_util::{pin_mut, StreamExt};
use tracing::error;

use anyedge_core::body::Body;
use anyedge_core::http::{Request as CoreRequest, Response as CoreResponse};

/// Convert an Axum/Hyper request into an AnyEdge core request while preserving streaming bodies.
pub fn into_core_request(request: Request<AxumBody>) -> CoreRequest {
let (parts, body) = request.into_parts();
let stream = body.into_data_stream();
let body = Body::from_stream(stream);
CoreRequest::from_parts(parts, body)
}
use anyedge_core::http::Response as CoreResponse;

/// Convert an AnyEdge response into one consumable by Axum/Hyper.
///
Expand Down Expand Up @@ -61,28 +53,9 @@ pub fn into_axum_response(response: CoreResponse) -> Response<AxumBody> {
mod tests {
use super::*;
use anyedge_core::body::Body;
use anyedge_core::http::{response_builder, Method, StatusCode};
use anyedge_core::http::{response_builder, StatusCode};
use futures::stream;

#[test]
fn converts_axum_request_into_core_request() {
let request = Request::builder()
.method(Method::POST)
.uri("/demo")
.header("x-test", "1")
.body(AxumBody::from("payload"))
.expect("request");

let core_request = into_core_request(request);
assert_eq!(core_request.method(), &Method::POST);
assert_eq!(core_request.uri().path(), "/demo");
assert_eq!(core_request.headers()["x-test"], "1");
match core_request.into_body() {
Body::Once(_) => panic!("body should be wrapped as stream"),
Body::Stream(_) => {} // streaming bodies stay streaming
}
}

#[test]
fn converts_core_response_stream_into_axum_body() {
let stream = stream::iter(vec![
Expand Down
6 changes: 0 additions & 6 deletions crates/anyedge-adapter-axum/src/server/mod.rs

This file was deleted.

Loading