-
Notifications
You must be signed in to change notification settings - Fork 8
Migrate handler layer to HTTP types (PR 12) #624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/edgezero-pr11-utility-layer-migration-v2
Are you sure you want to change the base?
Changes from all commits
a9dd665
ba9c608
4cd511c
ff2e0cd
6a2ad3f
e8c06e9
3bb3006
ed3c161
7321e79
caaface
c23663c
ca5585c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,10 @@ | ||
| use edgezero_core::body::Body as EdgeBody; | ||
| use edgezero_core::http::{ | ||
| header, HeaderName, HeaderValue, Method, Request as HttpRequest, Response as HttpResponse, | ||
| }; | ||
| use error_stack::Report; | ||
| use fastly::http::Method; | ||
| use fastly::{Request, Response}; | ||
| use fastly::http::Method as FastlyMethod; | ||
| use fastly::{Request as FastlyRequest, Response as FastlyResponse}; | ||
|
|
||
| use trusted_server_core::auction::endpoints::handle_auction; | ||
| use trusted_server_core::auction::{build_orchestrator, AuctionOrchestrator}; | ||
|
|
@@ -10,7 +14,7 @@ use trusted_server_core::constants::{ | |
| ENV_FASTLY_IS_STAGING, ENV_FASTLY_SERVICE_VERSION, HEADER_X_GEO_INFO_AVAILABLE, | ||
| HEADER_X_TS_ENV, HEADER_X_TS_VERSION, | ||
| }; | ||
| use trusted_server_core::error::TrustedServerError; | ||
| use trusted_server_core::error::{IntoHttpResponse, TrustedServerError}; | ||
| use trusted_server_core::geo::GeoInfo; | ||
| use trusted_server_core::integrations::IntegrationRegistry; | ||
| use trusted_server_core::platform::RuntimeServices; | ||
|
|
@@ -41,20 +45,17 @@ use crate::platform::{build_runtime_services, open_kv_store, UnavailableKvStore} | |
|
|
||
| /// Entry point for the Fastly Compute program. | ||
| /// | ||
| /// Uses an undecorated `main()` with `Request::from_client()` instead of | ||
| /// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` | ||
| /// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls | ||
| /// `send_to_client()` on the returned `Response`, which is incompatible with | ||
| /// streaming. | ||
| /// Uses an undecorated `main()` with `FastlyRequest::from_client()` instead of | ||
| /// `#[fastly::main]` so we can call `send_to_client()` explicitly when needed. | ||
| fn main() { | ||
| init_logger(); | ||
|
|
||
| let req = Request::from_client(); | ||
| let mut req = FastlyRequest::from_client(); | ||
|
|
||
| // Keep the health probe independent from settings loading and routing so | ||
| // readiness checks still get a cheap liveness response during startup. | ||
| if req.get_method() == Method::GET && req.get_path() == "/health" { | ||
| Response::from_status(200) | ||
| if req.get_method() == FastlyMethod::GET && req.get_path() == "/health" { | ||
| FastlyResponse::from_status(200) | ||
| .with_body_text_plain("ok") | ||
| .send_to_client(); | ||
| return; | ||
|
|
@@ -89,73 +90,63 @@ fn main() { | |
| } | ||
| }; | ||
|
|
||
| // Start with an unavailable KV slot. Consent-dependent routes lazily | ||
| // replace it with the configured store at dispatch time so unrelated | ||
| // routes stay available when consent persistence is misconfigured. | ||
| let kv_store = std::sync::Arc::new(UnavailableKvStore) | ||
| as std::sync::Arc<dyn trusted_server_core::platform::PlatformKvStore>; | ||
| // Strip client-spoofable forwarded headers at the edge before building | ||
| // any request-derived context or converting to the core HTTP types. | ||
| compat::sanitize_fastly_forwarded_headers(&mut req); | ||
|
|
||
| let runtime_services = build_runtime_services(&req, kv_store); | ||
| let http_req = compat::from_fastly_request(req); | ||
|
|
||
| // route_request may send the response directly (streaming path) or | ||
| // return it for us to send (buffered path). | ||
| if let Some(response) = futures::executor::block_on(route_request( | ||
| let mut response = futures::executor::block_on(route_request( | ||
| &settings, | ||
| &orchestrator, | ||
| &integration_registry, | ||
| &runtime_services, | ||
| req, | ||
| )) { | ||
| response.send_to_client(); | ||
| } | ||
| http_req, | ||
| )) | ||
| .unwrap_or_else(|e| http_error_response(&e)); | ||
|
|
||
| let geo_info = if response.status() == edgezero_core::http::StatusCode::UNAUTHORIZED { | ||
| None | ||
| } else { | ||
| runtime_services | ||
| .geo() | ||
| .lookup(runtime_services.client_info().client_ip) | ||
| .unwrap_or_else(|e| { | ||
| log::warn!("geo lookup failed: {e}"); | ||
| None | ||
| }) | ||
| }; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 thinking — 401 responses no longer carry geo headers — silent behavior change. Previously This may be intentional (avoid leaking geo to unauthenticated callers) but it's a silent change. Add a one-line comment to make the rationale explicit: // Skip geo lookup for 401s so geo headers are not exposed to unauthenticated
// callers. Authenticated routes do their own lookups for consent context.
let geo_info = if response.status() == StatusCode::UNAUTHORIZED { … }; |
||
|
|
||
| finalize_response(&settings, geo_info.as_ref(), &mut response); | ||
|
|
||
| compat::to_fastly_response(response).send_to_client(); | ||
| } | ||
|
|
||
| async fn route_request( | ||
| settings: &Settings, | ||
| orchestrator: &AuctionOrchestrator, | ||
| integration_registry: &IntegrationRegistry, | ||
| runtime_services: &RuntimeServices, | ||
| mut req: Request, | ||
| ) -> Option<Response> { | ||
| // Strip client-spoofable forwarded headers at the edge. | ||
| // On Fastly this service IS the first proxy — these headers from | ||
| // clients are untrusted and can hijack URL rewriting (see #409). | ||
| compat::sanitize_fastly_forwarded_headers(&mut req); | ||
|
|
||
| // Look up geo info via the platform abstraction using the client IP | ||
| // already captured in RuntimeServices at the entry point. | ||
| let geo_info = runtime_services | ||
| .geo() | ||
| .lookup(runtime_services.client_info().client_ip) | ||
| .unwrap_or_else(|e| { | ||
| log::warn!("geo lookup failed: {e}"); | ||
| None | ||
| }); | ||
|
|
||
| req: HttpRequest, | ||
| ) -> Result<HttpResponse, Report<TrustedServerError>> { | ||
| // `get_settings()` should already have rejected invalid handler regexes. | ||
| // Keep this fallback so manually-constructed or otherwise unprepared | ||
| // settings still become an error response instead of panicking. | ||
| let auth_req = compat::from_fastly_headers_ref(&req); | ||
| match enforce_basic_auth(settings, &auth_req) { | ||
| Ok(Some(response)) => { | ||
| let mut response = compat::to_fastly_response(response); | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| return Some(response); | ||
| } | ||
| match enforce_basic_auth(settings, &req) { | ||
| Ok(Some(response)) => return Ok(response), | ||
| Ok(None) => {} | ||
| Err(e) => { | ||
| log::error!("Failed to evaluate basic auth: {:?}", e); | ||
| let mut response = to_error_response(&e); | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| return Some(response); | ||
| } | ||
| Err(e) => return Err(e), | ||
| } | ||
|
|
||
| // Get path and method for routing | ||
| let path = req.get_path().to_string(); | ||
| let method = req.get_method().clone(); | ||
| let path = req.uri().path().to_string(); | ||
| let method = req.method().clone(); | ||
|
|
||
| // Match known routes and handle them | ||
| let result = match (method, path.as_str()) { | ||
| match (method, path.as_str()) { | ||
| // Serve the tsjs library | ||
| (Method::GET, path) if path.starts_with("/static/tsjs=") => { | ||
| handle_tsjs_dynamic(&req, integration_registry) | ||
|
|
@@ -201,14 +192,24 @@ async fn route_request( | |
| (Method::POST, "/first-party/proxy-rebuild") => { | ||
| handle_first_party_proxy_rebuild(settings, runtime_services, req).await | ||
| } | ||
| (m, path) if integration_registry.has_route(&m, path) => integration_registry | ||
| .handle_proxy(&m, path, settings, runtime_services, req) | ||
| .await | ||
| .unwrap_or_else(|| { | ||
| Err(Report::new(TrustedServerError::BadRequest { | ||
| message: format!("Unknown integration route: {path}"), | ||
| })) | ||
| }), | ||
| (m, path) if integration_registry.has_route(&m, path) => { | ||
| // TODO(PR13): migrate integration trait to http types here | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 note — |
||
| integration_registry | ||
| .handle_proxy( | ||
| &m, | ||
| path, | ||
| settings, | ||
| runtime_services, | ||
| compat::to_fastly_request(req), | ||
| ) | ||
| .await | ||
| .unwrap_or_else(|| { | ||
| Err(Report::new(TrustedServerError::BadRequest { | ||
| message: format!("Unknown integration route: {path}"), | ||
| })) | ||
| }) | ||
| .map(compat::from_fastly_response) | ||
| } | ||
|
|
||
| // No known route matched, proxy to publisher origin as fallback | ||
| _ => { | ||
|
|
@@ -218,64 +219,48 @@ async fn route_request( | |
| ); | ||
|
|
||
| match runtime_services_for_consent_route(settings, runtime_services) { | ||
| Ok(publisher_services) => { | ||
| match handle_publisher_request( | ||
| settings, | ||
| integration_registry, | ||
| &publisher_services, | ||
| req, | ||
| ) { | ||
| Ok(PublisherResponse::Stream { | ||
| mut response, | ||
| body, | ||
| params, | ||
| }) => { | ||
| // Streaming path: finalize headers, then stream body to client. | ||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| let mut streaming_body = response.stream_to_client(); | ||
| if let Err(e) = stream_publisher_body( | ||
| body, | ||
| &mut streaming_body, | ||
| ¶ms, | ||
| settings, | ||
| integration_registry, | ||
| ) { | ||
| // Headers already committed. Log and abort — client | ||
| // sees a truncated response. Standard proxy behavior. | ||
| log::error!("Streaming processing failed: {e:?}"); | ||
| drop(streaming_body); | ||
| } else if let Err(e) = streaming_body.finish() { | ||
| log::error!("Failed to finish streaming body: {e}"); | ||
| } | ||
| // Response already sent via stream_to_client() | ||
| return None; | ||
| } | ||
| Ok(PublisherResponse::PassThrough { mut response, body }) => { | ||
| // Binary pass-through: reattach body and send via send_to_client(). | ||
| // This preserves Content-Length and avoids chunked encoding overhead. | ||
| // Fastly streams the body from its internal buffer — no WASM | ||
| // memory buffering occurs. | ||
| response.set_body(body); | ||
| Ok(response) | ||
| } | ||
| Ok(PublisherResponse::Buffered(response)) => Ok(response), | ||
| Err(e) => { | ||
| log::error!("Failed to proxy to publisher origin: {:?}", e); | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
| Ok(publisher_services) => handle_publisher_request( | ||
| settings, | ||
| integration_registry, | ||
| &publisher_services, | ||
| req, | ||
| ) | ||
| .await | ||
| .and_then(|pub_response| { | ||
| resolve_publisher_response(pub_response, settings, integration_registry) | ||
| }), | ||
| Err(e) => Err(e), | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| // Convert any errors to HTTP error responses | ||
| let mut response = result.unwrap_or_else(|e| to_error_response(&e)); | ||
|
|
||
| finalize_response(settings, geo_info.as_ref(), &mut response); | ||
| } | ||
| } | ||
|
|
||
| Some(response) | ||
| fn resolve_publisher_response( | ||
| publisher_response: PublisherResponse, | ||
| settings: &Settings, | ||
| integration_registry: &IntegrationRegistry, | ||
| ) -> Result<HttpResponse, Report<TrustedServerError>> { | ||
| match publisher_response { | ||
| PublisherResponse::Buffered(response) => Ok(response), | ||
| PublisherResponse::Stream { | ||
| mut response, | ||
| body, | ||
| params, | ||
| } => { | ||
| let mut output = Vec::new(); | ||
| stream_publisher_body(body, &mut output, ¶ms, settings, integration_registry)?; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔧 wrench — Streaming regression: this arm now buffers the full pipeline output instead of streaming chunks to the client. Before this PR, the adapter handled After this PR,
Structural cause: Fix (preferred) — keep the streaming dispatch in the adapter: // Have route_request return Result<HandlerOutcome, …> where HandlerOutcome
// distinguishes streamed vs buffered responses, then in main():
match outcome {
HandlerOutcome::Streaming { mut response, body, params } => {
finalize_response(&settings, geo_info.as_ref(), &mut response);
let mut fastly_resp = compat::to_fastly_response_skeleton(response);
let mut sb = fastly_resp.stream_to_client();
if let Err(e) = stream_publisher_body(body, &mut sb, ¶ms, &settings, &integration_registry) {
log::error!("Streaming processing failed: {e:?}");
drop(sb);
} else if let Err(e) = sb.finish() {
log::error!("Failed to finish streaming body: {e}");
}
}
HandlerOutcome::Buffered(response) => {
compat::to_fastly_response(response).send_to_client();
}
}Alternative — fold this arm into No test today asserts streaming behavior — the publisher tests ( |
||
| response.headers_mut().insert( | ||
| header::CONTENT_LENGTH, | ||
| HeaderValue::from(output.len() as u64), | ||
| ); | ||
| *response.body_mut() = EdgeBody::from(output); | ||
| Ok(response) | ||
| } | ||
| PublisherResponse::PassThrough { mut response, body } => { | ||
| *response.body_mut() = body; | ||
| Ok(response) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn runtime_services_for_consent_route( | ||
|
|
@@ -304,21 +289,48 @@ fn runtime_services_for_consent_route( | |
| /// Header precedence (last write wins): geo headers are set first, then | ||
| /// version/staging, then operator-configured `settings.response_headers`. | ||
| /// This means operators can intentionally override any managed header. | ||
| fn finalize_response(settings: &Settings, geo_info: Option<&GeoInfo>, response: &mut Response) { | ||
| fn finalize_response(settings: &Settings, geo_info: Option<&GeoInfo>, response: &mut HttpResponse) { | ||
| if let Some(geo) = geo_info { | ||
| geo.set_response_headers(response); | ||
| } else { | ||
| response.set_header(HEADER_X_GEO_INFO_AVAILABLE, "false"); | ||
| response.headers_mut().insert( | ||
| HEADER_X_GEO_INFO_AVAILABLE, | ||
| HeaderValue::from_static("false"), | ||
| ); | ||
| } | ||
|
|
||
| if let Ok(v) = ::std::env::var(ENV_FASTLY_SERVICE_VERSION) { | ||
| response.set_header(HEADER_X_TS_VERSION, v); | ||
| if let Ok(value) = HeaderValue::from_str(&v) { | ||
| response.headers_mut().insert(HEADER_X_TS_VERSION, value); | ||
| } else { | ||
| log::warn!("Skipping invalid FASTLY_SERVICE_VERSION response header value"); | ||
| } | ||
| } | ||
| if ::std::env::var(ENV_FASTLY_IS_STAGING).as_deref() == Ok("1") { | ||
| response.set_header(HEADER_X_TS_ENV, "staging"); | ||
| response | ||
| .headers_mut() | ||
| .insert(HEADER_X_TS_ENV, HeaderValue::from_static("staging")); | ||
| } | ||
|
|
||
| for (key, value) in &settings.response_headers { | ||
| response.set_header(key, value); | ||
| let header_name = HeaderName::from_bytes(key.as_bytes()) | ||
| .expect("settings.response_headers validated at load time"); | ||
| let header_value = | ||
| HeaderValue::from_str(value).expect("settings.response_headers validated at load time"); | ||
| response.headers_mut().insert(header_name, header_value); | ||
| } | ||
| } | ||
|
|
||
| fn http_error_response(report: &Report<TrustedServerError>) -> HttpResponse { | ||
| let root_error = report.current_context(); | ||
| log::error!("Error occurred: {:?}", report); | ||
|
|
||
|
prk-Jr marked this conversation as resolved.
|
||
| let mut response = | ||
| HttpResponse::new(EdgeBody::from(format!("{}\n", root_error.user_message()))); | ||
| *response.status_mut() = root_error.status_code(); | ||
| response.headers_mut().insert( | ||
| header::CONTENT_TYPE, | ||
| HeaderValue::from_static("text/plain; charset=utf-8"), | ||
| ); | ||
| response | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.