Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ of the new YAML fields below until the version that ships them.
justification.
([crates/sbproxy-*/src/lib.rs])

- **Outbound webhook delivery identity headers.** Signed customer
webhooks now include `Sbproxy-Subscription-Id`,
`Sbproxy-Delivery-Id`, and 1-based `Sbproxy-Attempt` headers, with a
fresh delivery ULID on every retry attempt.
([crates/sbproxy-observe/src/notify.rs])

- **AI client retry resilience.** `MemoryBatchStore` now uses
`parking_lot::Mutex` so a panic in one worker cannot poison the
in-memory batch map for every later operation. Provider retries now
Expand Down
41 changes: 40 additions & 1 deletion crates/sbproxy-observe/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
//! - `Content-Type: application/json`
//! - `Sbproxy-Event-Id: <event_id>` (ULID, stable across retries)
//! - `Sbproxy-Event-Type: <event_type>`
//! - `Sbproxy-Subscription-Id: <subscription_id>`
//! - `Sbproxy-Delivery-Id: <delivery_id>` (ULID, unique per attempt)
//! - `Sbproxy-Attempt: <attempt>` (1-based)
//! - `Sbproxy-Tenant: <tenant_id>`
//! - `Sbproxy-Timestamp: <unix-ms>`
//! - `Sbproxy-Signature: <kid>=<base64-sig>[, <prev-kid>=<base64-prev-sig>]`
Expand Down Expand Up @@ -173,6 +176,8 @@ impl VerificationKey {
pub struct Subscription {
/// Tenant that owns this subscription.
pub tenant_id: String,
/// Stable customer-facing subscription identifier.
pub subscription_id: String,
/// Destination URL. `https://` enforced in production deployments;
/// `http://` is allowed only for local-dev / e2e fixtures.
pub url: String,
Expand Down Expand Up @@ -523,7 +528,7 @@ async fn deliver_with_retries(
let mut last_error = String::new();

for attempt in 1..=max_attempts {
match send_attempt(&client, &sub, &event, &body, attempt_timeout).await {
match send_attempt(&client, &sub, &event, &body, attempt_timeout, attempt).await {
Ok(status) if (200..300).contains(&status) => {
record_attempt(&event.tenant_id, &event.event_type, "success");
tracing::debug!(
Expand Down Expand Up @@ -595,8 +600,10 @@ async fn send_attempt(
event: &OutboundEvent,
body: &[u8],
attempt_timeout: Duration,
attempt: u32,
) -> Result<u16, reqwest::Error> {
let timestamp_ms = event.created_at.timestamp_millis();
let delivery_id = ulid::Ulid::new().to_string();
let signing_input = build_signing_input(timestamp_ms, body);
let signature_header = build_signature_header(&signing_input, sub);

Expand All @@ -606,6 +613,9 @@ async fn send_attempt(
.header("Content-Type", "application/json")
.header("Sbproxy-Event-Id", &event.event_id)
.header("Sbproxy-Event-Type", &event.event_type)
.header("Sbproxy-Subscription-Id", &sub.subscription_id)
.header("Sbproxy-Delivery-Id", delivery_id)
.header("Sbproxy-Attempt", attempt.to_string())
.header("Sbproxy-Tenant", &event.tenant_id)
.header("Sbproxy-Timestamp", timestamp_ms.to_string())
.header("Sbproxy-Signature", signature_header);
Expand Down Expand Up @@ -901,6 +911,7 @@ mod tests {
let store = Arc::new(InMemoryStore::new());
store.add_subscription(Subscription {
tenant_id: "tenant_42".to_string(),
subscription_id: "sub_ed25519".to_string(),
url: server.url(),
event_types: vec!["wallet.low_balance".to_string()],
signing_key: SigningKey::Ed25519 {
Expand Down Expand Up @@ -934,6 +945,10 @@ mod tests {
header_value(req, "sbproxy-event-type").unwrap(),
"wallet.low_balance"
);
assert_eq!(
header_value(req, "sbproxy-subscription-id").unwrap(),
"sub_ed25519"
);
assert_eq!(header_value(req, "sbproxy-tenant").unwrap(), "tenant_42");

let key = VerificationKey::Ed25519 {
Expand All @@ -954,6 +969,7 @@ mod tests {
let store = Arc::new(InMemoryStore::new());
store.add_subscription(Subscription {
tenant_id: "tenant_42".to_string(),
subscription_id: "sub_hmac".to_string(),
url: server.url(),
event_types: vec!["wallet.*".to_string()],
signing_key: SigningKey::HmacSha256 {
Expand Down Expand Up @@ -1007,6 +1023,7 @@ mod tests {
let store = Arc::new(InMemoryStore::new());
store.add_subscription(Subscription {
tenant_id: "tenant_42".to_string(),
subscription_id: "sub_dual_key".to_string(),
url: server.url(),
event_types: vec!["*".to_string()],
signing_key: SigningKey::Ed25519 {
Expand Down Expand Up @@ -1067,6 +1084,7 @@ mod tests {
let store = Arc::new(InMemoryStore::new());
store.add_subscription(Subscription {
tenant_id: "tenant_42".to_string(),
subscription_id: "sub_retry".to_string(),
url: server.url(),
event_types: vec!["*".to_string()],
signing_key: SigningKey::Ed25519 {
Expand Down Expand Up @@ -1098,6 +1116,27 @@ mod tests {
"expected 6 attempts (1 initial + 5 retries), got {}",
captured.len()
);
let mut delivery_ids = std::collections::HashSet::new();
for (idx, req) in captured.iter().enumerate() {
let expected_attempt = (idx + 1).to_string();
assert_eq!(
header_value(req, "sbproxy-subscription-id"),
Some("sub_retry")
);
assert_eq!(
header_value(req, "sbproxy-attempt"),
Some(expected_attempt.as_str())
);
let delivery_id = header_value(req, "sbproxy-delivery-id")
.expect("each attempt must carry a delivery id");
delivery_id
.parse::<ulid::Ulid>()
.expect("delivery id must be a ULID");
assert!(
delivery_ids.insert(delivery_id.to_string()),
"delivery id must rotate per attempt"
);
}

// Deadletter must hold exactly one entry.
assert_eq!(store.deadletter_len(), 1);
Expand Down
Loading