From dabab0adc60077709c48bae1b86983d6a11fe2bb Mon Sep 17 00:00:00 2001 From: pq198363-ops <246611021+pq198363-ops@users.noreply.github.com> Date: Sat, 4 Jul 2026 11:52:20 +0800 Subject: [PATCH] feat: add idempotency key support --- README.md | 3 + docs/idempotency.md | 59 +++++++ src/middleware/idempotency.test.ts | 268 +++++++++++++++++++++++++++++ src/middleware/idempotency.ts | 158 +++++++++++++++++ src/routes/usage.ts | 8 +- 5 files changed, 493 insertions(+), 3 deletions(-) create mode 100644 docs/idempotency.md create mode 100644 src/middleware/idempotency.test.ts create mode 100644 src/middleware/idempotency.ts diff --git a/README.md b/README.md index 337ee4f..1b09832 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,9 @@ agentpay-backend/ - [Billing units and settlement semantics](docs/billing-units.md) explains stroops, `priceStroops`, `billedStroops`, `/api/v1/billing/*`, and why `POST /api/v1/settle` drains backend counters without moving funds. +- [Idempotency keys](docs/idempotency.md) documents retry-safe billing writes + for `POST /api/v1/usage`, `POST /api/v1/usage/bulk`, and + `POST /api/v1/settle`. ## Quickstart diff --git a/docs/idempotency.md b/docs/idempotency.md new file mode 100644 index 0000000..8fc735f --- /dev/null +++ b/docs/idempotency.md @@ -0,0 +1,59 @@ +# Idempotency Keys + +`POST /api/v1/usage`, `POST /api/v1/usage/bulk`, and `POST /api/v1/settle` +honor the `Idempotency-Key` header so clients can safely retry billing writes +after a timeout or dropped connection. + +## Replay Behavior + +The backend stores the first JSON response for each `(caller, key)` pair. The +caller namespace is the recognized `X-API-Key` when present, otherwise the +client IP address. API keys are hashed before they are used in the in-memory +idempotency cache key. + +When the same caller retries the same route with the same request body and the +same `Idempotency-Key`, the backend returns the original status and body, and +adds: + +```text +Idempotency-Replayed: true +``` + +The success response shapes are unchanged. For example, a replayed +`POST /api/v1/usage` response still looks like: + +```json +{ + "agent": "agent-alpha", + "serviceId": "embedding-v1", + "total": 3 +} +``` + +## Conflicts + +If the same caller reuses an `Idempotency-Key` with a different request body or +route before the cached entry expires, the backend rejects the request: + +```json +{ + "error": "idempotency_conflict", + "message": "Idempotency-Key was already used with a different request body or route", + "requestId": "..." +} +``` + +The response status is `409 Conflict`. + +## Cache Limits + +The idempotency cache is process-local and in-memory. It resets on restart. Two +optional environment variables control its size and age: + +| Variable | Default | Description | +| ------------------------------- | -------: | ---------------------------------- | +| `IDEMPOTENCY_CACHE_TTL_MS` | `600000` | Entry lifetime in milliseconds | +| `IDEMPOTENCY_CACHE_MAX_ENTRIES` | `1000` | Maximum cached idempotency entries | + +Expired entries are pruned before handling a keyed request. When the cache is +over capacity, the oldest entries are evicted first. diff --git a/src/middleware/idempotency.test.ts b/src/middleware/idempotency.test.ts new file mode 100644 index 0000000..9ce9147 --- /dev/null +++ b/src/middleware/idempotency.test.ts @@ -0,0 +1,268 @@ +import { describe, it, beforeEach } from "node:test"; +import assert from "node:assert"; +import request from "supertest"; +import { createApp } from "../index.js"; +import { apiKeyStore, servicesStore, usageStore } from "../store/state.js"; + +function createAppWithIdempotencyEnv(env: { ttlMs?: number; maxEntries?: number }) { + const previousTtl = process.env.IDEMPOTENCY_CACHE_TTL_MS; + const previousMax = process.env.IDEMPOTENCY_CACHE_MAX_ENTRIES; + + if (env.ttlMs === undefined) { + delete process.env.IDEMPOTENCY_CACHE_TTL_MS; + } else { + process.env.IDEMPOTENCY_CACHE_TTL_MS = String(env.ttlMs); + } + + if (env.maxEntries === undefined) { + delete process.env.IDEMPOTENCY_CACHE_MAX_ENTRIES; + } else { + process.env.IDEMPOTENCY_CACHE_MAX_ENTRIES = String(env.maxEntries); + } + + const app = createApp(); + + if (previousTtl === undefined) { + delete process.env.IDEMPOTENCY_CACHE_TTL_MS; + } else { + process.env.IDEMPOTENCY_CACHE_TTL_MS = previousTtl; + } + + if (previousMax === undefined) { + delete process.env.IDEMPOTENCY_CACHE_MAX_ENTRIES; + } else { + process.env.IDEMPOTENCY_CACHE_MAX_ENTRIES = previousMax; + } + + return app; +} + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +beforeEach(() => { + apiKeyStore.clear(); + servicesStore.clear(); + usageStore.clear(); +}); + +void describe("Idempotency-Key handling", () => { + void it("replays POST /api/v1/usage without incrementing usage again", async () => { + const app = createAppWithIdempotencyEnv({}); + const payload = { + agent: "agent-idem-usage", + serviceId: "svc-idem-usage", + requests: 3, + }; + + const first = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "usage-replay") + .send(payload); + assert.strictEqual(first.status, 201); + assert.deepStrictEqual(first.body, { + agent: "agent-idem-usage", + serviceId: "svc-idem-usage", + total: 3, + }); + + const replay = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "usage-replay") + .send(payload); + assert.strictEqual(replay.status, 201); + assert.strictEqual(replay.headers["idempotency-replayed"], "true"); + assert.deepStrictEqual(replay.body, first.body); + + const total = await request(app).get( + "/api/v1/usage/agent-idem-usage/svc-idem-usage" + ); + assert.strictEqual(total.body.total, 3); + }); + + void it("rejects reuse of the same key with a different body", async () => { + const app = createAppWithIdempotencyEnv({}); + + await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "usage-conflict") + .send({ + agent: "agent-idem-conflict", + serviceId: "svc-idem-conflict", + requests: 1, + }); + + const conflict = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "usage-conflict") + .send({ + agent: "agent-idem-conflict", + serviceId: "svc-idem-conflict", + requests: 2, + }); + assert.strictEqual(conflict.status, 409); + assert.strictEqual(conflict.body.error, "idempotency_conflict"); + assert.ok(conflict.body.requestId); + }); + + void it("replays POST /api/v1/usage/bulk without applying the batch again", async () => { + const app = createAppWithIdempotencyEnv({}); + const payload = { + items: [ + { agent: "agent-idem-bulk", serviceId: "svc-idem-bulk", requests: 2 }, + { agent: "agent-idem-bulk", serviceId: "svc-idem-bulk", requests: 4 }, + ], + }; + + const first = await request(app) + .post("/api/v1/usage/bulk") + .set("Idempotency-Key", "bulk-replay") + .send(payload); + assert.strictEqual(first.status, 201); + assert.deepStrictEqual(first.body.results, [ + { index: 0, ok: true, total: 2 }, + { index: 1, ok: true, total: 6 }, + ]); + + const replay = await request(app) + .post("/api/v1/usage/bulk") + .set("Idempotency-Key", "bulk-replay") + .send(payload); + assert.strictEqual(replay.status, 201); + assert.strictEqual(replay.headers["idempotency-replayed"], "true"); + assert.deepStrictEqual(replay.body, first.body); + + const total = await request(app).get("/api/v1/usage/agent-idem-bulk/svc-idem-bulk"); + assert.strictEqual(total.body.total, 6); + }); + + void it("replays POST /api/v1/settle without draining a second time", async () => { + const app = createAppWithIdempotencyEnv({}); + servicesStore.set("svc-idem-settle", { priceStroops: 10 }); + await request(app) + .post("/api/v1/usage") + .send({ agent: "agent-idem-settle", serviceId: "svc-idem-settle", requests: 5 }); + + const payload = { agent: "agent-idem-settle", serviceId: "svc-idem-settle" }; + const first = await request(app) + .post("/api/v1/settle") + .set("Idempotency-Key", "settle-replay") + .send(payload); + assert.strictEqual(first.status, 200); + assert.strictEqual(first.body.requests, 5); + assert.strictEqual(first.body.billedStroops, 50); + + const replay = await request(app) + .post("/api/v1/settle") + .set("Idempotency-Key", "settle-replay") + .send(payload); + assert.strictEqual(replay.status, 200); + assert.strictEqual(replay.headers["idempotency-replayed"], "true"); + assert.deepStrictEqual(replay.body, first.body); + }); + + void it("namespaces idempotency keys by recognized API key", async () => { + const app = createAppWithIdempotencyEnv({}); + apiKeyStore.set("tenant-a-secret", { label: "tenant-a", createdAt: Date.now() }); + apiKeyStore.set("tenant-b-secret", { label: "tenant-b", createdAt: Date.now() }); + const payload = { + agent: "agent-idem-tenant", + serviceId: "svc-idem-tenant", + requests: 2, + }; + + const first = await request(app) + .post("/api/v1/usage") + .set("X-API-Key", "tenant-a-secret") + .set("Idempotency-Key", "shared-key") + .send(payload); + assert.strictEqual(first.status, 201); + + const tenantAReplay = await request(app) + .post("/api/v1/usage") + .set("X-API-Key", "tenant-a-secret") + .set("Idempotency-Key", "shared-key") + .send(payload); + assert.strictEqual(tenantAReplay.headers["idempotency-replayed"], "true"); + assert.deepStrictEqual(tenantAReplay.body, first.body); + + const tenantBFirst = await request(app) + .post("/api/v1/usage") + .set("X-API-Key", "tenant-b-secret") + .set("Idempotency-Key", "shared-key") + .send(payload); + assert.strictEqual(tenantBFirst.status, 201); + assert.strictEqual(tenantBFirst.headers["idempotency-replayed"], undefined); + assert.strictEqual(tenantBFirst.body.total, 4); + }); + + void it("expires cached responses after the configured TTL", async () => { + const app = createAppWithIdempotencyEnv({ ttlMs: 5 }); + const payload = { + agent: "agent-idem-ttl", + serviceId: "svc-idem-ttl", + requests: 1, + }; + + const first = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "ttl-key") + .send(payload); + assert.strictEqual(first.status, 201); + + const replay = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "ttl-key") + .send(payload); + assert.strictEqual(replay.headers["idempotency-replayed"], "true"); + assert.deepStrictEqual(replay.body, first.body); + + await sleep(15); + + const afterExpiry = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "ttl-key") + .send(payload); + assert.strictEqual(afterExpiry.status, 201); + assert.strictEqual(afterExpiry.headers["idempotency-replayed"], undefined); + assert.strictEqual(afterExpiry.body.total, 2); + }); + + void it("evicts the oldest idempotency entry when the cache is capped", async () => { + const app = createAppWithIdempotencyEnv({ maxEntries: 1 }); + const payloadA = { + agent: "agent-idem-cap-a", + serviceId: "svc-idem-cap", + requests: 1, + }; + const payloadB = { + agent: "agent-idem-cap-b", + serviceId: "svc-idem-cap", + requests: 1, + }; + + const firstA = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "cap-a") + .send(payloadA); + assert.strictEqual(firstA.status, 201); + + const replayA = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "cap-a") + .send(payloadA); + assert.strictEqual(replayA.headers["idempotency-replayed"], "true"); + + await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "cap-b") + .send(payloadB); + + const afterEviction = await request(app) + .post("/api/v1/usage") + .set("Idempotency-Key", "cap-a") + .send(payloadA); + assert.strictEqual(afterEviction.status, 201); + assert.strictEqual(afterEviction.headers["idempotency-replayed"], undefined); + assert.strictEqual(afterEviction.body.total, 2); + }); +}); diff --git a/src/middleware/idempotency.ts b/src/middleware/idempotency.ts new file mode 100644 index 0000000..5cc7c1f --- /dev/null +++ b/src/middleware/idempotency.ts @@ -0,0 +1,158 @@ +import { createHash } from "node:crypto"; +import type { NextFunction, Request, RequestHandler, Response } from "express"; +import type { AgentPayRequest } from "../types.js"; +import { getRequestId } from "../types.js"; + +const DEFAULT_IDEMPOTENCY_TTL_MS = 10 * 60 * 1000; +const DEFAULT_IDEMPOTENCY_MAX_ENTRIES = 1000; + +type IdempotencyEntry = { + createdAt: number; + fingerprint: string; + statusCode: number; + body: unknown; +}; + +type IdempotencyOptions = { + ttlMs?: number; + maxEntries?: number; +}; + +function readPositiveIntegerEnv(name: string, fallback: number): number { + const raw = process.env[name]; + if (raw === undefined) return fallback; + const parsed = Number(raw); + return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback; +} + +function resolveOptions(options: IdempotencyOptions): Required { + return { + ttlMs: + options.ttlMs ?? + readPositiveIntegerEnv("IDEMPOTENCY_CACHE_TTL_MS", DEFAULT_IDEMPOTENCY_TTL_MS), + maxEntries: + options.maxEntries ?? + readPositiveIntegerEnv( + "IDEMPOTENCY_CACHE_MAX_ENTRIES", + DEFAULT_IDEMPOTENCY_MAX_ENTRIES + ), + }; +} + +function digest(value: string): string { + return createHash("sha256").update(value).digest("hex"); +} + +function getCallerNamespace(req: Request): string { + const apiKey = (req as AgentPayRequest).apiKey; + if (apiKey) return `api-key:${digest(apiKey)}`; + return `ip:${req.ip ?? req.socket.remoteAddress ?? "unknown"}`; +} + +function encodePrimitive(value: unknown): string { + const encoded = JSON.stringify(value); + return encoded ?? "undefined"; +} + +function stableStringify(value: unknown): string { + if (value === null || typeof value !== "object") return encodePrimitive(value); + if (Array.isArray(value)) { + return `[${value.map((item) => stableStringify(item)).join(",")}]`; + } + + const record = value as Record; + return `{${Object.keys(record) + .sort() + .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`) + .join(",")}}`; +} + +function cloneJsonBody(body: unknown): unknown { + if (body === undefined) return undefined; + return JSON.parse(JSON.stringify(body)) as unknown; +} + +function pruneCache( + cache: Map, + now: number, + options: Required +): void { + for (const [key, entry] of cache.entries()) { + if (now - entry.createdAt >= options.ttlMs) { + cache.delete(key); + } + } + + while (cache.size > options.maxEntries) { + const oldest = cache.keys().next().value; + if (oldest === undefined) return; + cache.delete(oldest); + } +} + +/** + * Creates middleware that replays completed JSON responses for repeated + * Idempotency-Key requests from the same API key or client IP. + */ +export function createIdempotencyMiddleware( + options: IdempotencyOptions = {} +): RequestHandler { + const resolved = resolveOptions(options); + const cache = new Map(); + + return (req: Request, res: Response, next: NextFunction): void => { + const idempotencyKey = req.header("idempotency-key")?.trim(); + if (!idempotencyKey) { + next(); + return; + } + + const now = Date.now(); + pruneCache(cache, now, resolved); + + const cacheKey = `${getCallerNamespace(req)}:${digest(idempotencyKey)}`; + const fingerprint = `${req.method.toUpperCase()} ${req.path}\n${stableStringify( + req.body + )}`; + const cached = cache.get(cacheKey); + + if (cached) { + if (cached.fingerprint !== fingerprint) { + res.status(409).json({ + error: "idempotency_conflict", + message: + "Idempotency-Key was already used with a different request body or route", + requestId: getRequestId(req), + }); + return; + } + + res.setHeader("Idempotency-Replayed", "true"); + res.status(cached.statusCode).json(cached.body); + return; + } + + const originalJson = res.json.bind(res) as Response["json"]; + let captured = false; + let capturedBody: unknown; + + res.json = ((body?: unknown) => { + captured = true; + capturedBody = cloneJsonBody(body); + return originalJson(body); + }) as Response["json"]; + + res.on("finish", () => { + if (!captured) return; + cache.set(cacheKey, { + createdAt: now, + fingerprint, + statusCode: res.statusCode, + body: capturedBody, + }); + pruneCache(cache, Date.now(), resolved); + }); + + next(); + }; +} diff --git a/src/routes/usage.ts b/src/routes/usage.ts index cd28a50..d7dfd7f 100644 --- a/src/routes/usage.ts +++ b/src/routes/usage.ts @@ -1,5 +1,6 @@ import { Router, type Request, type Response } from "express"; import { recordEvent } from "../events.js"; +import { createIdempotencyMiddleware } from "../middleware/idempotency.js"; import { servicesDisabled, servicesStore, @@ -26,8 +27,9 @@ type BillingTotalBreakdown = { */ export function createUsageRouter(): Router { const router = Router(); + const idempotency = createIdempotencyMiddleware(); - router.post("/api/v1/usage", (req: Request, res: Response) => { + router.post("/api/v1/usage", idempotency, (req: Request, res: Response) => { const { agent, serviceId, requests } = req.body ?? {}; const requestId = getRequestId(req); @@ -78,7 +80,7 @@ export function createUsageRouter(): Router { res.status(201).json({ agent, serviceId, total }); }); - router.post("/api/v1/usage/bulk", (req: Request, res: Response) => { + router.post("/api/v1/usage/bulk", idempotency, (req: Request, res: Response) => { const requestId = getRequestId(req); const { items } = req.body ?? {}; if (!Array.isArray(items) || items.length === 0 || items.length > 100) { @@ -191,7 +193,7 @@ export function createUsageRouter(): Router { }); }); - router.post("/api/v1/settle", (req: Request, res: Response) => { + router.post("/api/v1/settle", idempotency, (req: Request, res: Response) => { const { agent, serviceId } = req.body ?? {}; const requestId = getRequestId(req); if (typeof agent !== "string" || typeof serviceId !== "string") {