Skip to content

Commit 422fb7d

Browse files
committed
feat(webhooks): add event dispatch signatures retries and delivery logs
- Integrates: webhook event/signature/queue pipeline via app/server/src/modules/webhooks/{events.ts,signature.ts,queue.ts,emitter.ts}, worker/plugin wiring in app/server/src/plugins/webhook-queue.ts, and server bootstrap/type decorations updates. - Security/Behavior: signs outbound payloads with timestamped HMAC headers, enforces 5s delivery timeout, records each attempt in delivery logs, and requeues failures using the configured backoff ladder while emitting events from core auth actions. - Validation: added webhook signature, queue, and emitter coverage in app/server/test/webhook-signature.test.ts, app/server/test/webhook-queue.test.ts, and app/server/test/webhook-emitter.test.ts; runtime execution remains pending because dependencies are not installed locally.
1 parent a54fdbf commit 422fb7d

16 files changed

Lines changed: 796 additions & 4 deletions

app/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export async function buildServer(): Promise<FastifyInstance> {
2525
await server.register(import('./plugins/cache'))
2626
await server.register(import('./plugins/email'))
2727
await server.register(import('./plugins/email-queue'))
28+
await server.register(import('./plugins/webhook-queue'))
2829
await server.register(import('./plugins/auth'))
2930

3031
await server.register(import('./routes/auth'), { prefix: '/v1/auth' })

app/server/src/modules/auth/refresh.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,23 @@ async function handleReuseDetection(
8989
},
9090
})
9191

92+
if (typeof request.server.emitWebhookEvent === 'function') {
93+
try {
94+
await request.server.emitWebhookEvent({
95+
type: 'session.compromised',
96+
projectId: marker.projectId,
97+
data: {
98+
user: {
99+
id: marker.userId,
100+
},
101+
reason: 'refresh_token_reuse_detected',
102+
},
103+
})
104+
} catch (error) {
105+
request.log.warn({ error }, 'Failed to enqueue compromised session webhook event')
106+
}
107+
}
108+
92109
throw Errors.TOKEN_REUSE_DETECTED()
93110
}
94111

