forked from openai/openai-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhooks.py
More file actions
210 lines (172 loc) · 7.64 KB
/
webhooks.py
File metadata and controls
210 lines (172 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
from __future__ import annotations
import hmac
import json
import time
import base64
import hashlib
from typing import cast
from .._types import HeadersLike
from .._utils import get_required_header
from .._models import construct_type
from .._resource import SyncAPIResource, AsyncAPIResource
from .._exceptions import InvalidWebhookSignatureError
from ..types.webhooks.unwrap_webhook_event import UnwrapWebhookEvent
# Public names exported by this module (used by `from ... import *`).
__all__ = ["Webhooks", "AsyncWebhooks"]
class Webhooks(SyncAPIResource):
    """Webhook verification and parsing helpers exposed on the synchronous resource."""

    def unwrap(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
    ) -> UnwrapWebhookEvent:
        """Validates that the given payload was sent by OpenAI and parses the payload.

        Args:
            payload: The webhook payload, exactly as received
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided)

        Returns:
            The JSON-decoded payload constructed as an `UnwrapWebhookEvent`.

        Raises:
            ValueError: If no secret is passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp or signature check fails.
        """
        if secret is None:
            secret = self._client.webhook_secret

        # Verify first so that unauthenticated data is never parsed.
        self.verify_signature(payload=payload, headers=headers, secret=secret)

        return cast(
            UnwrapWebhookEvent,
            construct_type(
                type_=UnwrapWebhookEvent,
                value=json.loads(payload),
            ),
        )

    def verify_signature(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
        tolerance: int = 300,
    ) -> None:
        """Validates whether or not the webhook payload was sent by OpenAI.

        Args:
            payload: The webhook payload
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided).
                A secret starting with `whsec_` is base64-decoded after the prefix is
                removed; any other secret is used as its UTF-8 bytes.
            tolerance: Maximum allowed difference in seconds between the webhook
                timestamp and the current time, in either direction
                (default: 300 = 5 minutes)

        Raises:
            ValueError: If no secret is passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp is not an integer, is outside
                the tolerance window, or no provided signature matches the expected one.
        """
        if secret is None:
            secret = self._client.webhook_secret

        if secret is None:
            raise ValueError(
                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
            )

        signature_header = get_required_header(headers, "webhook-signature")
        timestamp = get_required_header(headers, "webhook-timestamp")
        webhook_id = get_required_header(headers, "webhook-id")

        # Validate timestamp to prevent replay attacks
        try:
            timestamp_seconds = int(timestamp)
        except ValueError:
            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None

        now = int(time.time())

        if now - timestamp_seconds > tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too old")

        if timestamp_seconds > now + tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too new")

        # Extract signatures from v1,<base64> format
        # The signature header can have multiple values, separated by spaces.
        # Each value is in the format v1,<base64>. We should accept if any match.
        signatures: list[str] = [
            part[3:] if part.startswith("v1,") else part for part in signature_header.split()
        ]

        # Decode the secret if it starts with whsec_
        if secret.startswith("whsec_"):
            decoded_secret = base64.b64decode(secret[6:])
        else:
            decoded_secret = secret.encode()

        # Sign the raw bytes rather than decoding and re-encoding them: the result is
        # identical for valid UTF-8, and a body that is not valid UTF-8 is rejected as a
        # signature mismatch instead of escaping as UnicodeDecodeError.
        body = payload if isinstance(payload, bytes) else payload.encode("utf-8")

        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
        signed_payload = f"{webhook_id}.{timestamp}.".encode("utf-8") + body
        expected_signature = base64.b64encode(
            hmac.new(decoded_secret, signed_payload, hashlib.sha256).digest()
        )

        # Accept if any signature matches.
        # Compare as bytes: hmac.compare_digest raises TypeError for str arguments that
        # contain non-ASCII characters, and the header value is untrusted input.
        # Unencodable characters become "?", which never occurs in base64 output.
        if not any(
            hmac.compare_digest(expected_signature, sig.encode("utf-8", "replace")) for sig in signatures
        ):
            raise InvalidWebhookSignatureError(
                "The given webhook signature does not match the expected signature"
            )
class AsyncWebhooks(AsyncAPIResource):
    """Webhook verification and parsing helpers exposed on the asynchronous resource.

    Both methods are plain (non-coroutine) functions.
    """

    def unwrap(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
    ) -> UnwrapWebhookEvent:
        """Validates that the given payload was sent by OpenAI and parses the payload.

        Args:
            payload: The webhook payload, exactly as received
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided)

        Returns:
            The JSON-decoded payload constructed as an `UnwrapWebhookEvent`.

        Raises:
            ValueError: If no secret is passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp or signature check fails.
        """
        if secret is None:
            secret = self._client.webhook_secret

        # Verify first so that unauthenticated data is never parsed.
        self.verify_signature(payload=payload, headers=headers, secret=secret)

        body = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        return cast(
            UnwrapWebhookEvent,
            construct_type(
                type_=UnwrapWebhookEvent,
                value=json.loads(body),
            ),
        )

    def verify_signature(
        self,
        payload: str | bytes,
        headers: HeadersLike,
        *,
        secret: str | None = None,
        tolerance: int = 300,
    ) -> None:
        """Validates whether or not the webhook payload was sent by OpenAI.

        Args:
            payload: The webhook payload
            headers: The webhook headers
            secret: The webhook secret (optional, will use client secret if not provided).
                A secret starting with `whsec_` is base64-decoded after the prefix is
                removed; any other secret is used as its UTF-8 bytes.
            tolerance: Maximum allowed difference in seconds between the webhook
                timestamp and the current time, in either direction
                (default: 300 = 5 minutes)

        Raises:
            ValueError: If no secret is passed and none is configured on the client.
            InvalidWebhookSignatureError: If the timestamp is not an integer, is outside
                the tolerance window, or no provided signature matches the expected one.
        """
        if secret is None:
            secret = self._client.webhook_secret

        if secret is None:
            raise ValueError(
                "The webhook secret must either be set using the env var, OPENAI_WEBHOOK_SECRET, "
                "on the client class, OpenAI(webhook_secret='123'), or passed to this function"
            )

        signature_header = get_required_header(headers, "webhook-signature")
        timestamp = get_required_header(headers, "webhook-timestamp")
        webhook_id = get_required_header(headers, "webhook-id")

        # Validate timestamp to prevent replay attacks
        try:
            timestamp_seconds = int(timestamp)
        except ValueError:
            raise InvalidWebhookSignatureError("Invalid webhook timestamp format") from None

        now = int(time.time())

        if now - timestamp_seconds > tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too old")

        if timestamp_seconds > now + tolerance:
            raise InvalidWebhookSignatureError("Webhook timestamp is too new")

        # Extract signatures from v1,<base64> format
        # The signature header can have multiple values, separated by spaces.
        # Each value is in the format v1,<base64>. We should accept if any match.
        signatures: list[str] = [
            part[3:] if part.startswith("v1,") else part for part in signature_header.split()
        ]

        # Decode the secret if it starts with whsec_
        if secret.startswith("whsec_"):
            decoded_secret = base64.b64decode(secret[6:])
        else:
            decoded_secret = secret.encode()

        # Sign the raw bytes rather than decoding and re-encoding them: the result is
        # identical for valid UTF-8, and a body that is not valid UTF-8 is rejected as a
        # signature mismatch instead of escaping as UnicodeDecodeError.
        body = payload if isinstance(payload, bytes) else payload.encode("utf-8")

        # Prepare the signed payload (OpenAI uses webhookId.timestamp.payload format)
        signed_payload = f"{webhook_id}.{timestamp}.".encode("utf-8") + body
        expected_signature = base64.b64encode(
            hmac.new(decoded_secret, signed_payload, hashlib.sha256).digest()
        )

        # Accept if any signature matches.
        # Compare as bytes: hmac.compare_digest raises TypeError for str arguments that
        # contain non-ASCII characters, and the header value is untrusted input.
        # Unencodable characters become "?", which never occurs in base64 output.
        if not any(
            hmac.compare_digest(expected_signature, sig.encode("utf-8", "replace")) for sig in signatures
        ):
            raise InvalidWebhookSignatureError("The given webhook signature does not match the expected signature")