From a7c1d114aaa4d1669660c85f9e90c72b525b55a0 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 18 Jun 2026 11:40:20 -0700 Subject: [PATCH] ingest-router: Implement signing of requests that modify request bodies - Wires up synapse signing of requests for handlers where `needs_relay_auth` is true - All relay auth (verification and signing) is consolidated into the executor - Adds a `--credentials-path` flag on the ingest router CLI where the relay formatted credentials file should be passed - Simplifies cloning - `RelaySigner` and `RelayVerifier` no longer need to be `Clone` and unnecessary inner `Arc` removed - Adds executor signature rejection test --- Makefile | 4 +- devservices/config.yml | 2 +- ingest-router/src/auth.rs | 11 +- ingest-router/src/errors.rs | 3 + ingest-router/src/executor.rs | 116 ++++++++++++++++++++- ingest-router/src/ingest_router_service.rs | 54 ++++------ ingest-router/src/lib.rs | 7 +- synapse/src/main.rs | 7 +- 8 files changed, 150 insertions(+), 54 deletions(-) diff --git a/Makefile b/Makefile index 0faa641..cc86de4 100644 --- a/Makefile +++ b/Makefile @@ -98,8 +98,8 @@ run-proxy: cargo run proxy --config-file-path example_config_proxy.yaml .PHONY: run-proxy -run-ingest-router: - cargo run ingest-router --config-file-path example_config_ingest_router.yaml +run-ingest-router: generate-credentials + cargo run ingest-router --config-file-path example_config_ingest_router.yaml --credentials-path relay-credentials.json .PHONY: run-ingest-router generate-credentials: diff --git a/devservices/config.yml b/devservices/config.yml index 4b31d09..cc6fe97 100644 --- a/devservices/config.yml +++ b/devservices/config.yml @@ -35,7 +35,7 @@ services: synapse-ingest-router: image: us-docker.pkg.dev/sentryio/synapse/image:latest - command: ["ingest-router", "--config-file-path", "/app/ingest-router.yaml"] + command: ["ingest-router", "--config-file-path", "/app/ingest-router.yaml", "--credentials-path", "/app/relay-credentials.json"] ports: - "13002:3000" - "13003:3001" diff --git a/ingest-router/src/auth.rs b/ingest-router/src/auth.rs index cd5edcf..a20ea39 100644 --- a/ingest-router/src/auth.rs +++ b/ingest-router/src/auth.rs @@ -26,7 +26,6 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; -use std::sync::Arc; /// Signature freshness window, matching Sentry /// https://github.com/getsentry/sentry/blob/c9138b328e9aad58f95f087c0f8a8843a06dbbe9/src/sentry/api/authentication.py#L260 @@ -70,7 +69,6 @@ struct Credentials { } /// Signs outgoing requests with synapse's relay credentials. -#[derive(Clone)] pub struct RelaySigner { signing_key: SigningKey, relay_id: HeaderValue, @@ -200,10 +198,9 @@ pub struct RelayInfo { /// The trusted set is fixed and small, so keys are configured statically rather than resolved /// at runtime via Sentry's `publickeys` endpoint (the mechanism relay-to-relay verification uses /// for a dynamic relay set). -#[derive(Clone, Default)] pub struct RelayVerifier { /// Trusted downstream relays, keyed by relay id (a UUID). - trusted_relays: Arc>, + trusted_relays: HashMap, } impl RelayVerifier { @@ -214,9 +211,7 @@ impl RelayVerifier { .into_iter() .map(|(id, info)| Ok((id.clone(), parse_public_key(&info.public_key, &id)?))) .collect::, VerifyError>>()?; - Ok(Self { - trusted_relays: Arc::new(trusted_relays), - }) + Ok(Self { trusted_relays }) } /// Verifies the `X-Sentry-Relay-Id` / `X-Sentry-Relay-Signature` headers against `body`. @@ -483,7 +478,7 @@ mod tests { #[test] fn rejects_untrusted_relay() { let (signer, _) = signer_and_verifier(); - let verifier = RelayVerifier::default(); // trusts nobody + let verifier = RelayVerifier::from_relays(HashMap::new()).unwrap(); // trusts nobody let body = b"body"; let headers = signed_headers(&signer, body); assert_eq!( diff --git a/ingest-router/src/errors.rs b/ingest-router/src/errors.rs index 09145dd..f84915d 100644 --- a/ingest-router/src/errors.rs +++ b/ingest-router/src/errors.rs @@ -54,4 +54,7 @@ pub enum IngestRouterError { #[error("Relay verifier configuration error: {0}")] RelayVerifierError(#[from] crate::auth::VerifyError), + + #[error("Relay signer configuration error: {0}")] + RelaySignerError(#[from] crate::auth::SigningError), } diff --git a/ingest-router/src/executor.rs b/ingest-router/src/executor.rs index d92ac64..5168786 100644 --- a/ingest-router/src/executor.rs +++ b/ingest-router/src/executor.rs @@ -1,3 +1,4 @@ +use crate::auth::{RelaySigner, RelayVerifier}; use crate::config::RelayTimeouts; use crate::errors::IngestRouterError; use crate::handler::{CellId, ExecutionMode, Handler}; @@ -26,26 +27,49 @@ static UPSTREAM_REQUEST_COUNT: AtomicU64 = AtomicU64::new(0); pub struct Executor { client: Client>, timeouts: RelayTimeouts, + verifier: Arc, + signer: Arc, } impl Executor { - pub fn new(timeouts: RelayTimeouts) -> Self { + pub fn new(timeouts: RelayTimeouts, verifier: RelayVerifier, signer: RelaySigner) -> Self { let client = Client::builder(TokioExecutor::new()).build(HttpConnector::new()); - Self { client, timeouts } + Self { + client, + timeouts, + verifier: Arc::new(verifier), + signer: Arc::new(signer), + } } - // Splits, executes, and merges the responses using the provided handler. + // Verifies, splits, executes, and merges the responses using the provided handler. pub async fn execute( &self, handler: Arc, request: Request, cells: Cells, ) -> Response { - let (split_requests, metadata) = match handler.split_request(request, &cells).await { + if handler.requires_relay_auth() + && let Err(err) = self + .verifier + .verify_request(request.headers(), request.body()) + { + tracing::warn!(error = %err, handler = handler.name(), "relay signature verification failed"); + return make_error_response(StatusCode::UNAUTHORIZED); + } + + let (mut split_requests, metadata) = match handler.split_request(request, &cells).await { Ok(result) => result, Err(_e) => return make_error_response(StatusCode::INTERNAL_SERVER_ERROR), }; + if handler.requires_relay_auth() { + for (_cell_id, request) in split_requests.iter_mut() { + let body = request.body().clone(); + self.signer.sign_request(request.headers_mut(), &body); + } + } + let results = match handler.execution_mode() { ExecutionMode::Parallel => self.execute_parallel(split_requests, cells).await, ExecutionMode::Failover => self.execute_failover(split_requests, cells).await, @@ -220,3 +244,87 @@ async fn send_to_cell( result } + +#[cfg(test)] +mod tests { + use super::*; + use crate::handler::SplitMetadata; + use crate::testutils::make_signing_keypair; + use async_trait::async_trait; + + /// Minimal handler that requires relay auth; its split is never reached because verification + /// rejects the request first. + struct MockHandler { + requires_auth: bool, + } + + #[async_trait] + impl Handler for MockHandler { + fn name(&self) -> &'static str { + "MockHandler" + } + + fn execution_mode(&self) -> ExecutionMode { + ExecutionMode::Parallel + } + + fn requires_relay_auth(&self) -> bool { + self.requires_auth + } + + async fn split_request( + &self, + _request: Request, + _cells: &Cells, + ) -> Result<(Vec<(CellId, Request)>, SplitMetadata), IngestRouterError> { + unreachable!("request is rejected by verification before the split") + } + + async fn merge_responses( + &self, + _responses: Vec<(CellId, Result, IngestRouterError>)>, + _metadata: SplitMetadata, + ) -> Response { + unreachable!("request is rejected by verification before the split") + } + } + + fn test_cells() -> Cells { + use crate::config::CellConfig; + use crate::locality::Localities; + use std::collections::HashMap; + use url::Url; + + Localities::new(HashMap::from([( + "us".to_string(), + vec![CellConfig { + id: "us1".to_string(), + sentry_url: Url::parse("http://localhost:8080").unwrap(), + relay_url: Url::parse("http://localhost:8090").unwrap(), + }], + )])) + .get_cells("us") + .unwrap() + } + + #[tokio::test] + async fn execute_rejects_request_with_no_signature_when_handler_requires_auth() { + let (signer, verifier) = make_signing_keypair(); + let executor = Executor::new(RelayTimeouts::default(), verifier, signer); + + // An inbound request carrying no relay signature headers is rejected with 401 before the + // handler is ever asked to split it (MockHandler::split_request would panic if reached). + let request = Request::new(Bytes::from_static(b"body")); + let response = executor + .execute( + Arc::new(MockHandler { + requires_auth: true, + }), + request, + test_cells(), + ) + .await; + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + } +} diff --git a/ingest-router/src/ingest_router_service.rs b/ingest-router/src/ingest_router_service.rs index d791e14..6a917b6 100644 --- a/ingest-router/src/ingest_router_service.rs +++ b/ingest-router/src/ingest_router_service.rs @@ -23,7 +23,6 @@ static INFLIGHT: AtomicU64 = AtomicU64::new(0); pub struct IngestRouterService { router: router::Router, executor: executor::Executor, - verifier: auth::RelayVerifier, } impl IngestRouterService { @@ -31,13 +30,10 @@ impl IngestRouterService { router: router::Router, timeouts: config::RelayTimeouts, verifier: auth::RelayVerifier, + signer: auth::RelaySigner, ) -> Self { - let executor = executor::Executor::new(timeouts); - Self { - router, - executor, - verifier, - } + let executor = executor::Executor::new(timeouts, verifier, signer); + Self { router, executor } } } @@ -59,11 +55,6 @@ where let resolved = self.router.resolve(&req); let (parts, body) = req.into_parts(); let executor = self.executor.clone(); - // Clone verifier only for requests that require it - let maybe_verifier = match &resolved { - Some((handler, _)) if handler.requires_relay_auth() => Some(self.verifier.clone()), - _ => None, - }; Box::pin(async move { let (response, handler_name): (Response>, &str) = match resolved { @@ -71,20 +62,9 @@ where let handler_name = handler.name(); match body.collect().await { Ok(c) => { - let body_bytes = c.to_bytes(); - if let Some(verifier) = &maybe_verifier - && let Err(err) = - verifier.verify_request(&parts.headers, &body_bytes) - { - tracing::warn!(error = %err, handler = handler_name, "relay signature verification failed"); - let response = - make_error_response(StatusCode::UNAUTHORIZED).map(Full::new); - (response, handler_name) - } else { - let request = Request::from_parts(parts, body_bytes); - let response = executor.execute(handler, request, cells).await; - (response.map(Full::new), handler_name) - } + let request = Request::from_parts(parts, c.to_bytes()); + let response = executor.execute(handler, request, cells).await; + (response.map(Full::new), handler_name) } Err(_) => { let response = @@ -211,6 +191,17 @@ mod tests { let (signer, verifier) = make_signing_keypair(); + // Project configs request — must be signed by a trusted relay. Sign it before the signer + // is moved into the service (which re-signs the outbound per-cell requests with it). + let body = r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#; + let mut request = Request::builder() + .method(Method::POST) + .uri("/api/0/relays/projectconfigs/") + .header(HOST, "us.sentry.io") + .body(Full::new(Bytes::from(body))) + .unwrap(); + signer.sign_request(request.headers_mut(), body.as_bytes()); + let service = IngestRouterService::new( router::Router::new(routes_config, localities, locator), config::RelayTimeouts { @@ -219,18 +210,9 @@ mod tests { task_subsequent_timeout_secs: 10000, }, verifier, + signer, ); - // Project configs request — must be signed by a trusted relay - let body = r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#; - let mut request = Request::builder() - .method(Method::POST) - .uri("/api/0/relays/projectconfigs/") - .header(HOST, "us.sentry.io") - .body(Full::new(Bytes::from(body))) - .unwrap(); - signer.sign_request(request.headers_mut(), body.as_bytes()); - let response = service.call(request).await.unwrap(); let (parts, body) = response.into_parts(); diff --git a/ingest-router/src/lib.rs b/ingest-router/src/lib.rs index 5d20f22..41e3857 100644 --- a/ingest-router/src/lib.rs +++ b/ingest-router/src/lib.rs @@ -14,21 +14,24 @@ pub mod router; mod testutils; use crate::errors::IngestRouterError; -use auth::RelayVerifier; +use auth::{RelaySigner, RelayVerifier}; use locator::client::Locator; use shared::http::run_http_service; +use std::path::Path; use shared::admin_service::AdminService; -pub async fn run(config: config::Config) -> Result<(), IngestRouterError> { +pub async fn run(config: config::Config, credentials_path: &Path) -> Result<(), IngestRouterError> { let locator = Locator::new(config.locator.to_client_config()).await?; let verifier = RelayVerifier::from_relays(config.relay_keys)?; + let signer = RelaySigner::from_file(credentials_path)?; let ingest_router_service = ingest_router_service::IngestRouterService::new( router::Router::new(config.routes, config.localities, locator.clone()), config.relay_timeouts, verifier, + signer, ); let admin_service = AdminService::new({ let locator = locator.clone(); diff --git a/synapse/src/main.rs b/synapse/src/main.rs index 477d920..68cee8a 100644 --- a/synapse/src/main.rs +++ b/synapse/src/main.rs @@ -84,7 +84,10 @@ fn cli() -> Result<(), CliError> { .ingest_router .ok_or(CliError::InvalidConfig("Missing ingest-router config"))?; - run_async(ingest_router::run(ingest_router_config))?; + run_async(ingest_router::run( + ingest_router_config, + &ingest_router_args.credentials_path, + ))?; Ok(()) } @@ -252,6 +255,8 @@ struct ProxyArgs { struct IngestRouterArgs { #[command(flatten)] base: BaseArgs, + #[arg(long)] + credentials_path: PathBuf, } #[derive(Args, Debug)]