Skip to content

Commit e82f1fb

Browse files
vvillait88claude
andcommitted
feat(sdk): add verify_webhook_signature + AgentScoreError.status alias
Closes audit items #12, #17. verify_webhook_signature: generic HMAC-SHA256 webhook signature verifier, Stripe-pattern (`t=<unix>,v1=<hex>` header). Useful both when AgentScore eventually ships outbound webhooks and as a generic helper for merchants verifying any HMAC-signed webhook source. Returns VerifyWebhookSignatureResult with `reason` set on failure (no_signatures / no_timestamp / timestamp_too_old / timestamp_in_future / signature_mismatch / malformed_header) so callers can differentiate transient vs permanent failures. Uses hmac.compare_digest for constant-time comparison. AgentScoreError.status property mirrors .status_code — polyglot codebases can use err.status regardless of which SDK raised the error. Both attributes return the same int. 11 new tests; coverage holds at 97.30% (Tier A). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3a052de commit e82f1fb

4 files changed

Lines changed: 269 additions & 0 deletions

File tree

agentscore/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
WalletAuthRequiresSigningBody,
3131
WalletSignerMismatchBody,
3232
)
33+
from agentscore.webhooks import VerifyWebhookSignatureResult, verify_webhook_signature
3334

3435
__version__ = _pkg_version("agentscore-py")
3536

@@ -60,7 +61,9 @@
6061
"SessionCreateResponse",
6162
"SessionPollResponse",
6263
"VerificationLevel",
64+
"VerifyWebhookSignatureResult",
6365
"WalletAuthRequiresSigningBody",
6466
"WalletSignerMismatchBody",
6567
"__version__",
68+
"verify_webhook_signature",
6669
]

agentscore/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,11 @@ def __init__(self, code: str, message: str, status_code: int):
33
super().__init__(message)
44
self.code = code
55
self.status_code = status_code
6+
7+
@property
8+
def status(self) -> int:
9+
"""Alias for ``status_code`` — parity with node-sdk's attribute name.
10+
11+
Polyglot codebases can use ``err.status`` regardless of which SDK raised the error.
12+
"""
13+
return self.status_code

