diff --git a/src/lib/jobs/__tests__/idempotency-cleanup.test.ts b/src/lib/jobs/__tests__/idempotency-cleanup.test.ts new file mode 100644 index 0000000..265bf1e --- /dev/null +++ b/src/lib/jobs/__tests__/idempotency-cleanup.test.ts @@ -0,0 +1,47 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import type { PrismaClient } from "@/generated/prisma/client"; +import { cleanupExpiredIdempotencyKeys } from "../idempotency-cleanup"; + +function makePrismaMock(deletedCount: number) { + return { + idempotencyKey: { + deleteMany: vi.fn().mockResolvedValue({ count: deletedCount }), + }, + } as unknown as PrismaClient; +} + +describe("cleanupExpiredIdempotencyKeys", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-04T03:00:00Z")); + }); + + it("deletes all rows whose expires_at is in the past and returns the count", async () => { + const prisma = makePrismaMock(100); + const deleted = await cleanupExpiredIdempotencyKeys(prisma); + + expect(deleted).toBe(100); + expect(prisma.idempotencyKey.deleteMany).toHaveBeenCalledTimes(1); + expect(prisma.idempotencyKey.deleteMany).toHaveBeenCalledWith({ + where: { expiresAt: { lt: new Date("2026-05-04T03:00:00Z") } }, + }); + }); + + it("returns 0 when no rows are expired", async () => { + const prisma = makePrismaMock(0); + const deleted = await cleanupExpiredIdempotencyKeys(prisma); + expect(deleted).toBe(0); + }); + + it("propagates errors from prisma so the handler can warn", async () => { + const prisma = { + idempotencyKey: { + deleteMany: vi.fn().mockRejectedValue(new Error("connection lost")), + }, + } as unknown as PrismaClient; + + await expect(cleanupExpiredIdempotencyKeys(prisma)).rejects.toThrow( + "connection lost", + ); + }); +}); diff --git a/src/lib/jobs/idempotency-cleanup.ts b/src/lib/jobs/idempotency-cleanup.ts new file mode 100644 index 0000000..ec30f8f --- /dev/null +++ b/src/lib/jobs/idempotency-cleanup.ts @@ -0,0 +1,18 @@ +/** + * Daily cleanup for the `idempotency_keys` table. + * + * `withIdempotency()` (src/lib/idempotency.ts) only purges a row lazily + * on the next lookup with the same `(userId, key, method, path)` tuple + * — most rows never see another retry, so without a sweeper the table + * grows unbounded with stale 24h-expired entries. + */ +import type { PrismaClient } from "@/generated/prisma/client"; + +export async function cleanupExpiredIdempotencyKeys( + prisma: PrismaClient, +): Promise { + const { count } = await prisma.idempotencyKey.deleteMany({ + where: { expiresAt: { lt: new Date() } }, + }); + return count; +} diff --git a/src/lib/jobs/reminder-worker.ts b/src/lib/jobs/reminder-worker.ts index 9036bc0..b9b125b 100644 --- a/src/lib/jobs/reminder-worker.ts +++ b/src/lib/jobs/reminder-worker.ts @@ -27,6 +27,7 @@ import { recordError, } from "@/lib/jobs/worker-status"; import { setGlobalBoss } from "@/lib/jobs/boss-instance"; +import { cleanupExpiredIdempotencyKeys } from "@/lib/jobs/idempotency-cleanup"; import { deleteMessage } from "@/lib/telegram"; import { decrypt, encrypt } from "@/lib/crypto"; import { syncMoodLogEntries } from "@/lib/moodlog/sync"; @@ -90,6 +91,8 @@ const DATA_BACKUP_QUEUE = "data-backup"; const DATA_BACKUP_CRON = "0 3 * * 0"; // weekly Sunday at 03:00 const RATE_LIMIT_CLEANUP_QUEUE = "rate-limit-cleanup"; const RATE_LIMIT_CLEANUP_CRON = "*/5 * * * *"; // every 5 minutes +const IDEMPOTENCY_CLEANUP_QUEUE = "idempotency-cleanup"; +const IDEMPOTENCY_CLEANUP_CRON = "0 3 * * *"; // daily at 03:00 (Europe/Berlin) interface ReminderCheckPayload { triggeredAt: string; @@ -141,6 +144,10 @@ interface RateLimitCleanupPayload { triggeredAt: string; } +interface IdempotencyCleanupPayload { + triggeredAt: string; +} + // Re-export timezone utilities under local names for backward compatibility const getUserTodayBounds = getUserTodayBoundsUtil; const getDayOfWeekInTz = getDayOfWeekInTzUtil; @@ -818,6 +825,21 @@ async function handleRateLimitCleanup( }); } +async function handleIdempotencyCleanup( + jobs: Job[], +) { + void jobs; + await withBackgroundEvent("job.idempotency_cleanup", async (evt) => { + const p = getWorkerPrisma(); + try { + const deleted = await cleanupExpiredIdempotencyKeys(p); + evt.addMeta("idempotency_cleanup_deleted", deleted); + } catch (err) { + evt.addWarning(`idempotency-cleanup failed: ${err}`); + } + }); +} + async function handleDataBackup(jobs: Job[]) { void jobs; await withBackgroundEvent("job.data_backup", async (evt) => { @@ -998,6 +1020,7 @@ export async function startReminderWorker() { MOODLOG_SYNC_QUEUE, DATA_BACKUP_QUEUE, RATE_LIMIT_CLEANUP_QUEUE, + IDEMPOTENCY_CLEANUP_QUEUE, ]; for (const q of allQueues) { @@ -1017,6 +1040,7 @@ export async function startReminderWorker() { [MOODLOG_SYNC_QUEUE, MOODLOG_SYNC_CRON], [DATA_BACKUP_QUEUE, DATA_BACKUP_CRON], [RATE_LIMIT_CLEANUP_QUEUE, RATE_LIMIT_CLEANUP_CRON], + [IDEMPOTENCY_CLEANUP_QUEUE, IDEMPOTENCY_CLEANUP_CRON], ]; for (const [name, cron] of schedules) { @@ -1084,6 +1108,11 @@ export async function startReminderWorker() { { localConcurrency: 1 }, handleRateLimitCleanup, ); + await boss.work( + IDEMPOTENCY_CLEANUP_QUEUE, + { localConcurrency: 1 }, + handleIdempotencyCleanup, + ); return boss; }