Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/lib/jobs/__tests__/idempotency-cleanup.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import type { PrismaClient } from "@/generated/prisma/client";
import { cleanupExpiredIdempotencyKeys } from "../idempotency-cleanup";

function makePrismaMock(deletedCount: number) {
return {
idempotencyKey: {
deleteMany: vi.fn().mockResolvedValue({ count: deletedCount }),
},
} as unknown as PrismaClient;
}

describe("cleanupExpiredIdempotencyKeys", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-04T03:00:00Z"));
});

it("deletes all rows whose expires_at is in the past and returns the count", async () => {
const prisma = makePrismaMock(100);
const deleted = await cleanupExpiredIdempotencyKeys(prisma);

expect(deleted).toBe(100);
expect(prisma.idempotencyKey.deleteMany).toHaveBeenCalledTimes(1);
expect(prisma.idempotencyKey.deleteMany).toHaveBeenCalledWith({
where: { expiresAt: { lt: new Date("2026-05-04T03:00:00Z") } },
});
});

it("returns 0 when no rows are expired", async () => {
const prisma = makePrismaMock(0);
const deleted = await cleanupExpiredIdempotencyKeys(prisma);
expect(deleted).toBe(0);
});

it("propagates errors from prisma so the handler can warn", async () => {
const prisma = {
idempotencyKey: {
deleteMany: vi.fn().mockRejectedValue(new Error("connection lost")),
},
} as unknown as PrismaClient;

await expect(cleanupExpiredIdempotencyKeys(prisma)).rejects.toThrow(
"connection lost",
);
});
});
18 changes: 18 additions & 0 deletions src/lib/jobs/idempotency-cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Daily cleanup for the `idempotency_keys` table.
*
* `withIdempotency()` (src/lib/idempotency.ts) only purges a row lazily
* on the next lookup with the same `(userId, key, method, path)` tuple
* — most rows never see another retry, so without a sweeper the table
* grows unbounded with stale 24h-expired entries.
*/
import type { PrismaClient } from "@/generated/prisma/client";

export async function cleanupExpiredIdempotencyKeys(
prisma: PrismaClient,
): Promise<number> {
const { count } = await prisma.idempotencyKey.deleteMany({
where: { expiresAt: { lt: new Date() } },
});
return count;
}
29 changes: 29 additions & 0 deletions src/lib/jobs/reminder-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
recordError,
} from "@/lib/jobs/worker-status";
import { setGlobalBoss } from "@/lib/jobs/boss-instance";
import { cleanupExpiredIdempotencyKeys } from "@/lib/jobs/idempotency-cleanup";
import { deleteMessage } from "@/lib/telegram";
import { decrypt, encrypt } from "@/lib/crypto";
import { syncMoodLogEntries } from "@/lib/moodlog/sync";
Expand Down Expand Up @@ -90,6 +91,8 @@ const DATA_BACKUP_QUEUE = "data-backup";
const DATA_BACKUP_CRON = "0 3 * * 0"; // weekly Sunday at 03:00
const RATE_LIMIT_CLEANUP_QUEUE = "rate-limit-cleanup";
const RATE_LIMIT_CLEANUP_CRON = "*/5 * * * *"; // every 5 minutes
const IDEMPOTENCY_CLEANUP_QUEUE = "idempotency-cleanup";
const IDEMPOTENCY_CLEANUP_CRON = "0 3 * * *"; // daily at 03:00 (Europe/Berlin)

interface ReminderCheckPayload {
triggeredAt: string;
Expand Down Expand Up @@ -141,6 +144,10 @@ interface RateLimitCleanupPayload {
triggeredAt: string;
}

interface IdempotencyCleanupPayload {
triggeredAt: string;
}

// Re-export timezone utilities under local names for backward compatibility
const getUserTodayBounds = getUserTodayBoundsUtil;
const getDayOfWeekInTz = getDayOfWeekInTzUtil;
Expand Down Expand Up @@ -818,6 +825,21 @@ async function handleRateLimitCleanup(
});
}

async function handleIdempotencyCleanup(
jobs: Job<IdempotencyCleanupPayload>[],
) {
void jobs;
await withBackgroundEvent("job.idempotency_cleanup", async (evt) => {
const p = getWorkerPrisma();
try {
const deleted = await cleanupExpiredIdempotencyKeys(p);
evt.addMeta("idempotency_cleanup_deleted", deleted);
} catch (err) {
evt.addWarning(`idempotency-cleanup failed: ${err}`);
}
});
}

async function handleDataBackup(jobs: Job<DataBackupPayload>[]) {
void jobs;
await withBackgroundEvent("job.data_backup", async (evt) => {
Expand Down Expand Up @@ -998,6 +1020,7 @@ export async function startReminderWorker() {
MOODLOG_SYNC_QUEUE,
DATA_BACKUP_QUEUE,
RATE_LIMIT_CLEANUP_QUEUE,
IDEMPOTENCY_CLEANUP_QUEUE,
];

for (const q of allQueues) {
Expand All @@ -1017,6 +1040,7 @@ export async function startReminderWorker() {
[MOODLOG_SYNC_QUEUE, MOODLOG_SYNC_CRON],
[DATA_BACKUP_QUEUE, DATA_BACKUP_CRON],
[RATE_LIMIT_CLEANUP_QUEUE, RATE_LIMIT_CLEANUP_CRON],
[IDEMPOTENCY_CLEANUP_QUEUE, IDEMPOTENCY_CLEANUP_CRON],
];

for (const [name, cron] of schedules) {
Expand Down Expand Up @@ -1084,6 +1108,11 @@ export async function startReminderWorker() {
{ localConcurrency: 1 },
handleRateLimitCleanup,
);
await boss.work<IdempotencyCleanupPayload>(
IDEMPOTENCY_CLEANUP_QUEUE,
{ localConcurrency: 1 },
handleIdempotencyCleanup,
);

return boss;
}
Expand Down
Loading