agentscore/webhooks.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""Webhook signature verification — HMAC-SHA256, Stripe-pattern.
2+
3+
Use this when AgentScore (or any service that signs outbound webhooks with this
4+
convention) sends a webhook to your endpoint. Validates the
5+
``X-AgentScore-Signature`` (or compatible) header before trusting the payload.
6+
7+
Generic enough to cover any HMAC-signed webhook source: pass the right secret + header
8+
name. Tolerant of multiple signature versions in the same header
9+
(``t=...,v1=...`` style).
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import hashlib
15+
import hmac
16+
import time
17+
from dataclasses import dataclass
18+
from typing import Literal
19+
20+
21+
@dataclass(frozen=True)
22+
class VerifyWebhookSignatureResult:
23+
"""Result of :func:`verify_webhook_signature`."""
24+
25+
valid: bool
26+
reason: Literal[
27+
"no_signatures",
28+
"no_timestamp",
29+
"timestamp_too_old",
30+
"timestamp_in_future",
31+
"signature_mismatch",
32+
"malformed_header",
33+
] | None = None
34+
35+
36+
def verify_webhook_signature(
37+
payload: str | bytes,
38+
signature_header: str,
39+
secret: str,
40+
tolerance_seconds: int = 300,
41+
timestamp_key: str = "t",
42+
signature_key: str = "v1",
43+
) -> VerifyWebhookSignatureResult:
44+
"""Verify an HMAC-SHA256 signed webhook signature, Stripe-compatible.
45+
46+
Header format: ``t=<unix_seconds>,v1=<hex_hmac>``. The signed payload is
47+
``f"{timestamp}.{raw_body}"``. Returns a result with ``reason`` set on failure so
48+
callers can differentiate transient (timestamp drift) from permanent (mismatch).
49+
50+
Args:
51+
payload: Raw request body. MUST be the unparsed body — even one byte of
52+
re-serialization breaks the signature. Capture before any JSON parse.
53+
signature_header: Value of the signature header from the incoming request.
54+
secret: Shared secret the sender uses to sign.
55+
tolerance_seconds: Tolerance in seconds for timestamp-replay protection.
56+
Default 300 (5 min) per Stripe convention. Set to 0 to disable.
57+
timestamp_key: Override the timestamp parameter name. Default ``"t"``.
58+
signature_key: Override the signature parameter name. Default ``"v1"``.
59+
60+
Example::
61+
62+
from flask import request
63+
from agentscore.webhooks import verify_webhook_signature
64+
65+
@app.post("/webhooks/agentscore")
66+
def handle_webhook():
67+
result = verify_webhook_signature(
68+
payload=request.get_data(), # raw bytes — DO NOT parse JSON first
69+
signature_header=request.headers.get("X-AgentScore-Signature", ""),
70+
secret=os.environ["AGENTSCORE_WEBHOOK_SECRET"],
71+
)
72+
if not result.valid:
73+
return {"error": result.reason}, 400
74+
event = request.get_json(force=True)
75+
# ... handle event ...
76+
"""
77+
parts = [p.strip() for p in signature_header.split(",") if p.strip()]
78+
if not parts:
79+
return VerifyWebhookSignatureResult(valid=False, reason="no_signatures")
80+
81+
params: dict[str, list[str]] = {}
82+
for p in parts:
83+
if "=" not in p:
84+
return VerifyWebhookSignatureResult(valid=False, reason="malformed_header")
85+
key, _, value = p.partition("=")
86+
params.setdefault(key, []).append(value)
87+
88+
timestamp_str = params.get(timestamp_key, [None])[0]
89+
if tolerance_seconds > 0:
90+
if not timestamp_str:
91+
return VerifyWebhookSignatureResult(valid=False, reason="no_timestamp")
92+
try:
93+
ts = int(timestamp_str)
94+
except ValueError:
95+
return VerifyWebhookSignatureResult(valid=False, reason="no_timestamp")
96+
now_sec = int(time.time())
97+
if ts < now_sec - tolerance_seconds:
98+
return VerifyWebhookSignatureResult(valid=False, reason="timestamp_too_old")
99+
if ts > now_sec + tolerance_seconds:
100+
return VerifyWebhookSignatureResult(valid=False, reason="timestamp_in_future")
101+
102+
signatures = params.get(signature_key, [])
103+
if not signatures:
104+
return VerifyWebhookSignatureResult(valid=False, reason="no_signatures")
105+
106+
payload_bytes = payload.encode("utf-8") if isinstance(payload, str) else payload
107+
signed_payload = (
108+
f"{timestamp_str}.".encode() + payload_bytes if timestamp_str else payload_bytes
109+
)
110+
111+
expected_hex = hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()
112+
expected_bytes = bytes.fromhex(expected_hex)
113+
114+
for sig_hex in signatures:
115+
try:
116+
actual_bytes = bytes.fromhex(sig_hex)
117+
except ValueError:
118+
continue
119+
if len(actual_bytes) != len(expected_bytes):
120+
continue
121+
if hmac.compare_digest(actual_bytes, expected_bytes):
122+
return VerifyWebhookSignatureResult(valid=True)
123+
124+
return VerifyWebhookSignatureResult(valid=False, reason="signature_mismatch")
125+
126+
127+
__all__ = ["VerifyWebhookSignatureResult", "verify_webhook_signature"]

