diff --git a/package.json b/package.json index 63cedd5..683070f 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,8 @@ "@aws-sdk/s3-request-presigner": "^3.993.0", "@nestjs-modules/ioredis": "^2.0.2", "@nestjs/cache-manager": "^2.0.0", + "@nestjs/bull": "^10.0.1", + "@nestjs/bullmq": "^10.2.3", "@nestjs/common": "^10.0.0", "@nestjs/config": "^3.0.0", "@nestjs/core": "^10.0.0", @@ -54,6 +56,8 @@ "amqplib": "^0.10.3", "axios": "^1.13.5", "bcrypt": "^6.0.0", + "bull": "^4.12.2", + "bullmq": "^5.67.1", "cache-manager": "^5.0.0", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", diff --git a/src/app.module.ts b/src/app.module.ts index 4c2a6e0..7afc3eb 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,5 @@ import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { TypeOrmModule } from '@nestjs/typeorm'; import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler'; @@ -50,6 +51,7 @@ import { EnergyModule } from './energy/energy.module'; import { SkillRatingModule } from './skill-rating/skill-rating.module'; import { WalletAuthModule } from './auth/wallet-auth.module'; import { XpModule } from './xp/xp.module'; +import { WebhooksModule } from './webhooks/webhooks.module'; import { PlayerEventsModule } from './player-events/player-events.module'; import { AccountModule } from './account/account.module'; @@ -87,6 +89,17 @@ import { AccountModule } from './account/account.module'; inject: [ConfigService], }), + BullModule.forRootAsync({ + useFactory: (configService: ConfigService) => ({ + connection: { + host: configService.get('REDIS_HOST', 'localhost'), + port: configService.get('REDIS_PORT', 6379), + password: configService.get('REDIS_PASSWORD') || undefined, + }, + }), + inject: [ConfigService], + }), + // Message broker RabbitMQModule, @@ -142,6 +155,7 @@ import { AccountModule } from './account/account.module'; SkillRatingModule, WalletAuthModule, XpModule, + WebhooksModule, AccountModule, ], controllers: [AppController], diff --git a/src/auth/auth.service.ts b/src/auth/auth.service.ts index 045cb0d..83881df 100644 --- a/src/auth/auth.service.ts +++ b/src/auth/auth.service.ts @@ -1,4 +1,5 @@ import { Injectable, ConflictException, UnauthorizedException, BadRequestException } from "@nestjs/common" +import { EventEmitter2 } from "@nestjs/event-emitter" import { InjectRepository } from "@nestjs/typeorm" import { Repository } from "typeorm" import type { DeepPartial } from "typeorm" @@ -18,6 +19,7 @@ import type { VerifyEmailDto } from "./dto/verify-email.dto" import type { JwtPayload } from "./interfaces/jwt-payload.interface" import { BCRYPT_SALT_ROUNDS, jwtConstants, UserRole } from "./constants" import { v4 as uuidv4 } from "uuid" +import { WEBHOOK_INTERNAL_EVENTS } from "../webhooks/webhook.constants" @Injectable() export class AuthService { @@ -31,6 +33,7 @@ export class AuthService { @InjectRepository(TwoFactorBackupCode) private backupCodesRepository: Repository, private jwtService: JwtService, + private eventEmitter: EventEmitter2, ) { } async hashPassword(password: string): Promise { @@ -94,6 +97,13 @@ export class AuthService { await this.usersRepository.save(user) + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.userRegistered, { + userId: user.id, + email: user.email, + role: role.name, + registeredAt: new Date().toISOString(), + }) + // TODO: Send verification email (mocked for now) console.log(`Verification email sent to ${user.email} with token: ${verificationToken}`) diff --git a/src/game-session/services/game-session.service.ts b/src/game-session/services/game-session.service.ts index ef9c6c3..c4c4293 100644 --- a/src/game-session/services/game-session.service.ts +++ b/src/game-session/services/game-session.service.ts @@ -1,10 +1,12 @@ // services/game-session.service.ts +import { EventEmitter2 } from '@nestjs/event-emitter'; import { Injectable, NotFoundException, ForbiddenException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, LessThan } from 'typeorm'; import { GameSession } from '../entities/game-session.entity'; import { PlayerEventsService } from '../../player-events/player-events.service'; import { PuzzleVersionService } from '../../puzzles/services/puzzle-version.service'; +import { WEBHOOK_INTERNAL_EVENTS } from '../../webhooks/webhook.constants'; import { CacheService } from '../../cache/services/cache.service'; const SUSPENDED_KEY = (id: string) => `session:suspended:${id}`; @@ -18,6 +20,7 @@ export class GameSessionService { private readonly sessionRepo: Repository, private readonly playerEventsService: PlayerEventsService, private readonly puzzleVersionService: PuzzleVersionService, + private readonly eventEmitter: EventEmitter2, private readonly cacheService: CacheService, ) {} @@ -89,6 +92,14 @@ export class GameSessionService { }); } + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.sessionEnded, { + sessionId: savedSession.id, + userId: savedSession.userId, + puzzleId: savedSession.puzzleId, + status: savedSession.status, + endedAt: savedSession.lastActiveAt.toISOString(), + }); + return savedSession; } diff --git a/src/leaderboard/leaderboard.service.ts b/src/leaderboard/leaderboard.service.ts index 22905e3..4f15ded 100644 --- a/src/leaderboard/leaderboard.service.ts +++ b/src/leaderboard/leaderboard.service.ts @@ -1,5 +1,6 @@ import { Injectable, Inject } from '@nestjs/common'; import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { Leaderboard } from './entities/leaderboard.entity'; @@ -8,6 +9,7 @@ import { CreateLeaderboardDto } from './dto/create-leaderboard.dto'; import { CreateLeaderboardEntryDto } from './dto/create-leaderboard-entry.dto'; import { Cache } from 'cache-manager'; import { AchievementsService } from '../achievements/achievements.service'; +import { WEBHOOK_INTERNAL_EVENTS } from '../webhooks/webhook.constants'; @Injectable() export class LeaderboardService { @@ -18,6 +20,7 @@ export class LeaderboardService { private entryRepository: Repository, @Inject(CACHE_MANAGER) private cacheManager: any, private achievementsService: AchievementsService, + private readonly eventEmitter: EventEmitter2, ) { } async createLeaderboard(dto: CreateLeaderboardDto): Promise { @@ -35,6 +38,15 @@ export class LeaderboardService { await this.cacheManager.reset(); // Award leaderboard achievements if criteria met await this.checkAndAwardLeaderboardAchievements(dto.leaderboardId, dto.userId); + + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.leaderboardUpdated, { + leaderboardId: dto.leaderboardId, + entryId: saved.id, + userId: dto.userId, + score: saved.score, + updatedAt: new Date().toISOString(), + }); + return saved; } @@ -131,6 +143,12 @@ export class LeaderboardService { // (If you want to delete, use delete instead of update above) // Invalidate cache await this.cacheManager.reset(); + + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.leaderboardUpdated, { + leaderboardId, + archivedAt: now.toISOString(), + reset: true, + }); } async getUserRankSummary(leaderboardId: number, userId: number) { diff --git a/src/multiplayer/gateways/multiplayer.gateway.ts b/src/multiplayer/gateways/multiplayer.gateway.ts index 2beae97..176704b 100644 --- a/src/multiplayer/gateways/multiplayer.gateway.ts +++ b/src/multiplayer/gateways/multiplayer.gateway.ts @@ -10,6 +10,7 @@ import { } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; import { Logger } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; import { MultiplayerService } from '../services/multiplayer.service'; import { RoomType, Player, RoomStatus, Spectator } from '../interfaces/multiplayer.interface'; import { ValidationService } from '../../game-engine/services/validation.service'; @@ -33,6 +34,7 @@ export class MultiplayerGateway implements OnGatewayInit, OnGatewayConnection, O private readonly validationService: ValidationService, private readonly leaderboardService: LeaderboardService, private readonly puzzlesService: PuzzlesService, + private readonly eventEmitter: EventEmitter2, private readonly spectatorService: SpectatorService, ) { } @@ -298,6 +300,14 @@ export class MultiplayerGateway implements OnGatewayInit, OnGatewayConnection, O puzzleCompleted: allPlayersSolved }); + this.eventEmitter.emit('puzzle.solved', { + userId: data.userId, + puzzleId: data.puzzleId, + score: result.score, + totalScore: player?.score, + }); + + if (room.type === RoomType.COMPETITIVE) { // Broadcast to spectators this.server.to(`${data.roomId}-spectators`).emit('collaborativeSolutionVerified', { userId: data.userId, diff --git a/src/puzzles/services/solution-submission.service.ts b/src/puzzles/services/solution-submission.service.ts index 9e5bf83..b307111 100644 --- a/src/puzzles/services/solution-submission.service.ts +++ b/src/puzzles/services/solution-submission.service.ts @@ -2,11 +2,12 @@ import { Injectable, Logger, ConflictException, - BadRequestException, + // BadRequestException, NotFoundException, HttpException, HttpStatus, } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, DataSource, MoreThan } from 'typeorm'; import * as crypto from 'crypto'; @@ -26,6 +27,7 @@ import { ViolationType } from '../../anti-cheat/constants'; import { XpService } from '../../xp/xp.service'; import { PlayerEventsService } from '../../player-events/player-events.service'; import { PuzzleVersionService } from './puzzle-version.service'; +import { WEBHOOK_INTERNAL_EVENTS } from '../../webhooks/webhook.constants'; // ──────────────────────────────────────────────────────────────────────────── // Constants @@ -56,6 +58,7 @@ export class SolutionSubmissionService { private readonly xpService: XpService, private readonly playerEventsService: PlayerEventsService, private readonly puzzleVersionService: PuzzleVersionService, + private readonly eventEmitter: EventEmitter2, ) { } // ────────────────────────────────────────────────────────────────────────── @@ -251,6 +254,15 @@ export class SolutionSubmissionService { }, }); + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.puzzleSolved, { + userId, + puzzleId, + solveTimeSeconds: timeTakenSeconds, + hintsUsed: dto.hintsUsed ?? 0, + scoreAwarded, + solvedAt: now.toISOString(), + }); + // Each achievement unlocked should produce an audit event for (const achievement of result.rewards?.achievements ?? []) { void this.playerEventsService.emitPlayerEvent({ @@ -264,6 +276,15 @@ export class SolutionSubmissionService { hintsUsed: dto.hintsUsed ?? 0, }, }); + + this.eventEmitter.emit(WEBHOOK_INTERNAL_EVENTS.achievementUnlocked, { + userId, + puzzleId, + achievementId: achievement, + unlockedAt: now.toISOString(), + hintsUsed: dto.hintsUsed ?? 0, + solveTimeSeconds: timeTakenSeconds, + }); } } diff --git a/src/webhooks/dto/create-webhook.dto.ts b/src/webhooks/dto/create-webhook.dto.ts new file mode 100644 index 0000000..1b53bbb --- /dev/null +++ b/src/webhooks/dto/create-webhook.dto.ts @@ -0,0 +1,28 @@ +import { + IsUrl, + IsArray, + IsString, + ArrayNotEmpty, + IsOptional, + IsIn, + MinLength, +} from 'class-validator'; +import { SUPPORTED_WEBHOOK_EVENTS, WebhookEvent } from '../webhook.constants'; + +export class CreateWebhookDto { + @IsUrl({ protocols: ['https'], require_protocol: true }, { message: 'URL must be a valid HTTPS URL' }) + url: string; + + @IsString() + @MinLength(1) + secret: string; + + @IsArray() + @ArrayNotEmpty() + @IsIn(SUPPORTED_WEBHOOK_EVENTS, { each: true }) + events: WebhookEvent[]; + + @IsOptional() + @IsString() + appId?: string; +} \ No newline at end of file diff --git a/src/webhooks/entities/webhook-delivery.entity.ts b/src/webhooks/entities/webhook-delivery.entity.ts new file mode 100644 index 0000000..7fe75f5 --- /dev/null +++ b/src/webhooks/entities/webhook-delivery.entity.ts @@ -0,0 +1,59 @@ +import { + Entity, + Column, + PrimaryGeneratedColumn, + CreateDateColumn, + ManyToOne, + JoinColumn, + Index, +} from 'typeorm'; +import { Webhook } from './webhook.entity'; + +export type DeliveryStatus = 'pending' | 'success' | 'failed' | 'retry'; + +@Entity('webhook_deliveries') +@Index(['webhookId', 'createdAt'], { order: { createdAt: 'DESC' } }) +export class WebhookDelivery { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column('uuid') + webhookId: string; + + @ManyToOne(() => Webhook, { onDelete: 'CASCADE' }) + @JoinColumn({ name: 'webhookId' }) + webhook: Webhook; + + @Column() + event: string; + + @Column('jsonb') + payload: any; + + @Column({ type: 'text', nullable: true }) + signature: string; + + @Column({ default: 'pending' }) + status: DeliveryStatus; + + @Column({ nullable: true }) + responseCode?: number; + + @Column({ type: 'text', nullable: true }) + responseBody?: string; + + @Column({ type: 'text', nullable: true }) + error?: string; + + @Column({ default: 0 }) + retryCount: number; + + @Column({ type: 'timestamptz', nullable: true }) + nextRetryAt?: Date; + + @CreateDateColumn() + createdAt: Date; + + @Column({ type: 'timestamptz', nullable: true }) + deliveredAt?: Date; +} \ No newline at end of file diff --git a/src/webhooks/entities/webhook.entity.ts b/src/webhooks/entities/webhook.entity.ts new file mode 100644 index 0000000..777e135 --- /dev/null +++ b/src/webhooks/entities/webhook.entity.ts @@ -0,0 +1,45 @@ +import { + Entity, + Column, + PrimaryGeneratedColumn, + CreateDateColumn, + UpdateDateColumn, + OneToMany, + Index, +} from 'typeorm'; +import { WebhookDelivery } from './webhook-delivery.entity'; +import { WebhookEvent } from '../webhook.constants'; + +@Entity('webhooks') +@Index(['userId', 'appId']) +export class Webhook { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column() + url: string; + + @Column({ select: false }) + secret: string; + + @Column('text', { array: true }) + events: WebhookEvent[]; + + @Column({ default: true }) + active: boolean; + + @Column({ type: 'uuid' }) + userId: string; + + @Column({ nullable: true }) + appId?: string; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; + + @OneToMany(() => WebhookDelivery, (delivery) => delivery.webhook) + deliveries: WebhookDelivery[]; +} \ No newline at end of file diff --git a/src/webhooks/processors/webhook-delivery.processor.spec.ts b/src/webhooks/processors/webhook-delivery.processor.spec.ts new file mode 100644 index 0000000..6f25b70 --- /dev/null +++ b/src/webhooks/processors/webhook-delivery.processor.spec.ts @@ -0,0 +1,74 @@ +import axios from 'axios'; +import { WebhookDeliveryProcessor } from './webhook-delivery.processor'; + +jest.mock('axios'); + +describe('WebhookDeliveryProcessor', () => { + const mockedAxios = axios as jest.Mocked; + + it('marks a delivery for retry on a transient failure', async () => { + const deliveryRepository = { + findOne: jest.fn().mockResolvedValue({ + id: 'delivery-1', + event: 'puzzle.solved', + payload: { puzzleId: 'puzzle-1' }, + signature: 'sha256=abc', + createdAt: new Date('2026-03-27T00:00:00.000Z'), + webhook: { url: 'https://example.com/hook' }, + }), + save: jest.fn().mockResolvedValue(undefined), + }; + const processor = new WebhookDeliveryProcessor(deliveryRepository as any); + + mockedAxios.post.mockRejectedValueOnce({ + message: 'timeout', + response: { status: 503, data: { error: 'unavailable' } }, + }); + + await expect( + processor.process({ data: { deliveryId: 'delivery-1' }, attemptsMade: 0 } as any), + ).rejects.toMatchObject({ message: 'timeout' }); + + expect(deliveryRepository.save).toHaveBeenCalledWith( + expect.objectContaining({ + status: 'retry', + retryCount: 1, + responseCode: 503, + responseBody: JSON.stringify({ error: 'unavailable' }), + }), + ); + }); + + it('marks a delivery as failed after the last retry', async () => { + const deliveryRepository = { + findOne: jest.fn().mockResolvedValue({ + id: 'delivery-2', + event: 'session.ended', + payload: { sessionId: 'session-1' }, + signature: 'sha256=abc', + createdAt: new Date('2026-03-27T00:00:00.000Z'), + webhook: { url: 'https://example.com/hook' }, + }), + save: jest.fn().mockResolvedValue(undefined), + }; + const processor = new WebhookDeliveryProcessor(deliveryRepository as any); + + mockedAxios.post.mockRejectedValueOnce({ + message: 'permanent failure', + response: { status: 500, data: 'boom' }, + }); + + await expect( + processor.process({ data: { deliveryId: 'delivery-2' }, attemptsMade: 3 } as any), + ).rejects.toMatchObject({ message: 'permanent failure' }); + + expect(deliveryRepository.save).toHaveBeenCalledWith( + expect.objectContaining({ + status: 'failed', + retryCount: 3, + responseCode: 500, + responseBody: 'boom', + }), + ); + }); +}); \ No newline at end of file diff --git a/src/webhooks/processors/webhook-delivery.processor.ts b/src/webhooks/processors/webhook-delivery.processor.ts new file mode 100644 index 0000000..cdc2ce8 --- /dev/null +++ b/src/webhooks/processors/webhook-delivery.processor.ts @@ -0,0 +1,104 @@ +import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable, Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { WebhookDelivery } from '../entities/webhook-delivery.entity'; +import axios, { AxiosError } from 'axios'; +import { + WEBHOOK_INITIAL_RETRY_DELAY_MS, + WEBHOOK_MAX_RETRIES, + WEBHOOK_QUEUE, +} from '../webhook.constants'; + +@Injectable() +@Processor(WEBHOOK_QUEUE) +export class WebhookDeliveryProcessor extends WorkerHost { + private readonly logger = new Logger(WebhookDeliveryProcessor.name); + + constructor( + @InjectRepository(WebhookDelivery) + private readonly deliveryRepository: Repository, + ) { + super(); + } + + async process(job: Job<{ deliveryId: string }>): Promise { + const { deliveryId } = job.data; + + const delivery = await this.deliveryRepository.findOne({ + where: { id: deliveryId }, + relations: ['webhook'], + }); + + if (!delivery) { + this.logger.error(`Delivery ${deliveryId} not found`); + return; + } + + try { + const response = await axios.post(delivery.webhook.url, delivery.payload, { + headers: { + 'Content-Type': 'application/json', + 'X-Webhook-Signature': delivery.signature, + 'X-Webhook-Event': delivery.event, + 'X-Webhook-Delivery-ID': delivery.id, + 'X-Webhook-Timestamp': delivery.createdAt.toISOString(), + }, + timeout: 10000, + }); + + delivery.status = 'success'; + delivery.responseCode = response.status; + delivery.responseBody = this.serializeResponseBody(response.data); + delivery.error = undefined; + delivery.nextRetryAt = undefined; + delivery.retryCount = job.attemptsMade; + delivery.deliveredAt = new Date(); + + this.logger.log(`Webhook delivered successfully: ${deliveryId}`); + } catch (error) { + const axiosError = error as AxiosError; + const currentAttempt = job.attemptsMade + 1; + const shouldRetry = currentAttempt <= WEBHOOK_MAX_RETRIES; + + delivery.responseCode = axiosError.response?.status; + delivery.responseBody = this.serializeResponseBody(axiosError.response?.data); + delivery.error = axiosError.message; + delivery.retryCount = Math.min(currentAttempt, WEBHOOK_MAX_RETRIES); + + if (shouldRetry) { + delivery.status = 'retry'; + delivery.nextRetryAt = new Date( + Date.now() + WEBHOOK_INITIAL_RETRY_DELAY_MS * Math.pow(2, currentAttempt - 1), + ); + + this.logger.warn( + `Webhook delivery failed, retrying (${delivery.retryCount}/${WEBHOOK_MAX_RETRIES}): ${deliveryId}`, + ); + } else { + delivery.status = 'failed'; + delivery.nextRetryAt = undefined; + this.logger.error(`Webhook delivery failed permanently: ${deliveryId}`); + } + + await this.deliveryRepository.save(delivery); + throw error; + } + + await this.deliveryRepository.save(delivery); + } + + @OnWorkerEvent('failed') + onFailed(job: Job, err: Error) { + this.logger.error(`Job ${job.id} failed with error: ${err.message}`); + } + + private serializeResponseBody(data: unknown): string | undefined { + if (data == null) { + return undefined; + } + + return typeof data === 'string' ? data : JSON.stringify(data); + } +} \ No newline at end of file diff --git a/src/webhooks/webhook-events.listener.ts b/src/webhooks/webhook-events.listener.ts new file mode 100644 index 0000000..acfa621 --- /dev/null +++ b/src/webhooks/webhook-events.listener.ts @@ -0,0 +1,34 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { WebhooksService } from './webhooks.service'; +import { WEBHOOK_INTERNAL_EVENTS } from './webhook.constants'; + +@Injectable() +export class WebhookEventsListener { + constructor(private readonly webhooksService: WebhooksService) {} + + @OnEvent(WEBHOOK_INTERNAL_EVENTS.puzzleSolved, { async: true }) + async handlePuzzleSolved(payload: Record) { + await this.webhooksService.enqueueEvent('puzzle.solved', payload); + } + + @OnEvent(WEBHOOK_INTERNAL_EVENTS.achievementUnlocked, { async: true }) + async handleAchievementUnlocked(payload: Record) { + await this.webhooksService.enqueueEvent('achievement.unlocked', payload); + } + + @OnEvent(WEBHOOK_INTERNAL_EVENTS.sessionEnded, { async: true }) + async handleSessionEnded(payload: Record) { + await this.webhooksService.enqueueEvent('session.ended', payload); + } + + @OnEvent(WEBHOOK_INTERNAL_EVENTS.userRegistered, { async: true }) + async handleUserRegistered(payload: Record) { + await this.webhooksService.enqueueEvent('user.registered', payload); + } + + @OnEvent(WEBHOOK_INTERNAL_EVENTS.leaderboardUpdated, { async: true }) + async handleLeaderboardUpdated(payload: Record) { + await this.webhooksService.enqueueEvent('leaderboard.updated', payload); + } +} \ No newline at end of file diff --git a/src/webhooks/webhook-signature.util.spec.ts b/src/webhooks/webhook-signature.util.spec.ts new file mode 100644 index 0000000..24f96cb --- /dev/null +++ b/src/webhooks/webhook-signature.util.spec.ts @@ -0,0 +1,9 @@ +import { createWebhookSignature } from './webhook-signature.util'; + +describe('createWebhookSignature', () => { + it('generates an HMAC-SHA256 signature with the sha256 prefix', () => { + const signature = createWebhookSignature('{"hello":"world"}', 'top-secret'); + + expect(signature).toBe('sha256=afd00617ceb8f63e65ea5c310f06bf78c3901e7a713db532e25da26ad63c7236'); + }); +}); \ No newline at end of file diff --git a/src/webhooks/webhook-signature.util.ts b/src/webhooks/webhook-signature.util.ts new file mode 100644 index 0000000..021547e --- /dev/null +++ b/src/webhooks/webhook-signature.util.ts @@ -0,0 +1,6 @@ +import * as crypto from 'crypto'; + +export function createWebhookSignature(payload: string, secret: string): string { + const digest = crypto.createHmac('sha256', secret).update(payload).digest('hex'); + return `sha256=${digest}`; +} \ No newline at end of file diff --git a/src/webhooks/webhook-url-validator.service.ts b/src/webhooks/webhook-url-validator.service.ts new file mode 100644 index 0000000..7c675ef --- /dev/null +++ b/src/webhooks/webhook-url-validator.service.ts @@ -0,0 +1,67 @@ +import { BadRequestException, Injectable } from '@nestjs/common'; +import axios from 'axios'; + +@Injectable() +export class WebhookUrlValidatorService { + async validate(url: string): Promise { + const parsed = this.parseHttpsUrl(url); + + try { + const response = await axios.head(parsed.toString(), { + timeout: 5000, + maxRedirects: 5, + validateStatus: (status) => status < 500, + }); + + if (response.status < 400 || response.status === 403) { + return; + } + + if (response.status !== 405) { + throw new BadRequestException(`Webhook URL is not reachable: ${response.status}`); + } + } catch (error) { + if (axios.isAxiosError(error) && error.response?.status && error.response.status !== 405) { + throw new BadRequestException(`Webhook URL is not reachable: ${error.response.status}`); + } + + if (!axios.isAxiosError(error) || !error.response) { + throw new BadRequestException('Webhook URL is not reachable'); + } + } + + try { + const response = await axios.get(parsed.toString(), { + timeout: 5000, + maxRedirects: 5, + validateStatus: (status) => status < 500, + }); + + if (response.status >= 400 && response.status !== 403) { + throw new BadRequestException(`Webhook URL is not reachable: ${response.status}`); + } + } catch (error) { + if (axios.isAxiosError(error) && error.response?.status) { + throw new BadRequestException(`Webhook URL is not reachable: ${error.response.status}`); + } + + throw new BadRequestException('Webhook URL is not reachable'); + } + } + + private parseHttpsUrl(url: string): URL { + let parsed: URL; + + try { + parsed = new URL(url); + } catch { + throw new BadRequestException('Webhook URL must be a valid HTTPS URL'); + } + + if (parsed.protocol !== 'https:') { + throw new BadRequestException('Webhook URL must use HTTPS'); + } + + return parsed; + } +} \ No newline at end of file diff --git a/src/webhooks/webhook.constants.ts b/src/webhooks/webhook.constants.ts new file mode 100644 index 0000000..d7ba764 --- /dev/null +++ b/src/webhooks/webhook.constants.ts @@ -0,0 +1,25 @@ +export const SUPPORTED_WEBHOOK_EVENTS = [ + 'puzzle.solved', + 'achievement.unlocked', + 'session.ended', + 'user.registered', + 'leaderboard.updated', +] as const; + +export type WebhookEvent = (typeof SUPPORTED_WEBHOOK_EVENTS)[number]; + +export const WEBHOOK_INTERNAL_EVENTS = { + puzzleSolved: 'webhook.puzzle.solved', + achievementUnlocked: 'webhook.achievement.unlocked', + sessionEnded: 'webhook.session.ended', + userRegistered: 'webhook.user.registered', + leaderboardUpdated: 'webhook.leaderboard.updated', +} as const; + +export const WEBHOOK_QUEUE = 'webhook-delivery'; +export const WEBHOOK_JOB = 'deliver-webhook'; +export const WEBHOOK_MAX_RETRIES = 3; +export const WEBHOOK_TOTAL_ATTEMPTS = WEBHOOK_MAX_RETRIES + 1; +export const WEBHOOK_INITIAL_RETRY_DELAY_MS = 1000; +export const WEBHOOK_DELIVERY_RETENTION_LIMIT = 100; +export const WEBHOOK_DELIVERY_TTL_DAYS = 30; \ No newline at end of file diff --git a/src/webhooks/webhooks.controller.ts b/src/webhooks/webhooks.controller.ts new file mode 100644 index 0000000..ad7a717 --- /dev/null +++ b/src/webhooks/webhooks.controller.ts @@ -0,0 +1,71 @@ +import { + Body, + Controller, + DefaultValuePipe, + Delete, + Get, + HttpCode, + HttpStatus, + Param, + ParseIntPipe, + Post, + Query, + UnauthorizedException, + UseGuards, +} from '@nestjs/common'; +import { ActiveUser } from '../auth/decorators/active-user.decorator'; +import { JwtAuthGuard } from '../auth/guards/jwt-auth.guard'; +import { WebhooksService } from './webhooks.service'; +import { CreateWebhookDto } from './dto/create-webhook.dto'; + +@UseGuards(JwtAuthGuard) +@Controller('webhooks') +export class WebhooksController { + constructor(private readonly webhooksService: WebhooksService) {} + + @Post() + async create( + @ActiveUser() user: { userId?: string; sub?: string; id?: string }, + @Body() createWebhookDto: CreateWebhookDto, + ) { + const webhook = await this.webhooksService.create(this.getOwnerId(user), createWebhookDto); + return this.webhooksService.toResponse(webhook); + } + + @Get() + async findAll( + @ActiveUser() user: { userId?: string; sub?: string; id?: string }, + @Query('appId') appId?: string, + ) { + const webhooks = await this.webhooksService.findAll(this.getOwnerId(user), appId); + return webhooks.map((webhook) => this.webhooksService.toResponse(webhook)); + } + + @Delete(':id') + @HttpCode(HttpStatus.NO_CONTENT) + async remove( + @ActiveUser() user: { userId?: string; sub?: string; id?: string }, + @Param('id') id: string, + ) { + await this.webhooksService.remove(this.getOwnerId(user), id); + } + + @Get(':id/deliveries') + getDeliveries( + @ActiveUser() user: { userId?: string; sub?: string; id?: string }, + @Param('id') id: string, + @Query('limit', new DefaultValuePipe(100), ParseIntPipe) limit: number, + ) { + return this.webhooksService.getDeliveries(this.getOwnerId(user), id, limit); + } + + private getOwnerId(user: { userId?: string; sub?: string; id?: string }): string { + const ownerId = user?.userId ?? user?.sub ?? user?.id; + + if (!ownerId) { + throw new UnauthorizedException('Authenticated user not found'); + } + + return ownerId; + } +} \ No newline at end of file diff --git a/src/webhooks/webhooks.module.ts b/src/webhooks/webhooks.module.ts new file mode 100644 index 0000000..f0c727b --- /dev/null +++ b/src/webhooks/webhooks.module.ts @@ -0,0 +1,29 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { BullModule } from '@nestjs/bullmq'; +import { WebhooksService } from './webhooks.service'; +import { WebhooksController } from './webhooks.controller'; +import { WebhookDeliveryProcessor } from './processors/webhook-delivery.processor'; +import { Webhook } from './entities/webhook.entity'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { WEBHOOK_QUEUE } from './webhook.constants'; +import { WebhookUrlValidatorService } from './webhook-url-validator.service'; +import { WebhookEventsListener } from './webhook-events.listener'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([Webhook, WebhookDelivery]), + BullModule.registerQueue({ + name: WEBHOOK_QUEUE, + }), + ], + controllers: [WebhooksController], + providers: [ + WebhooksService, + WebhookDeliveryProcessor, + WebhookUrlValidatorService, + WebhookEventsListener, + ], + exports: [WebhooksService], +}) +export class WebhooksModule {} \ No newline at end of file diff --git a/src/webhooks/webhooks.service.spec.ts b/src/webhooks/webhooks.service.spec.ts new file mode 100644 index 0000000..f286b7f --- /dev/null +++ b/src/webhooks/webhooks.service.spec.ts @@ -0,0 +1,101 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { getQueueToken } from '@nestjs/bullmq'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { WebhooksService } from './webhooks.service'; +import { Webhook } from './entities/webhook.entity'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { WebhookUrlValidatorService } from './webhook-url-validator.service'; +import { WEBHOOK_QUEUE } from './webhook.constants'; + +describe('WebhooksService', () => { + let service: WebhooksService; + let webhookRepository: any; + let deliveryRepository: any; + let validator: any; + + beforeEach(async () => { + webhookRepository = { + create: jest.fn().mockImplementation((value) => value), + save: jest.fn().mockImplementation(async (value) => ({ + id: 'webhook-1', + createdAt: new Date('2026-03-27T00:00:00.000Z'), + updatedAt: new Date('2026-03-27T00:00:00.000Z'), + active: true, + ...value, + })), + find: jest.fn(), + findOne: jest.fn(), + remove: jest.fn(), + createQueryBuilder: jest.fn(), + }; + + deliveryRepository = { + find: jest.fn().mockResolvedValue([{ id: 'delivery-1', webhookId: 'webhook-1', status: 'success' }]), + createQueryBuilder: jest.fn(), + delete: jest.fn(), + save: jest.fn(), + create: jest.fn().mockImplementation((value) => value), + }; + + validator = { validate: jest.fn().mockResolvedValue(undefined) }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WebhooksService, + { provide: getRepositoryToken(Webhook), useValue: webhookRepository }, + { provide: getRepositoryToken(WebhookDelivery), useValue: deliveryRepository }, + { provide: getQueueToken(WEBHOOK_QUEUE), useValue: { add: jest.fn() } }, + { provide: WebhookUrlValidatorService, useValue: validator }, + { provide: ConfigService, useValue: { get: jest.fn().mockImplementation((key: string, fallback: unknown) => fallback) } }, + ], + }).compile(); + + service = module.get(WebhooksService); + }); + + it('registers a webhook for the authenticated user', async () => { + webhookRepository.findOne.mockResolvedValue({ + id: 'webhook-1', + url: 'https://example.com/hook', + events: ['puzzle.solved'], + active: true, + userId: 'user-1', + appId: 'mesh', + createdAt: new Date('2026-03-27T00:00:00.000Z'), + updatedAt: new Date('2026-03-27T00:00:00.000Z'), + }); + + const webhook = await service.create('user-1', { + url: 'https://example.com/hook', + secret: 'secret', + events: ['puzzle.solved'], + appId: 'mesh', + }); + + expect(validator.validate).toHaveBeenCalledWith('https://example.com/hook'); + expect(webhookRepository.create).toHaveBeenCalledWith({ + url: 'https://example.com/hook', + secret: 'secret', + events: ['puzzle.solved'], + appId: 'mesh', + userId: 'user-1', + active: true, + }); + expect(webhook.userId).toBe('user-1'); + }); + + it('returns delivery logs for the owning user', async () => { + webhookRepository.findOne.mockResolvedValue({ id: 'webhook-1', userId: 'user-1' }); + + const deliveries = await service.getDeliveries('user-1', 'webhook-1'); + + expect(webhookRepository.findOne).toHaveBeenCalledWith({ where: { id: 'webhook-1', userId: 'user-1' } }); + expect(deliveryRepository.find).toHaveBeenCalledWith({ + where: { webhookId: 'webhook-1' }, + order: { createdAt: 'DESC' }, + take: 100, + }); + expect(deliveries).toHaveLength(1); + }); +}); \ No newline at end of file diff --git a/src/webhooks/webhooks.service.ts b/src/webhooks/webhooks.service.ts new file mode 100644 index 0000000..50a368e --- /dev/null +++ b/src/webhooks/webhooks.service.ts @@ -0,0 +1,200 @@ +import { Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { ConfigService } from '@nestjs/config'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { Queue } from 'bullmq'; +import { Webhook } from './entities/webhook.entity'; +import { WebhookDelivery } from './entities/webhook-delivery.entity'; +import { CreateWebhookDto } from './dto/create-webhook.dto'; +import { + WEBHOOK_DELIVERY_RETENTION_LIMIT, + WEBHOOK_DELIVERY_TTL_DAYS, + WEBHOOK_INITIAL_RETRY_DELAY_MS, + WEBHOOK_JOB, + WEBHOOK_QUEUE, + WEBHOOK_TOTAL_ATTEMPTS, + WebhookEvent, +} from './webhook.constants'; +import { createWebhookSignature } from './webhook-signature.util'; +import { WebhookUrlValidatorService } from './webhook-url-validator.service'; + +@Injectable() +export class WebhooksService { + private readonly logger = new Logger(WebhooksService.name); + + constructor( + @InjectRepository(Webhook) + private readonly webhookRepository: Repository, + @InjectRepository(WebhookDelivery) + private readonly deliveryRepository: Repository, + @InjectQueue(WEBHOOK_QUEUE) + private readonly deliveryQueue: Queue, + private readonly webhookUrlValidator: WebhookUrlValidatorService, + private readonly configService: ConfigService, + ) {} + + async create(ownerUserId: string, createWebhookDto: CreateWebhookDto): Promise { + await this.webhookUrlValidator.validate(createWebhookDto.url); + + const webhook = this.webhookRepository.create({ + ...createWebhookDto, + userId: ownerUserId, + active: true, + }); + + const savedWebhook = await this.webhookRepository.save(webhook); + return this.findOwnedWebhook(ownerUserId, savedWebhook.id); + } + + async findAll(ownerUserId: string, appId?: string): Promise { + const where: Record = { userId: ownerUserId }; + + if (appId) { + where.appId = appId; + } + + return this.webhookRepository.find({ + where, + order: { createdAt: 'DESC' }, + }); + } + + async findOwnedWebhook(ownerUserId: string, id: string): Promise { + const webhook = await this.webhookRepository.findOne({ where: { id, userId: ownerUserId } }); + + if (!webhook) { + throw new NotFoundException('Webhook not found'); + } + + return webhook; + } + + async remove(ownerUserId: string, id: string): Promise { + const webhook = await this.findOwnedWebhook(ownerUserId, id); + await this.webhookRepository.remove(webhook); + } + + async getDeliveries(ownerUserId: string, webhookId: string, limit = 100): Promise { + await this.findOwnedWebhook(ownerUserId, webhookId); + + return this.deliveryRepository.find({ + where: { webhookId }, + order: { createdAt: 'DESC' }, + take: Math.min(limit, WEBHOOK_DELIVERY_RETENTION_LIMIT), + }); + } + + async enqueueEvent(event: WebhookEvent, payload: Record): Promise { + const webhooks = await this.webhookRepository + .createQueryBuilder('webhook') + .addSelect('webhook.secret') + .where('webhook.active = :active', { active: true }) + .andWhere(':event = ANY(webhook.events)', { event }) + .getMany(); + + for (const webhook of webhooks) { + await this.queueDelivery(webhook, event, payload); + } + } + + toResponse(webhook: Webhook) { + return { + id: webhook.id, + url: webhook.url, + events: webhook.events, + active: webhook.active, + userId: webhook.userId, + appId: webhook.appId, + createdAt: webhook.createdAt, + updatedAt: webhook.updatedAt, + }; + } + + buildSignature(payload: Record | string, secret: string): string { + const serializedPayload = typeof payload === 'string' ? payload : JSON.stringify(payload); + return createWebhookSignature(serializedPayload, secret); + } + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async cleanupExpiredDeliveries(): Promise { + const ttlDays = this.configService.get('WEBHOOK_DELIVERY_TTL_DAYS', WEBHOOK_DELIVERY_TTL_DAYS); + + if (ttlDays <= 0) { + return; + } + + const cutoff = new Date(Date.now() - ttlDays * 24 * 60 * 60 * 1000); + const result = await this.deliveryRepository + .createQueryBuilder() + .delete() + .from(WebhookDelivery) + .where('createdAt < :cutoff', { cutoff }) + .execute(); + + if (result.affected) { + this.logger.log(`Cleaned up ${result.affected} expired webhook deliveries`); + } + } + + private async queueDelivery( + webhook: Webhook & { secret?: string }, + event: WebhookEvent, + payload: Record, + ): Promise { + const signature = this.buildSignature(payload, webhook.secret || ''); + + const delivery = await this.deliveryRepository.save( + this.deliveryRepository.create({ + webhookId: webhook.id, + event, + payload, + signature, + status: 'pending', + }), + ); + + await this.pruneDeliveryHistory(webhook.id); + + const ttlDays = this.configService.get('WEBHOOK_DELIVERY_TTL_DAYS', WEBHOOK_DELIVERY_TTL_DAYS); + const ageSeconds = Math.max(ttlDays, 1) * 24 * 60 * 60; + + await this.deliveryQueue.add( + WEBHOOK_JOB, + { deliveryId: delivery.id }, + { + jobId: delivery.id, + attempts: WEBHOOK_TOTAL_ATTEMPTS, + backoff: { + type: 'exponential', + delay: WEBHOOK_INITIAL_RETRY_DELAY_MS, + }, + removeOnComplete: { + count: WEBHOOK_DELIVERY_RETENTION_LIMIT, + age: ageSeconds, + }, + removeOnFail: { + count: WEBHOOK_DELIVERY_RETENTION_LIMIT, + age: ageSeconds, + }, + }, + ); + } + + private async pruneDeliveryHistory(webhookId: string): Promise { + const staleDeliveries = await this.deliveryRepository + .createQueryBuilder('delivery') + .select('delivery.id', 'id') + .where('delivery.webhookId = :webhookId', { webhookId }) + .orderBy('delivery.createdAt', 'DESC') + .offset(WEBHOOK_DELIVERY_RETENTION_LIMIT) + .getRawMany<{ id: string }>(); + + if (staleDeliveries.length === 0) { + return; + } + + await this.deliveryRepository.delete(staleDeliveries.map((delivery) => delivery.id)); + } +} \ No newline at end of file