Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ run-proxy:
cargo run proxy --config-file-path example_config_proxy.yaml
.PHONY: run-proxy

run-ingest-router:
cargo run ingest-router --config-file-path example_config_ingest_router.yaml
run-ingest-router: generate-credentials
cargo run ingest-router --config-file-path example_config_ingest_router.yaml --credentials-path relay-credentials.json
.PHONY: run-ingest-router

generate-credentials:
Expand Down
2 changes: 1 addition & 1 deletion devservices/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ services:

synapse-ingest-router:
image: us-docker.pkg.dev/sentryio/synapse/image:latest
command: ["ingest-router", "--config-file-path", "/app/ingest-router.yaml"]
command: ["ingest-router", "--config-file-path", "/app/ingest-router.yaml", "--credentials-path", "/app/relay-credentials.json"]
ports:
- "13002:3000"
- "13003:3001"
Expand Down
11 changes: 3 additions & 8 deletions ingest-router/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

/// Signature freshness window, matching Sentry
/// https://github.com/getsentry/sentry/blob/c9138b328e9aad58f95f087c0f8a8843a06dbbe9/src/sentry/api/authentication.py#L260
Expand Down Expand Up @@ -70,7 +69,6 @@ struct Credentials {
}

/// Signs outgoing requests with synapse's relay credentials.
#[derive(Clone)]
pub struct RelaySigner {
signing_key: SigningKey,
relay_id: HeaderValue,
Expand Down Expand Up @@ -200,10 +198,9 @@ pub struct RelayInfo {
/// The trusted set is fixed and small, so keys are configured statically rather than resolved
/// at runtime via Sentry's `publickeys` endpoint (the mechanism relay-to-relay verification uses
/// for a dynamic relay set).
#[derive(Clone, Default)]
pub struct RelayVerifier {
/// Trusted downstream relays, keyed by relay id (a UUID).
trusted_relays: Arc<HashMap<String, VerifyingKey>>,
trusted_relays: HashMap<String, VerifyingKey>,
}

impl RelayVerifier {
Expand All @@ -214,9 +211,7 @@ impl RelayVerifier {
.into_iter()
.map(|(id, info)| Ok((id.clone(), parse_public_key(&info.public_key, &id)?)))
.collect::<Result<HashMap<_, _>, VerifyError>>()?;
Ok(Self {
trusted_relays: Arc::new(trusted_relays),
})
Ok(Self { trusted_relays })
}

/// Verifies the `X-Sentry-Relay-Id` / `X-Sentry-Relay-Signature` headers against `body`.
Expand Down Expand Up @@ -483,7 +478,7 @@ mod tests {
#[test]
fn rejects_untrusted_relay() {
let (signer, _) = signer_and_verifier();
let verifier = RelayVerifier::default(); // trusts nobody
let verifier = RelayVerifier::from_relays(HashMap::new()).unwrap(); // trusts nobody
let body = b"body";
let headers = signed_headers(&signer, body);
assert_eq!(
Expand Down
3 changes: 3 additions & 0 deletions ingest-router/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ pub enum IngestRouterError {

#[error("Relay verifier configuration error: {0}")]
RelayVerifierError(#[from] crate::auth::VerifyError),

#[error("Relay signer configuration error: {0}")]
RelaySignerError(#[from] crate::auth::SigningError),
}
116 changes: 112 additions & 4 deletions ingest-router/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::auth::{RelaySigner, RelayVerifier};
use crate::config::RelayTimeouts;
use crate::errors::IngestRouterError;
use crate::handler::{CellId, ExecutionMode, Handler};
Expand Down Expand Up @@ -26,26 +27,49 @@ static UPSTREAM_REQUEST_COUNT: AtomicU64 = AtomicU64::new(0);
pub struct Executor {
client: Client<HttpConnector, Full<Bytes>>,
timeouts: RelayTimeouts,
verifier: Arc<RelayVerifier>,
signer: Arc<RelaySigner>,
}

impl Executor {
pub fn new(timeouts: RelayTimeouts) -> Self {
pub fn new(timeouts: RelayTimeouts, verifier: RelayVerifier, signer: RelaySigner) -> Self {
let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new());
Self { client, timeouts }
Self {
client,
timeouts,
verifier: Arc::new(verifier),
signer: Arc::new(signer),
}
}

// Splits, executes, and merges the responses using the provided handler.
// Verifies, splits, executes, and merges the responses using the provided handler.
pub async fn execute(
&self,
handler: Arc<dyn Handler>,
request: Request<Bytes>,
cells: Cells,
) -> Response<Bytes> {
let (split_requests, metadata) = match handler.split_request(request, &cells).await {
if handler.requires_relay_auth()
&& let Err(err) = self
.verifier
.verify_request(request.headers(), request.body())
{
tracing::warn!(error = %err, handler = handler.name(), "relay signature verification failed");
return make_error_response(StatusCode::UNAUTHORIZED);
}

let (mut split_requests, metadata) = match handler.split_request(request, &cells).await {
Ok(result) => result,
Err(_e) => return make_error_response(StatusCode::INTERNAL_SERVER_ERROR),
};

if handler.requires_relay_auth() {
for (_cell_id, request) in split_requests.iter_mut() {
let body = request.body().clone();
self.signer.sign_request(request.headers_mut(), &body);
}
}

let results = match handler.execution_mode() {
ExecutionMode::Parallel => self.execute_parallel(split_requests, cells).await,
ExecutionMode::Failover => self.execute_failover(split_requests, cells).await,
Expand Down Expand Up @@ -220,3 +244,87 @@ async fn send_to_cell(

result
}

#[cfg(test)]
mod tests {
use super::*;
use crate::handler::SplitMetadata;
use crate::testutils::make_signing_keypair;
use async_trait::async_trait;

/// Minimal handler that requires relay auth; its split is never reached because verification
/// rejects the request first.
struct MockHandler {
requires_auth: bool,
}

#[async_trait]
impl Handler for MockHandler {
fn name(&self) -> &'static str {
"MockHandler"
}

fn execution_mode(&self) -> ExecutionMode {
ExecutionMode::Parallel
}

fn requires_relay_auth(&self) -> bool {
self.requires_auth
}

async fn split_request(
&self,
_request: Request<Bytes>,
_cells: &Cells,
) -> Result<(Vec<(CellId, Request<Bytes>)>, SplitMetadata), IngestRouterError> {
unreachable!("request is rejected by verification before the split")
}

async fn merge_responses(
&self,
_responses: Vec<(CellId, Result<Response<Bytes>, IngestRouterError>)>,
_metadata: SplitMetadata,
) -> Response<Bytes> {
unreachable!("request is rejected by verification before the split")
}
}

fn test_cells() -> Cells {
use crate::config::CellConfig;
use crate::locality::Localities;
use std::collections::HashMap;
use url::Url;

Localities::new(HashMap::from([(
"us".to_string(),
vec![CellConfig {
id: "us1".to_string(),
sentry_url: Url::parse("http://localhost:8080").unwrap(),
relay_url: Url::parse("http://localhost:8090").unwrap(),
}],
)]))
.get_cells("us")
.unwrap()
}

#[tokio::test]
async fn execute_rejects_request_with_no_signature_when_handler_requires_auth() {
let (signer, verifier) = make_signing_keypair();
let executor = Executor::new(RelayTimeouts::default(), verifier, signer);

// An inbound request carrying no relay signature headers is rejected with 401 before the
// handler is ever asked to split it (MockHandler::split_request would panic if reached).
let request = Request::new(Bytes::from_static(b"body"));
let response = executor
.execute(
Arc::new(MockHandler {
requires_auth: true,
}),
request,
test_cells(),
)
.await;

assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
}
54 changes: 18 additions & 36 deletions ingest-router/src/ingest_router_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,17 @@ static INFLIGHT: AtomicU64 = AtomicU64::new(0);
pub struct IngestRouterService {
router: router::Router,
executor: executor::Executor,
verifier: auth::RelayVerifier,
}

impl IngestRouterService {
pub fn new(
router: router::Router,
timeouts: config::RelayTimeouts,
verifier: auth::RelayVerifier,
signer: auth::RelaySigner,
) -> Self {
let executor = executor::Executor::new(timeouts);
Self {
router,
executor,
verifier,
}
let executor = executor::Executor::new(timeouts, verifier, signer);
Self { router, executor }
}
}

Expand All @@ -59,32 +55,16 @@ where
let resolved = self.router.resolve(&req);
let (parts, body) = req.into_parts();
let executor = self.executor.clone();
// Clone verifier only for requests that require it
let maybe_verifier = match &resolved {
Some((handler, _)) if handler.requires_relay_auth() => Some(self.verifier.clone()),
_ => None,
};

Box::pin(async move {
let (response, handler_name): (Response<Full<Bytes>>, &str) = match resolved {
Some((handler, cells)) => {
let handler_name = handler.name();
match body.collect().await {
Ok(c) => {
let body_bytes = c.to_bytes();
if let Some(verifier) = &maybe_verifier
&& let Err(err) =
verifier.verify_request(&parts.headers, &body_bytes)
{
tracing::warn!(error = %err, handler = handler_name, "relay signature verification failed");
let response =
make_error_response(StatusCode::UNAUTHORIZED).map(Full::new);
(response, handler_name)
} else {
let request = Request::from_parts(parts, body_bytes);
let response = executor.execute(handler, request, cells).await;
(response.map(Full::new), handler_name)
}
let request = Request::from_parts(parts, c.to_bytes());
let response = executor.execute(handler, request, cells).await;
(response.map(Full::new), handler_name)
}
Err(_) => {
let response =
Expand Down Expand Up @@ -211,6 +191,17 @@ mod tests {

let (signer, verifier) = make_signing_keypair();

// Project configs request — must be signed by a trusted relay. Sign it before the signer
// is moved into the service (which re-signs the outbound per-cell requests with it).
let body = r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#;
let mut request = Request::builder()
.method(Method::POST)
.uri("/api/0/relays/projectconfigs/")
.header(HOST, "us.sentry.io")
.body(Full::new(Bytes::from(body)))
.unwrap();
signer.sign_request(request.headers_mut(), body.as_bytes());

let service = IngestRouterService::new(
router::Router::new(routes_config, localities, locator),
config::RelayTimeouts {
Expand All @@ -219,18 +210,9 @@ mod tests {
task_subsequent_timeout_secs: 10000,
},
verifier,
signer,
);

// Project configs request — must be signed by a trusted relay
let body = r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#;
let mut request = Request::builder()
.method(Method::POST)
.uri("/api/0/relays/projectconfigs/")
.header(HOST, "us.sentry.io")
.body(Full::new(Bytes::from(body)))
.unwrap();
signer.sign_request(request.headers_mut(), body.as_bytes());

let response = service.call(request).await.unwrap();

let (parts, body) = response.into_parts();
Expand Down
7 changes: 5 additions & 2 deletions ingest-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ pub mod router;
mod testutils;

use crate::errors::IngestRouterError;
use auth::RelayVerifier;
use auth::{RelaySigner, RelayVerifier};
use locator::client::Locator;
use shared::http::run_http_service;
use std::path::Path;

use shared::admin_service::AdminService;

pub async fn run(config: config::Config) -> Result<(), IngestRouterError> {
pub async fn run(config: config::Config, credentials_path: &Path) -> Result<(), IngestRouterError> {
let locator = Locator::new(config.locator.to_client_config()).await?;

let verifier = RelayVerifier::from_relays(config.relay_keys)?;
let signer = RelaySigner::from_file(credentials_path)?;

let ingest_router_service = ingest_router_service::IngestRouterService::new(
router::Router::new(config.routes, config.localities, locator.clone()),
config.relay_timeouts,
verifier,
signer,
);
let admin_service = AdminService::new({
let locator = locator.clone();
Expand Down
7 changes: 6 additions & 1 deletion synapse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ fn cli() -> Result<(), CliError> {
.ingest_router
.ok_or(CliError::InvalidConfig("Missing ingest-router config"))?;

run_async(ingest_router::run(ingest_router_config))?;
run_async(ingest_router::run(
ingest_router_config,
&ingest_router_args.credentials_path,
))?;

Ok(())
}
Expand Down Expand Up @@ -252,6 +255,8 @@ struct ProxyArgs {
struct IngestRouterArgs {
#[command(flatten)]
base: BaseArgs,
#[arg(long)]
credentials_path: PathBuf,
}

#[derive(Args, Debug)]
Expand Down
Loading