tests/test_webhooks.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""Tests for verify_webhook_signature."""
2+
3+
import hashlib
4+
import hmac
5+
import time
6+
7+
from agentscore import verify_webhook_signature
8+
from agentscore.errors import AgentScoreError
9+
10+
SECRET = "whsec_testsecret"
11+
12+
13+
def _sign(payload: str, ts: int, secret: str = SECRET) -> str:
14+
signed = f"{ts}.{payload}".encode()
15+
return hmac.new(secret.encode("utf-8"), signed, hashlib.sha256).hexdigest()
16+
17+
18+
def test_accepts_valid_signature_with_current_timestamp():
19+
payload = '{"event":"test"}'
20+
ts = int(time.time())
21+
sig = _sign(payload, ts)
22+
result = verify_webhook_signature(
23+
payload=payload,
24+
signature_header=f"t={ts},v1={sig}",
25+
secret=SECRET,
26+
)
27+
assert result.valid is True
28+
29+
30+
def test_accepts_bytes_payload():
31+
payload = b'{"event":"test"}'
32+
ts = int(time.time())
33+
sig = _sign(payload.decode("utf-8"), ts)
34+
result = verify_webhook_signature(
35+
payload=payload,
36+
signature_header=f"t={ts},v1={sig}",
37+
secret=SECRET,
38+
)
39+
assert result.valid is True
40+
41+
42+
def test_rejects_timestamp_older_than_tolerance():
43+
payload = "{}"
44+
ts = int(time.time()) - 600
45+
sig = _sign(payload, ts)
46+
result = verify_webhook_signature(
47+
payload=payload,
48+
signature_header=f"t={ts},v1={sig}",
49+
secret=SECRET,
50+
tolerance_seconds=300,
51+
)
52+
assert result.valid is False
53+
assert result.reason == "timestamp_too_old"
54+
55+
56+
def test_rejects_timestamp_in_future():
57+
payload = "{}"
58+
ts = int(time.time()) + 600
59+
sig = _sign(payload, ts)
60+
result = verify_webhook_signature(
61+
payload=payload,
62+
signature_header=f"t={ts},v1={sig}",
63+
secret=SECRET,
64+
)
65+
assert result.valid is False
66+
assert result.reason == "timestamp_in_future"
67+
68+
69+
def test_rejects_signature_mismatch():
70+
payload = "{}"
71+
ts = int(time.time())
72+
sig = _sign(payload, ts, secret="wrong_secret")
73+
result = verify_webhook_signature(
74+
payload=payload,
75+
signature_header=f"t={ts},v1={sig}",
76+
secret=SECRET,
77+
)
78+
assert result.valid is False
79+
assert result.reason == "signature_mismatch"
80+
81+
82+
def test_no_signatures_for_empty_header():
83+
result = verify_webhook_signature(payload="{}", signature_header="", secret=SECRET)
84+
assert result.valid is False
85+
assert result.reason == "no_signatures"
86+
87+
88+
def test_malformed_header():
89+
result = verify_webhook_signature(payload="{}", signature_header="just_a_value", secret=SECRET)
90+
assert result.valid is False
91+
assert result.reason == "malformed_header"
92+
93+
94+
def test_no_timestamp_when_missing_and_tolerance_positive():
95+
payload = "{}"
96+
sig = hmac.new(SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()
97+
result = verify_webhook_signature(payload=payload, signature_header=f"v1={sig}", secret=SECRET)
98+
assert result.valid is False
99+
assert result.reason == "no_timestamp"
100+
101+
102+
def test_tolerance_zero_skips_timestamp_check():
103+
payload = '{"event":"test"}'
104+
sig = hmac.new(SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()
105+
result = verify_webhook_signature(
106+
payload=payload,
107+
signature_header=f"v1={sig}",
108+
secret=SECRET,
109+
tolerance_seconds=0,
110+
)
111+
assert result.valid is True
112+
113+
114+
def test_multiple_signatures_any_match():
115+
payload = "{}"
116+
ts = int(time.time())
117+
sig_good = _sign(payload, ts)
118+
sig_bad = hmac.new(b"wrong", f"{ts}.{payload}".encode(), hashlib.sha256).hexdigest()
119+
result = verify_webhook_signature(
120+
payload=payload,
121+
signature_header=f"t={ts},v1={sig_bad},v1={sig_good}",
122+
secret=SECRET,
123+
)
124+
assert result.valid is True
125+
126+
127+
def test_status_alias_matches_status_code():
128+
"""AgentScoreError.status property mirrors .status_code (parity with node-sdk)."""
129+
err = AgentScoreError(code="rate_limited", message="too many", status_code=429)
130+
assert err.status == 429
131+
assert err.status == err.status_code

0 commit comments

Comments
 (0)