From dca4950e417bb72afee28c9f33eccaeeda7c9050 Mon Sep 17 00:00:00 2001 From: Rick Crawford Date: Tue, 5 May 2026 22:15:48 -0700 Subject: [PATCH] fix: emit webhook delivery headers Add subscription, delivery, and attempt headers to outbound webhook attempts so customers can distinguish subscriptions and retry deliveries. Co-authored-by: Cursor --- CHANGELOG.md | 6 ++++ crates/sbproxy-observe/src/notify.rs | 41 +++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65db66bf..2561aea2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -155,6 +155,12 @@ of the new YAML fields below until the version that ships them. justification. ([crates/sbproxy-*/src/lib.rs]) +- **Outbound webhook delivery identity headers.** Signed customer + webhooks now include `Sbproxy-Subscription-Id`, + `Sbproxy-Delivery-Id`, and 1-based `Sbproxy-Attempt` headers, with a + fresh delivery ULID on every retry attempt. + ([crates/sbproxy-observe/src/notify.rs]) + - **AI client retry resilience.** `MemoryBatchStore` now uses `parking_lot::Mutex` so a panic in one worker cannot poison the in-memory batch map for every later operation. Provider retries now diff --git a/crates/sbproxy-observe/src/notify.rs b/crates/sbproxy-observe/src/notify.rs index 213e90ad..e3601555 100644 --- a/crates/sbproxy-observe/src/notify.rs +++ b/crates/sbproxy-observe/src/notify.rs @@ -36,6 +36,9 @@ //! - `Content-Type: application/json` //! - `Sbproxy-Event-Id: ` (ULID, stable across retries) //! - `Sbproxy-Event-Type: ` +//! - `Sbproxy-Subscription-Id: ` +//! - `Sbproxy-Delivery-Id: ` (ULID, unique per attempt) +//! - `Sbproxy-Attempt: ` (1-based) //! - `Sbproxy-Tenant: ` //! - `Sbproxy-Timestamp: ` //! - `Sbproxy-Signature: =[, =]` @@ -173,6 +176,8 @@ impl VerificationKey { pub struct Subscription { /// Tenant that owns this subscription. pub tenant_id: String, + /// Stable customer-facing subscription identifier. + pub subscription_id: String, /// Destination URL. `https://` enforced in production deployments; /// `http://` is allowed only for local-dev / e2e fixtures. pub url: String, @@ -523,7 +528,7 @@ async fn deliver_with_retries( let mut last_error = String::new(); for attempt in 1..=max_attempts { - match send_attempt(&client, &sub, &event, &body, attempt_timeout).await { + match send_attempt(&client, &sub, &event, &body, attempt_timeout, attempt).await { Ok(status) if (200..300).contains(&status) => { record_attempt(&event.tenant_id, &event.event_type, "success"); tracing::debug!( @@ -595,8 +600,10 @@ async fn send_attempt( event: &OutboundEvent, body: &[u8], attempt_timeout: Duration, + attempt: u32, ) -> Result { let timestamp_ms = event.created_at.timestamp_millis(); + let delivery_id = ulid::Ulid::new().to_string(); let signing_input = build_signing_input(timestamp_ms, body); let signature_header = build_signature_header(&signing_input, sub); @@ -606,6 +613,9 @@ async fn send_attempt( .header("Content-Type", "application/json") .header("Sbproxy-Event-Id", &event.event_id) .header("Sbproxy-Event-Type", &event.event_type) + .header("Sbproxy-Subscription-Id", &sub.subscription_id) + .header("Sbproxy-Delivery-Id", delivery_id) + .header("Sbproxy-Attempt", attempt.to_string()) .header("Sbproxy-Tenant", &event.tenant_id) .header("Sbproxy-Timestamp", timestamp_ms.to_string()) .header("Sbproxy-Signature", signature_header); @@ -901,6 +911,7 @@ mod tests { let store = Arc::new(InMemoryStore::new()); store.add_subscription(Subscription { tenant_id: "tenant_42".to_string(), + subscription_id: "sub_ed25519".to_string(), url: server.url(), event_types: vec!["wallet.low_balance".to_string()], signing_key: SigningKey::Ed25519 { @@ -934,6 +945,10 @@ mod tests { header_value(req, "sbproxy-event-type").unwrap(), "wallet.low_balance" ); + assert_eq!( + header_value(req, "sbproxy-subscription-id").unwrap(), + "sub_ed25519" + ); assert_eq!(header_value(req, "sbproxy-tenant").unwrap(), "tenant_42"); let key = VerificationKey::Ed25519 { @@ -954,6 +969,7 @@ mod tests { let store = Arc::new(InMemoryStore::new()); store.add_subscription(Subscription { tenant_id: "tenant_42".to_string(), + subscription_id: "sub_hmac".to_string(), url: server.url(), event_types: vec!["wallet.*".to_string()], signing_key: SigningKey::HmacSha256 { @@ -1007,6 +1023,7 @@ mod tests { let store = Arc::new(InMemoryStore::new()); store.add_subscription(Subscription { tenant_id: "tenant_42".to_string(), + subscription_id: "sub_dual_key".to_string(), url: server.url(), event_types: vec!["*".to_string()], signing_key: SigningKey::Ed25519 { @@ -1067,6 +1084,7 @@ mod tests { let store = Arc::new(InMemoryStore::new()); store.add_subscription(Subscription { tenant_id: "tenant_42".to_string(), + subscription_id: "sub_retry".to_string(), url: server.url(), event_types: vec!["*".to_string()], signing_key: SigningKey::Ed25519 { @@ -1098,6 +1116,27 @@ mod tests { "expected 6 attempts (1 initial + 5 retries), got {}", captured.len() ); + let mut delivery_ids = std::collections::HashSet::new(); + for (idx, req) in captured.iter().enumerate() { + let expected_attempt = (idx + 1).to_string(); + assert_eq!( + header_value(req, "sbproxy-subscription-id"), + Some("sub_retry") + ); + assert_eq!( + header_value(req, "sbproxy-attempt"), + Some(expected_attempt.as_str()) + ); + let delivery_id = header_value(req, "sbproxy-delivery-id") + .expect("each attempt must carry a delivery id"); + delivery_id + .parse::() + .expect("delivery id must be a ULID"); + assert!( + delivery_ids.insert(delivery_id.to_string()), + "delivery id must rotate per attempt" + ); + } // Deadletter must hold exactly one entry. assert_eq!(store.deadletter_len(), 1);