app/server/src/modules/auth/signin.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,46 @@ export async function signinHandler(
109109

110110
setRefreshTokenCookie(reply, refresh.token, config.nodeEnv === 'production')
111111

112+
if (typeof request.server.emitWebhookEvent === 'function') {
113+
try {
114+
await request.server.emitWebhookEvent({
115+
type: 'user.signed_in',
116+
projectId,
117+
data: {
118+
user: {
119+
id: user.id,
120+
email: user.email,
121+
displayName: user.displayName,
122+
},
123+
session: {
124+
id: session.id,
125+
ipAddress: session.ipAddress,
126+
userAgent: session.userAgent,
127+
},
128+
},
129+
})
130+
131+
await request.server.emitWebhookEvent({
132+
type: 'session.created',
133+
projectId,
134+
data: {
135+
user: {
136+
id: user.id,
137+
email: user.email,
138+
displayName: user.displayName,
139+
},
140+
session: {
141+
id: session.id,
142+
ipAddress: session.ipAddress,
143+
userAgent: session.userAgent,
144+
},
145+
},
146+
})
147+
} catch (error) {
148+
request.log.warn({ error }, 'Failed to enqueue signin webhook events')
149+
}
150+
}
151+
112152
reply.send({
113153
data: {
114154
user,

app/server/src/modules/auth/signout.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,44 @@ export async function signoutHandler(
3030

3131
if (session) {
3232
await request.server.dbAdapter.revokeSession(session.tokenHash)
33+
34+
if (typeof request.server.emitWebhookEvent === 'function') {
35+
try {
36+
await request.server.emitWebhookEvent({
37+
type: 'user.signed_out',
38+
projectId: payload.pid,
39+
data: {
40+
user: {
41+
id: payload.sub,
42+
email: payload.email,
43+
},
44+
session: {
45+
id: session.id,
46+
ipAddress: session.ipAddress,
47+
userAgent: session.userAgent,
48+
},
49+
},
50+
})
51+
52+
await request.server.emitWebhookEvent({
53+
type: 'session.revoked',
54+
projectId: payload.pid,
55+
data: {
56+
user: {
57+
id: payload.sub,
58+
email: payload.email,
59+
},
60+
session: {
61+
id: session.id,
62+
ipAddress: session.ipAddress,
63+
userAgent: session.userAgent,
64+
},
65+
},
66+
})
67+
} catch (error) {
68+
request.log.warn({ error }, 'Failed to enqueue signout webhook events')
69+
}
70+
}
3371
}
3472

3573
clearRefreshTokenCookie(reply, config.nodeEnv === 'production')

app/server/src/modules/auth/signup.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,41 @@ export async function signupHandler(
8888

8989
setRefreshTokenCookie(reply, refresh.token, config.nodeEnv === 'production')
9090

91+
if (typeof request.server.emitWebhookEvent === 'function') {
92+
try {
93+
await request.server.emitWebhookEvent({
94+
type: 'user.created',
95+
projectId,
96+
data: {
97+
user: {
98+
id: user.id,
99+
email: user.email,
100+
displayName: user.displayName,
101+
},
102+
},
103+
})
104+
105+
await request.server.emitWebhookEvent({
106+
type: 'session.created',
107+
projectId,
108+
data: {
109+
user: {
110+
id: user.id,
111+
email: user.email,
112+
displayName: user.displayName,
113+
},
114+
session: {
115+
id: session.id,
116+
ipAddress: session.ipAddress,
117+
userAgent: session.userAgent,
118+
},
119+
},
120+
})
121+
} catch (error) {
122+
request.log.warn({ error }, 'Failed to enqueue signup webhook events')
123+
}
124+
}
125+
91126
reply.code(201).send({
92127
data: {
93128
user,

app/server/src/modules/oauth/disconnect.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,23 @@ export async function oauthDisconnectHandler(
4545

4646
await request.server.dbAdapter.deleteOAuthAccount(auth.sub, providerId)
4747

48+
if (typeof request.server.emitWebhookEvent === 'function') {
49+
try {
50+
await request.server.emitWebhookEvent({
51+
type: 'oauth.disconnected',
52+
projectId: auth.pid,
53+
data: {
54+
user: {
55+
id: auth.sub,
56+
},
57+
provider: providerId,
58+
},
59+
})
60+
} catch (error) {
61+
request.log.warn({ error }, 'Failed to enqueue oauth disconnected webhook event')
62+
}
63+
}
64+
4865
await request.server.dbAdapter.createAuditLog({
4966
projectId: auth.pid,
5067
userId: auth.sub,

app/server/src/modules/oauth/handlers.ts

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,16 @@ async function resolveOAuthUser(
7575
avatarUrl: string | null
7676
rawProfile: Record<string, unknown>
7777
},
78-
) {
78+
): Promise<{
79+
user: {
80+
id: string
81+
email: string
82+
emailVerified: boolean
83+
displayName: string | null
84+
bannedAt: Date | null
85+
}
86+
linkedAccountCreated: boolean
87+
}> {
7988
const existingAccount = await request.server.dbAdapter.getOAuthAccount(
8089
data.projectId,
8190
data.provider,
@@ -89,7 +98,10 @@ async function resolveOAuthUser(
8998
throw Errors.UNAUTHORIZED()
9099
}
91100

92-
return existingUser
101+
return {
102+
user: existingUser,
103+
linkedAccountCreated: false,
104+
}
93105
}
94106

95107
if (!data.email) {
@@ -109,6 +121,8 @@ async function resolveOAuthUser(
109121
})
110122
}
111123

124+
let linkedAccountCreated = false
125+
112126
try {
113127
await request.server.dbAdapter.createOAuthAccount({
114128
userId: user.id,
@@ -117,6 +131,7 @@ async function resolveOAuthUser(
117131
providerUserId: data.providerUserId,
118132
rawProfile: data.rawProfile,
119133
})
134+
linkedAccountCreated = true
120135
} catch {
121136
const account = await request.server.dbAdapter.getOAuthAccount(
122137
data.projectId,
@@ -129,7 +144,10 @@ async function resolveOAuthUser(
129144
}
130145
}
131146

132-
return user
147+
return {
148+
user,
149+
linkedAccountCreated,
150+
}
133151
}
134152

135153
export async function oauthBeginHandler(
@@ -187,7 +205,7 @@ export async function oauthCallbackHandler(
187205
const tokenSet = await exchangeOAuthCode(providerConfig, query.code)
188206
const profile = await fetchOAuthProfile(providerConfig.id, providerConfig.userInfoUrl, tokenSet.accessToken)
189207

190-
const user = await resolveOAuthUser(request, {
208+
const oauthResult = await resolveOAuthUser(request, {
191209
projectId: state.projectId,
192210
provider: providerConfig.id,
193211
providerUserId: profile.providerUserId,
@@ -197,6 +215,7 @@ export async function oauthCallbackHandler(
197215
avatarUrl: profile.avatarUrl,
198216
rawProfile: profile.rawProfile,
199217
})
218+
const user = oauthResult.user
200219

201220
if (user.bannedAt) {
202221
throw Errors.ACCOUNT_BANNED()
@@ -231,6 +250,60 @@ export async function oauthCallbackHandler(
231250

232251
setRefreshTokenCookie(reply, refresh.token, config.nodeEnv === 'production')
233252

253+
if (typeof request.server.emitWebhookEvent === 'function') {
254+
try {
255+
if (oauthResult.linkedAccountCreated) {
256+
await request.server.emitWebhookEvent({
257+
type: 'oauth.connected',
258+
projectId: state.projectId,
259+
data: {
260+
user: {
261+
id: user.id,
262+
email: user.email,
263+
},
264+
provider: providerConfig.id,
265+
},
266+
})
267+
}
268+
269+
await request.server.emitWebhookEvent({
270+
type: 'user.signed_in',
271+
projectId: state.projectId,
272+
data: {
273+
user: {
274+
id: user.id,
275+
email: user.email,
276+
displayName: user.displayName,
277+
},
278+
session: {
279+
id: session.id,
280+
ipAddress: session.ipAddress,
281+
userAgent: session.userAgent,
282+
},
283+
},
284+
})
285+
286+
await request.server.emitWebhookEvent({
287+
type: 'session.created',
288+
projectId: state.projectId,
289+
data: {
290+
user: {
291+
id: user.id,
292+
email: user.email,
293+
displayName: user.displayName,
294+
},
295+
session: {
296+
id: session.id,
297+
ipAddress: session.ipAddress,
298+
userAgent: session.userAgent,
299+
},
300+
},
301+
})
302+
} catch (error) {
303+
request.log.warn({ error }, 'Failed to enqueue OAuth webhook events')
304+
}
305+
}
306+
234307
const location = appendHash(
235308
appendQuery(state.redirectUrl, {
236309
provider: providerConfig.id,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { FastifyInstance } from 'fastify'
2+
3+
import {
4+
createWebhookEventPayload,
5+
type WebhookEventInput,
6+
} from './events'
7+
8+
export async function emitWebhookEvent(
9+
server: FastifyInstance,
10+
eventInput: WebhookEventInput,
11+
): Promise<void> {
12+
const endpoints = await server.dbAdapter.listWebhookEndpoints(eventInput.projectId)
13+
14+
if (endpoints.length === 0) {
15+
return
16+
}
17+
18+
const payload = createWebhookEventPayload(eventInput)
19+
20+
const matchingEndpoints = endpoints.filter(
21+
(endpoint) => endpoint.enabled && endpoint.events.includes(eventInput.type),
22+
)
23+
24+
await Promise.all(
25+
matchingEndpoints.map(async (endpoint) => {
26+
await server.webhookQueue.add(
27+
'deliver-webhook',
28+
{
29+
endpointId: endpoint.id,
30+
url: endpoint.url,
31+
secret: endpoint.secret,
32+
event: eventInput.type,
33+
payload,
34+
attempt: 1,
35+
},
36+
{
37+
delay: 0,
38+
removeOnComplete: true,
39+
removeOnFail: true,
40+
attempts: 1,
41+
},
42+
)
43+
}),
44+
)
45+
}

0 commit comments

Comments
 (0)