From 8d391be9eb7dc67d660423ee965e4f0b594d4a1c Mon Sep 17 00:00:00 2001 From: Nathan Toups Date: Sun, 3 May 2026 07:53:57 -0600 Subject: [PATCH 1/7] feat(cron): structured logger + per-step debug instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit src/lib/log.ts ships a tiny Logger interface plus a console-backed provider that emits one JSON line per call via console.log. Vercel auto-parses JSON in stdout into searchable fields, so this is the platform's idiomatic shape — no external deps. Cron routes bind a child logger with their event name. Library code (stats.ts, ingest.ts, sync-geoip.ts) holds a module-level child and emits log.debug between every await — every SQL query, blob get/put, MaxMind fetch, tar extract, per-blob ingest fetch. --- src/app/api/cron/tripwire-asn-update/route.ts | 9 +- .../api/cron/tripwire-build-stats/route.ts | 14 +-- src/app/api/cron/tripwire-ingest/route.ts | 11 +- src/lib/cron-helpers.ts | 20 +--- src/lib/log.test.ts | 102 ++++++++++++++++++ src/lib/log.ts | 78 ++++++++++++++ src/lib/tripwire/ingest.ts | 44 +++++++- src/lib/tripwire/stats.ts | 56 +++++++++- src/lib/tripwire/sync-geoip.ts | 31 +++++- 9 files changed, 328 insertions(+), 37 deletions(-) create mode 100644 src/lib/log.test.ts create mode 100644 src/lib/log.ts diff --git a/src/app/api/cron/tripwire-asn-update/route.ts b/src/app/api/cron/tripwire-asn-update/route.ts index 4d6f575..c2e22af 100644 --- a/src/app/api/cron/tripwire-asn-update/route.ts +++ b/src/app/api/cron/tripwire-asn-update/route.ts @@ -7,7 +7,8 @@ import { NextResponse, type NextRequest } from "next/server" import { syncGeoipToBlob } from "@/lib/tripwire/sync-geoip" -import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers" +import { checkCronAuth } from "@/lib/cron-helpers" +import { log } from "@/lib/log" export const runtime = "nodejs" export const dynamic = "force-dynamic" @@ -18,11 +19,11 @@ export async function GET(req: NextRequest): 
Promise { if (authError) return authError const startedAt = Date.now() - const log = makeCronLogger("cron.tripwire_asn_update", startedAt) + const cronLog = log.child({ event: "cron.tripwire_asn_update" }) - log({ step: "start" }) + cronLog.info({ step: "start" }) const result = await syncGeoipToBlob() - log({ step: "done", ...result }) + cronLog.info({ step: "done", elapsed_ms: Date.now() - startedAt, ...result }) return NextResponse.json({ ok: true, diff --git a/src/app/api/cron/tripwire-build-stats/route.ts b/src/app/api/cron/tripwire-build-stats/route.ts index 697a88c..9b05b2e 100644 --- a/src/app/api/cron/tripwire-build-stats/route.ts +++ b/src/app/api/cron/tripwire-build-stats/route.ts @@ -8,7 +8,8 @@ import { NextResponse, type NextRequest } from "next/server" import { revalidateTag } from "next/cache" import { buildAggregates, publishAggregates } from "@/lib/tripwire/stats" -import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers" +import { checkCronAuth } from "@/lib/cron-helpers" +import { log } from "@/lib/log" export const runtime = "nodejs" export const dynamic = "force-dynamic" @@ -19,21 +20,22 @@ export async function GET(req: NextRequest): Promise { if (authError) return authError const startedAt = Date.now() - const log = makeCronLogger("cron.tripwire_build_stats", startedAt) + const cronLog = log.child({ event: "cron.tripwire_build_stats" }) - log({ step: "build_start" }) + cronLog.info({ step: "build_start" }) const aggregates = await buildAggregates() - log({ + cronLog.info({ step: "build_done", + elapsed_ms: Date.now() - startedAt, total: aggregates.lifetime.totalEvents, ips: aggregates.lifetime.distinctIps, asns: aggregates.lifetime.distinctAsns, }) - log({ step: "publish_start" }) + cronLog.info({ step: "publish_start" }) await publishAggregates(aggregates) revalidateTag("tripwire-aggregates", "max") - log({ step: "publish_done" }) + cronLog.info({ step: "publish_done", elapsed_ms: Date.now() - startedAt }) return 
NextResponse.json({ ok: true, diff --git a/src/app/api/cron/tripwire-ingest/route.ts b/src/app/api/cron/tripwire-ingest/route.ts index 9421e85..97ef833 100644 --- a/src/app/api/cron/tripwire-ingest/route.ts +++ b/src/app/api/cron/tripwire-ingest/route.ts @@ -7,7 +7,8 @@ import { NextResponse, type NextRequest } from "next/server" import { ingestNewEvents } from "@/lib/tripwire/ingest" -import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers" +import { checkCronAuth } from "@/lib/cron-helpers" +import { log } from "@/lib/log" export const runtime = "nodejs" export const dynamic = "force-dynamic" @@ -20,14 +21,14 @@ export async function GET(req: NextRequest): Promise { if (authError) return authError const startedAt = Date.now() - const log = makeCronLogger("cron.tripwire_ingest", startedAt) + const cronLog = log.child({ event: "cron.tripwire_ingest" }) - log({ step: "start" }) + cronLog.info({ step: "start" }) const result = await ingestNewEvents({ - onProgress: log, + onProgress: (e) => cronLog.info(e), deadlineMs: startedAt + INGEST_DEADLINE_MS, }) - log({ step: "done", ...result }) + cronLog.info({ step: "done", elapsed_ms: Date.now() - startedAt, ...result }) return NextResponse.json({ ok: true, diff --git a/src/lib/cron-helpers.ts b/src/lib/cron-helpers.ts index e42977d..090db8e 100644 --- a/src/lib/cron-helpers.ts +++ b/src/lib/cron-helpers.ts @@ -1,15 +1,16 @@ // src/lib/cron-helpers.ts // // Shared boilerplate for the tripwire cron route handlers. Each route -// does ONE thing — this just removes the repeated auth check and the -// structured-log scaffolding from those handlers. +// does ONE thing — this just removes the repeated auth check. Logging +// goes through the singleton in src/lib/log.ts. 
import { NextResponse, type NextRequest } from "next/server" +import { log } from "@/lib/log" export function checkCronAuth(req: NextRequest): NextResponse | null { const secret = process.env.CRON_SECRET if (!secret) { - console.error("[cron] CRON_SECRET not configured") + log.error({ event: "cron.auth", reason: "no_secret" }) return NextResponse.json({ ok: false, error: "not_configured" }, { status: 500 }) } if (req.headers.get("authorization") !== `Bearer ${secret}`) { @@ -17,16 +18,3 @@ export function checkCronAuth(req: NextRequest): NextResponse | null { } return null } - -export type CronLogger = (fields: Record) => void - -export function makeCronLogger(eventName: string, startedAt: number): CronLogger { - return (fields) => - console.log( - JSON.stringify({ - event: eventName, - elapsed_ms: Date.now() - startedAt, - ...fields, - }), - ) -} diff --git a/src/lib/log.test.ts b/src/lib/log.test.ts new file mode 100644 index 0000000..0a773cf --- /dev/null +++ b/src/lib/log.test.ts @@ -0,0 +1,102 @@ +// src/lib/log.test.ts +import { describe, test, expect, mock } from "bun:test" +import { consoleLogger } from "./log" + +function captureConsole(): { lines: unknown[][]; restore: () => void } { + const lines: unknown[][] = [] + const original = console.log + const spy = mock((...args: unknown[]) => { + lines.push(args) + }) + console.log = spy + return { lines, restore: () => { console.log = original } } +} + +describe("consoleLogger", () => { + test("emits one JSON line per call with time + level + fields", () => { + const { lines, restore } = captureConsole() + try { + const log = consoleLogger({ level: "debug" }) + log.info({ event: "boot", step: "ok" }) + expect(lines).toHaveLength(1) + expect(lines[0]).toHaveLength(1) + const record = JSON.parse(lines[0][0] as string) + expect(record.level).toBe("info") + expect(record.event).toBe("boot") + expect(record.step).toBe("ok") + expect(typeof record.time).toBe("string") + 
expect(Number.isNaN(Date.parse(record.time))).toBe(false) + } finally { + restore() + } + }) + + test("filters records below the threshold", () => { + const { lines, restore } = captureConsole() + try { + const log = consoleLogger({ level: "warn" }) + log.debug({ x: 1 }) + log.info({ x: 2 }) + log.warn({ x: 3 }) + log.error({ x: 4 }) + const levels = lines.map((l) => JSON.parse(l[0] as string).level) + expect(levels).toEqual(["warn", "error"]) + } finally { + restore() + } + }) + + test("silent mutes everything", () => { + const { lines, restore } = captureConsole() + try { + const log = consoleLogger({ level: "silent" }) + log.error({ x: 1 }) + expect(lines).toHaveLength(0) + } finally { + restore() + } + }) + + test("child merges bindings; later bindings win on key collision", () => { + const { lines, restore } = captureConsole() + try { + const root = consoleLogger({ level: "debug", bindings: { service: "tripwire", env: "test" } }) + const child = root.child({ event: "cron.ingest", env: "prod" }) + child.info({ step: "start" }) + const record = JSON.parse(lines[0][0] as string) + expect(record.service).toBe("tripwire") + expect(record.event).toBe("cron.ingest") + expect(record.env).toBe("prod") + expect(record.step).toBe("start") + } finally { + restore() + } + }) + + test("call-site fields override parent + child bindings", () => { + const { lines, restore } = captureConsole() + try { + const log = consoleLogger({ level: "debug", bindings: { event: "from-root" } }).child({ + event: "from-child", + }) + log.info({ event: "from-call" }) + const record = JSON.parse(lines[0][0] as string) + expect(record.event).toBe("from-call") + } finally { + restore() + } + }) + + test("child inherits the parent level", () => { + const { lines, restore } = captureConsole() + try { + const log = consoleLogger({ level: "warn" }).child({ scope: "x" }) + log.info({ skip: true }) + log.warn({ keep: true }) + expect(lines).toHaveLength(1) + expect(JSON.parse(lines[0][0] as 
string).keep).toBe(true) + } finally { + restore() + } + }) +}) diff --git a/src/lib/log.ts b/src/lib/log.ts new file mode 100644 index 0000000..5e8d613 --- /dev/null +++ b/src/lib/log.ts @@ -0,0 +1,78 @@ +// src/lib/log.ts +// +// Tiny structured logger. One `Logger` interface, one provider that +// writes JSON lines via console.log. Vercel auto-parses JSON in stdout +// into searchable fields in runtime logs and forwards the same to log +// drains, so this is the idiomatic shape for this platform. +// +// Why a wrapper at all instead of raw `console.log(JSON.stringify(...))`: +// the interface gives us levels, bound context (`child`), and a single +// place to swap implementations later (an HTTP drain, a no-op for tests, +// a prefixed logger for CLI scripts) without touching call sites. +// +// What this is not: it has no transports, no formatters, no redaction, +// no async I/O. If we ever need any of those, they're additive. + +export type Level = "debug" | "info" | "warn" | "error" | "silent" + +export interface Logger { + debug(fields: Record): void + info(fields: Record): void + warn(fields: Record): void + error(fields: Record): void + child(bindings: Record): Logger +} + +const RANK: Record, number> = { + debug: 10, + info: 20, + warn: 30, + error: 40, +} + +interface ConsoleLoggerOptions { + level?: Level + bindings?: Record +} + +export function consoleLogger(opts: ConsoleLoggerOptions = {}): Logger { + const level = opts.level ?? "info" + const bindings = opts.bindings ?? {} + const threshold = level === "silent" ? 
Number.POSITIVE_INFINITY : RANK[level] + + function emit(name: Exclude, fields: Record): void { + if (RANK[name] < threshold) return + console.log( + JSON.stringify({ + time: new Date().toISOString(), + level: name, + ...bindings, + ...fields, + }), + ) + } + + return { + debug: (f) => emit("debug", f), + info: (f) => emit("info", f), + warn: (f) => emit("warn", f), + error: (f) => emit("error", f), + child: (b) => consoleLogger({ level, bindings: { ...bindings, ...b } }), + } +} + +function readLevel(): Level { + const raw = (process.env.LOG_LEVEL ?? "info").toLowerCase() + if ( + raw === "debug" || + raw === "info" || + raw === "warn" || + raw === "error" || + raw === "silent" + ) { + return raw + } + return "info" +} + +export const log: Logger = consoleLogger({ level: readLevel() }) diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index 36a1180..57f4e09 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -15,8 +15,11 @@ import { list, get } from "@vercel/blob" import { inArray } from "drizzle-orm" import { getDb, schema } from "@/db" import { streamToText } from "@/lib/blob-stream" +import { log } from "@/lib/log" import { isTripwireEvent, type TripwireEvent } from "@/lib/tripwire/patterns" +const ilog = log.child({ event: "tripwire.ingest" }) + const DEFAULT_BATCH = 200 const ID_LOOKUP_CHUNK = 1000 @@ -65,9 +68,20 @@ async function listAllBlobs( const refs: BlobRef[] = [] let unrecognized = 0 let cursor: string | undefined + let page = 0 do { - const page = await list({ prefix: "events/", cursor }) - for (const blob of page.blobs) { + page++ + const t0 = Date.now() + ilog.debug({ step: "list.page_start", page, cursor: cursor ?? 
null }) + const result = await list({ prefix: "events/", cursor }) + ilog.debug({ + step: "list.page_done", + page, + elapsed_ms: Date.now() - t0, + blobs: result.blobs.length, + has_cursor: Boolean(result.cursor), + }) + for (const blob of result.blobs) { const id = idFromPathname(blob.pathname) if (!id) { unrecognized++ @@ -76,7 +90,7 @@ async function listAllBlobs( } refs.push({ pathname: blob.pathname, url: blob.url, id }) } - cursor = page.cursor + cursor = result.cursor } while (cursor) return { refs, unrecognized } } @@ -85,12 +99,25 @@ async function fetchEvent( url: string, log: (e: IngestLogEvent) => void, ): Promise { + const t0 = Date.now() + ilog.debug({ step: "fetch_event.get_start", url }) const file = await get(url, { access: "private" }) + ilog.debug({ + step: "fetch_event.get_done", + elapsed_ms: Date.now() - t0, + status: file?.statusCode ?? null, + }) if (!file || file.statusCode !== 200) { log({ step: "fetch_event.bad_status", url, statusCode: file?.statusCode ?? null }) return null } + const t1 = Date.now() const text = await streamToText(file.stream) + ilog.debug({ + step: "fetch_event.drain_done", + elapsed_ms: Date.now() - t1, + bytes: text.length, + }) let parsed: unknown try { parsed = JSON.parse(text) @@ -131,10 +158,18 @@ async function existingIds(ids: string[]): Promise> { const db = getDb() for (let i = 0; i < ids.length; i += ID_LOOKUP_CHUNK) { const chunk = ids.slice(i, i + ID_LOOKUP_CHUNK) + const t0 = Date.now() + ilog.debug({ step: "dedup.chunk_start", offset: i, size: chunk.length }) const rows = await db .select({ id: schema.tripwireEvents.id }) .from(schema.tripwireEvents) .where(inArray(schema.tripwireEvents.id, chunk)) + ilog.debug({ + step: "dedup.chunk_done", + offset: i, + elapsed_ms: Date.now() - t0, + matched: rows.length, + }) for (const r of rows) out.add(r.id) } return out @@ -143,7 +178,10 @@ async function existingIds(ids: string[]): Promise> { async function insertBatch(rows: schema.NewTripwireEventRow[]): 
Promise { if (rows.length === 0) return 0 const db = getDb() + const t0 = Date.now() + ilog.debug({ step: "insert.start", rows: rows.length }) await db.insert(schema.tripwireEvents).values(rows).onConflictDoNothing() + ilog.debug({ step: "insert.done", rows: rows.length, elapsed_ms: Date.now() - t0 }) return rows.length } diff --git a/src/lib/tripwire/stats.ts b/src/lib/tripwire/stats.ts index 55aac01..406a58a 100644 --- a/src/lib/tripwire/stats.ts +++ b/src/lib/tripwire/stats.ts @@ -16,12 +16,14 @@ import { Reader, type Asn, type ReaderModel } from "@maxmind/geoip2-node" import { sql } from "drizzle-orm" import { getDb } from "@/db" import { streamToBuffer } from "@/lib/blob-stream" +import { log } from "@/lib/log" import { STATS_BLOB_KEY, DEFAULT_TOP_PATHS, type Aggregates, } from "@/lib/tripwire/aggregate-shape" +const slog = log.child({ event: "tripwire.stats" }) const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb" // Re-export so existing callers can keep importing from "@/lib/tripwire/stats". @@ -32,16 +34,35 @@ export { STATS_BLOB_KEY, DEFAULT_TOP_PATHS, type Aggregates } let cachedAsnReader: ReaderModel | null = null async function getAsnReader(): Promise { - if (cachedAsnReader) return cachedAsnReader + if (cachedAsnReader) { + slog.debug({ step: "asn.cache_hit" }) + return cachedAsnReader + } + const t0 = Date.now() + slog.debug({ step: "asn.blob_get_start", key: ASN_BLOB_KEY }) const file = await get(ASN_BLOB_KEY, { access: "private" }) + slog.debug({ + step: "asn.blob_get_done", + elapsed_ms: Date.now() - t0, + status: file?.statusCode ?? null, + }) if (!file || file.statusCode !== 200) { throw new Error( `Failed to fetch ${ASN_BLOB_KEY} from blob (status: ${file?.statusCode ?? "no response"}). 
` + `Run the tripwire-asn-update cron / sync-geoip-to-blob.ts to populate it.`, ) } + const t1 = Date.now() + slog.debug({ step: "asn.stream_to_buffer_start" }) const buf = await streamToBuffer(file.stream) + slog.debug({ + step: "asn.stream_to_buffer_done", + elapsed_ms: Date.now() - t1, + bytes: buf.length, + }) + const t2 = Date.now() cachedAsnReader = Reader.openBuffer(buf) + slog.debug({ step: "asn.reader_open_done", elapsed_ms: Date.now() - t2 }) return cachedAsnReader } @@ -83,6 +104,8 @@ export async function buildAggregates( ): Promise { const db = getDb() + const tQ1 = Date.now() + slog.debug({ step: "sql.lifetime.start" }) const lifetimeResult = await db.execute(sql` SELECT COUNT(*)::int AS total_events, @@ -92,11 +115,14 @@ export async function buildAggregates( COUNT(DISTINCT path)::int AS distinct_paths FROM tripwire_events `) + slog.debug({ step: "sql.lifetime.done", elapsed_ms: Date.now() - tQ1 }) const lifetime = lifetimeResult.rows[0] if (!lifetime || lifetime.total_events === 0) { throw new Error("no events in tripwire_events; run ingest first") } + const tQ2 = Date.now() + slog.debug({ step: "sql.byCategory.start" }) const byCategory = await db.execute(sql` SELECT category, COUNT(*)::int AS count FROM tripwire_events @@ -104,14 +130,20 @@ export async function buildAggregates( GROUP BY category ORDER BY count DESC, category ASC `) + slog.debug({ step: "sql.byCategory.done", elapsed_ms: Date.now() - tQ2, rows: byCategory.rows.length }) + const tQ3 = Date.now() + slog.debug({ step: "sql.byUaFamily.start" }) const byUaFamily = await db.execute(sql` SELECT COALESCE(ua_family, 'unknown') AS ua, COUNT(*)::int AS count FROM tripwire_events GROUP BY ua ORDER BY count DESC, ua ASC `) + slog.debug({ step: "sql.byUaFamily.done", elapsed_ms: Date.now() - tQ3, rows: byUaFamily.rows.length }) + const tQ4 = Date.now() + slog.debug({ step: "sql.byDay.start" }) const byDay = await db.execute(sql` SELECT TO_CHAR(ts AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS date, 
COUNT(*)::int AS count @@ -119,7 +151,10 @@ export async function buildAggregates( GROUP BY date ORDER BY date ASC `) + slog.debug({ step: "sql.byDay.done", elapsed_ms: Date.now() - tQ4, rows: byDay.rows.length }) + const tQ5 = Date.now() + slog.debug({ step: "sql.topPaths.start", limit: topPathsLimit }) const topPaths = await db.execute(sql` SELECT path, COUNT(*)::int AS count, @@ -129,19 +164,27 @@ export async function buildAggregates( ORDER BY count DESC, path ASC LIMIT ${topPathsLimit} `) + slog.debug({ step: "sql.topPaths.done", elapsed_ms: Date.now() - tQ5, rows: topPaths.rows.length }) // ASN enrichment at query time: fold each event's IP through the mmdb // and roll up. Lifetime.distinctAsns is computed from the rolled-up map // rather than from a SQL DISTINCT — the column-stored value is no // longer the source of truth for ASN since we stopped writing it during // ingest. + const tQ6 = Date.now() + slog.debug({ step: "sql.ipCounts.start" }) const ipCountsResult = await db.execute(sql` SELECT ip, COUNT(*)::int AS count FROM tripwire_events WHERE ip IS NOT NULL AND ip <> '' GROUP BY ip `) + slog.debug({ step: "sql.ipCounts.done", elapsed_ms: Date.now() - tQ6, rows: ipCountsResult.rows.length }) + const reader = await getAsnReader() + + const tEnrich = Date.now() + slog.debug({ step: "asn.enrich.start", ips: ipCountsResult.rows.length }) const asnTotals = new Map() for (const row of ipCountsResult.rows) { const lookup = lookupAsn(reader, row.ip) @@ -153,6 +196,11 @@ export async function buildAggregates( const byAsn = [...asnTotals.entries()] .map(([asn, v]) => ({ asn, name: v.name, count: v.count })) .sort((a, b) => (b.count - a.count) || a.asn.localeCompare(b.asn)) + slog.debug({ + step: "asn.enrich.done", + elapsed_ms: Date.now() - tEnrich, + asns: byAsn.length, + }) const earliestDate = new Date(lifetime.earliest_ts) const daysSinceFirst = Math.max( @@ -184,10 +232,14 @@ export async function buildAggregates( } export async function 
publishAggregates(agg: Aggregates): Promise { - await put(STATS_BLOB_KEY, JSON.stringify(agg, null, 2), { + const body = JSON.stringify(agg, null, 2) + const t0 = Date.now() + slog.debug({ step: "publish.put_start", key: STATS_BLOB_KEY, bytes: body.length }) + await put(STATS_BLOB_KEY, body, { access: "private", contentType: "application/json", addRandomSuffix: false, allowOverwrite: true, }) + slog.debug({ step: "publish.put_done", elapsed_ms: Date.now() - t0 }) } diff --git a/src/lib/tripwire/sync-geoip.ts b/src/lib/tripwire/sync-geoip.ts index e20c0dc..b81ffe2 100644 --- a/src/lib/tripwire/sync-geoip.ts +++ b/src/lib/tripwire/sync-geoip.ts @@ -11,6 +11,9 @@ import { put } from "@vercel/blob" import { gunzipSync } from "node:zlib" +import { log } from "@/lib/log" + +const glog = log.child({ event: "tripwire.sync_geoip" }) const DOWNLOAD_URL = "https://download.maxmind.com/geoip/databases/GeoLite2-ASN/download?suffix=tar.gz" @@ -28,16 +31,31 @@ async function downloadTarball( licenseKey: string, ): Promise { const auth = Buffer.from(`${accountId}:${licenseKey}`).toString("base64") + const t0 = Date.now() + glog.debug({ step: "maxmind.fetch_start", url: DOWNLOAD_URL }) const res = await fetch(DOWNLOAD_URL, { headers: { Authorization: `Basic ${auth}` }, }) + glog.debug({ + step: "maxmind.fetch_headers", + elapsed_ms: Date.now() - t0, + status: res.status, + content_length: res.headers.get("content-length"), + }) if (!res.ok) { const body = await res.text().catch(() => "") throw new Error( `MaxMind download failed: ${res.status} ${res.statusText}. ${body.slice(0, 200)}`, ) } - return Buffer.from(await res.arrayBuffer()) + const t1 = Date.now() + const buf = Buffer.from(await res.arrayBuffer()) + glog.debug({ + step: "maxmind.fetch_body_done", + elapsed_ms: Date.now() - t1, + bytes: buf.length, + }) + return buf } // POSIX ustar header. We only need name, size, typeflag, prefix. 
@@ -94,14 +112,25 @@ export async function syncGeoipToBlob(): Promise { } const tarball = await downloadTarball(accountId, licenseKey) + + const tExtract = Date.now() + glog.debug({ step: "extract.start", bytes: tarball.length }) const mmdb = extractFileFromTarGz(tarball, MMDB_NAME) + glog.debug({ + step: "extract.done", + elapsed_ms: Date.now() - tExtract, + mmdb_bytes: mmdb.length, + }) + const tPut = Date.now() + glog.debug({ step: "blob.put_start", key: ASN_BLOB_KEY, bytes: mmdb.length }) await put(ASN_BLOB_KEY, mmdb, { access: "private", contentType: "application/octet-stream", addRandomSuffix: false, allowOverwrite: true, }) + glog.debug({ step: "blob.put_done", elapsed_ms: Date.now() - tPut }) return { tarballBytes: tarball.length, From 69f8cec9a495e81ac4eb184aa9fa11c2459ab0e8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 17:06:45 +0000 Subject: [PATCH 2/7] chore(log): default to debug outside production VERCEL_ENV=production keeps the existing info default. Preview, dev, and local all default to debug, so per-step cron traces show up on preview deploys without a manual env var. LOG_LEVEL still overrides. --- src/lib/log.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lib/log.ts b/src/lib/log.ts index 5e8d613..afd9faa 100644 --- a/src/lib/log.ts +++ b/src/lib/log.ts @@ -62,7 +62,12 @@ export function consoleLogger(opts: ConsoleLoggerOptions = {}): Logger { } function readLevel(): Level { - const raw = (process.env.LOG_LEVEL ?? "info").toLowerCase() + // Default: quiet on production, debug everywhere else (preview, dev). + // Crons run identical code in every environment, so a preview deploy + // gets the per-step trace without needing a manual env var. LOG_LEVEL + // overrides if set explicitly. + const fallback = process.env.VERCEL_ENV === "production" ? "info" : "debug" + const raw = (process.env.LOG_LEVEL ?? 
fallback).toLowerCase() if ( raw === "debug" || raw === "info" || From 188251379227f8cac5baaa8287bf01cbeab19108 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 17:53:56 +0000 Subject: [PATCH 3/7] fix(tripwire): drain blob streams via Response, drop blob-stream helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunk-loop helper in src/lib/blob-stream.ts was a workaround for a Vercel Fluid Compute (Node.js) hang. vercel.json now pins bunVersion: "1.x" and the chunk loop hangs on Bun instead — confirmed on the build-stats cron at the asn.stream_to_buffer_start step. The Response().arrayBuffer() / .text() pattern is what the scripts have been using directly for months and what the helper's own comment said works on Bun. Inline it at the three call sites and delete the helper. One drain pattern, used everywhere. --- src/lib/blob-stream.ts | 30 ------------------------------ src/lib/tripwire/aggregates.ts | 3 +-- src/lib/tripwire/ingest.ts | 3 +-- src/lib/tripwire/stats.ts | 3 +-- 4 files changed, 3 insertions(+), 36 deletions(-) delete mode 100644 src/lib/blob-stream.ts diff --git a/src/lib/blob-stream.ts b/src/lib/blob-stream.ts deleted file mode 100644 index cb4dc7a..0000000 --- a/src/lib/blob-stream.ts +++ /dev/null @@ -1,30 +0,0 @@ -// src/lib/blob-stream.ts -// -// Drain a Web ReadableStream returned by @vercel/blob's get() into a -// Buffer / string. -// -// We had been doing `Buffer.from(await new Response(stream).arrayBuffer())` -// and the equivalent `.text()`. Both work in Bun and locally in `next dev`. -// In production on Vercel Fluid Compute (Node.js), wrapping the blob stream -// in a Web `Response` and reading it back hangs forever — the cron's -// 12 MB ASN db fetch never produced a `drain_done` log line, the function -// just sat at full elapsed_ms until the 300s platform timeout. 
-// -// Pulling chunks straight off the reader avoids whatever `Response`-wrap -// quirk is at play and is also less indirection. - -export async function streamToBuffer(stream: ReadableStream): Promise { - const reader = stream.getReader() - const chunks: Uint8Array[] = [] - for (;;) { - const { done, value } = await reader.read() - if (done) break - if (value) chunks.push(value) - } - return Buffer.concat(chunks) -} - -export async function streamToText(stream: ReadableStream): Promise { - const buf = await streamToBuffer(stream) - return buf.toString("utf8") -} diff --git a/src/lib/tripwire/aggregates.ts b/src/lib/tripwire/aggregates.ts index 798f2bd..cef527a 100644 --- a/src/lib/tripwire/aggregates.ts +++ b/src/lib/tripwire/aggregates.ts @@ -11,7 +11,6 @@ // failure is better than silently lying about freshness. import { get } from "@vercel/blob" -import { streamToText } from "@/lib/blob-stream" import { STATS_BLOB_KEY, type Aggregates } from "@/lib/tripwire/aggregate-shape" const TTL_MS = 2 * 60 * 1000 @@ -28,7 +27,7 @@ export async function getAggregates(): Promise { `blob get failed (status ${file?.statusCode ?? 
"no response"})`, ) } - const text = await streamToText(file.stream) + const text = await new Response(file.stream).text() const data = JSON.parse(text) as Aggregates cached = { data, fetchedAt: Date.now() } return data diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index 57f4e09..59d48b9 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -14,7 +14,6 @@ import { list, get } from "@vercel/blob" import { inArray } from "drizzle-orm" import { getDb, schema } from "@/db" -import { streamToText } from "@/lib/blob-stream" import { log } from "@/lib/log" import { isTripwireEvent, type TripwireEvent } from "@/lib/tripwire/patterns" @@ -112,7 +111,7 @@ async function fetchEvent( return null } const t1 = Date.now() - const text = await streamToText(file.stream) + const text = await new Response(file.stream).text() ilog.debug({ step: "fetch_event.drain_done", elapsed_ms: Date.now() - t1, diff --git a/src/lib/tripwire/stats.ts b/src/lib/tripwire/stats.ts index 406a58a..94cca99 100644 --- a/src/lib/tripwire/stats.ts +++ b/src/lib/tripwire/stats.ts @@ -15,7 +15,6 @@ import { get, put } from "@vercel/blob" import { Reader, type Asn, type ReaderModel } from "@maxmind/geoip2-node" import { sql } from "drizzle-orm" import { getDb } from "@/db" -import { streamToBuffer } from "@/lib/blob-stream" import { log } from "@/lib/log" import { STATS_BLOB_KEY, @@ -54,7 +53,7 @@ async function getAsnReader(): Promise { } const t1 = Date.now() slog.debug({ step: "asn.stream_to_buffer_start" }) - const buf = await streamToBuffer(file.stream) + const buf = Buffer.from(await new Response(file.stream).arrayBuffer()) slog.debug({ step: "asn.stream_to_buffer_done", elapsed_ms: Date.now() - t1, From 9af05696283f723f6363978465477c93b3e64be1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 18:21:02 +0000 Subject: [PATCH 4/7] fix(tripwire): bypass blob SDK for ASN read; tag-cache the fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit The Vercel Blob SDK's get() returns response.body as a stream and lets the underlying Response go out of scope. Under Bun on Vercel the body stream then never reaches EOF for large bodies — neither the chunk-loop nor Response.arrayBuffer() drains the 12MB mmdb. Switch to head() for the URL (small metadata, no body-stream issue) plus a direct fetch with the bearer token. The response stays in scope across arrayBuffer(), and the read uses Next's data cache via next.tags=[ASN_BLOB_TAG]. tripwire-asn-update calls revalidateTag after a successful upload, so subsequent build-stats runs serve the mmdb from cache and only refetch when the data actually changed. --- src/app/api/cron/tripwire-asn-update/route.ts | 6 +- src/lib/tripwire/stats.ts | 56 ++++++++++++------- src/lib/tripwire/sync-geoip.ts | 4 ++ 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/src/app/api/cron/tripwire-asn-update/route.ts b/src/app/api/cron/tripwire-asn-update/route.ts index c2e22af..f92aa27 100644 --- a/src/app/api/cron/tripwire-asn-update/route.ts +++ b/src/app/api/cron/tripwire-asn-update/route.ts @@ -6,7 +6,8 @@ // whatever ASN db is currently in blob. import { NextResponse, type NextRequest } from "next/server" -import { syncGeoipToBlob } from "@/lib/tripwire/sync-geoip" +import { revalidateTag } from "next/cache" +import { syncGeoipToBlob, ASN_BLOB_TAG } from "@/lib/tripwire/sync-geoip" import { checkCronAuth } from "@/lib/cron-helpers" import { log } from "@/lib/log" @@ -23,6 +24,9 @@ export async function GET(req: NextRequest): Promise { cronLog.info({ step: "start" }) const result = await syncGeoipToBlob() + // Invalidate the build-stats fetch cache so the next build-stats run + // picks up the freshly-uploaded mmdb instead of serving the stale one. 
+ revalidateTag(ASN_BLOB_TAG, "max") cronLog.info({ step: "done", elapsed_ms: Date.now() - startedAt, ...result }) return NextResponse.json({ diff --git a/src/lib/tripwire/stats.ts b/src/lib/tripwire/stats.ts index 94cca99..6daf0af 100644 --- a/src/lib/tripwire/stats.ts +++ b/src/lib/tripwire/stats.ts @@ -11,11 +11,12 @@ // it across cron invocations and only the first cold instance pays the // ~10MB blob fetch. -import { get, put } from "@vercel/blob" +import { head, put } from "@vercel/blob" import { Reader, type Asn, type ReaderModel } from "@maxmind/geoip2-node" import { sql } from "drizzle-orm" import { getDb } from "@/db" import { log } from "@/lib/log" +import { ASN_BLOB_KEY, ASN_BLOB_TAG } from "@/lib/tripwire/sync-geoip" import { STATS_BLOB_KEY, DEFAULT_TOP_PATHS, @@ -23,7 +24,6 @@ import { } from "@/lib/tripwire/aggregate-shape" const slog = log.child({ event: "tripwire.stats" }) -const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb" // Re-export so existing callers can keep importing from "@/lib/tripwire/stats". // The page-side loader imports STATS_BLOB_KEY straight from aggregate-shape so @@ -37,31 +37,47 @@ async function getAsnReader(): Promise { slog.debug({ step: "asn.cache_hit" }) return cachedAsnReader } - const t0 = Date.now() - slog.debug({ step: "asn.blob_get_start", key: ASN_BLOB_KEY }) - const file = await get(ASN_BLOB_KEY, { access: "private" }) + const token = process.env.BLOB_READ_WRITE_TOKEN + if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") + + // head() resolves the (stable) blob URL for the pathname. The body is + // small JSON metadata, so it doesn't trip the large-body stream hang we + // hit when calling get() on the 12MB mmdb directly. + const tHead = Date.now() + slog.debug({ step: "asn.head_start", key: ASN_BLOB_KEY }) + const meta = await head(ASN_BLOB_KEY) + slog.debug({ step: "asn.head_done", elapsed_ms: Date.now() - tHead, url: meta.url }) + + // Direct fetch with the bearer token, tagged for the Next.js data cache. 
+ // tripwire-asn-update calls revalidateTag(ASN_BLOB_TAG) after a fresh put, + // so we only pay for the 12MB drain when the mmdb actually changed. + const tFetch = Date.now() + slog.debug({ step: "asn.fetch_start" }) + const res = await fetch(meta.url, { + headers: { authorization: `Bearer ${token}` }, + next: { tags: [ASN_BLOB_TAG] }, + }) slog.debug({ - step: "asn.blob_get_done", - elapsed_ms: Date.now() - t0, - status: file?.statusCode ?? null, + step: "asn.fetch_done", + elapsed_ms: Date.now() - tFetch, + status: res.status, + cache: res.headers.get("x-vercel-cache"), }) - if (!file || file.statusCode !== 200) { + if (!res.ok) { throw new Error( - `Failed to fetch ${ASN_BLOB_KEY} from blob (status: ${file?.statusCode ?? "no response"}). ` + + `Failed to fetch ${ASN_BLOB_KEY} (status: ${res.status} ${res.statusText}). ` + `Run the tripwire-asn-update cron / sync-geoip-to-blob.ts to populate it.`, ) } - const t1 = Date.now() - slog.debug({ step: "asn.stream_to_buffer_start" }) - const buf = Buffer.from(await new Response(file.stream).arrayBuffer()) - slog.debug({ - step: "asn.stream_to_buffer_done", - elapsed_ms: Date.now() - t1, - bytes: buf.length, - }) - const t2 = Date.now() + + const tBuf = Date.now() + slog.debug({ step: "asn.array_buffer_start" }) + const buf = Buffer.from(await res.arrayBuffer()) + slog.debug({ step: "asn.array_buffer_done", elapsed_ms: Date.now() - tBuf, bytes: buf.length }) + + const tOpen = Date.now() cachedAsnReader = Reader.openBuffer(buf) - slog.debug({ step: "asn.reader_open_done", elapsed_ms: Date.now() - t2 }) + slog.debug({ step: "asn.reader_open_done", elapsed_ms: Date.now() - tOpen }) return cachedAsnReader } diff --git a/src/lib/tripwire/sync-geoip.ts b/src/lib/tripwire/sync-geoip.ts index b81ffe2..b5c9c0f 100644 --- a/src/lib/tripwire/sync-geoip.ts +++ b/src/lib/tripwire/sync-geoip.ts @@ -18,6 +18,10 @@ const glog = log.child({ event: "tripwire.sync_geoip" }) const DOWNLOAD_URL = 
"https://download.maxmind.com/geoip/databases/GeoLite2-ASN/download?suffix=tar.gz" export const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb" +// Next.js fetch-cache tag. Build-stats fetches the mmdb with this tag; +// this cron calls revalidateTag after a successful upload, so subsequent +// build-stats runs get the new mmdb without paying for the 12MB drain. +export const ASN_BLOB_TAG = "asn-mmdb" const MMDB_NAME = "GeoLite2-ASN.mmdb" export interface SyncGeoipResult { From 10ea1f5b633bdc049d589002ba03628c03a1a6b1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 18:28:49 +0000 Subject: [PATCH 5/7] fix(tripwire): bypass blob SDK get() for ingest + page loader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same root cause as the ASN cron fix: @vercel/blob's get() returns response.body and lets the Response go out of scope, which under Bun on Vercel can leave the body stream stuck waiting for EOF — that's the most likely source of the ingest cron's flakiness on big runs. ingest.ts: per-event JSON read uses fetch with cache: "no-store" since each event is read exactly once. aggregates.ts: page-side loader uses head() + fetch with the STATS_BLOB_TAG cache tag. The build-stats cron already invalidates that tag after publishing, so warm pages flip to fresh aggregates without polling. The 2-min module-level singleton stays in front as an instance-local burst absorber. Also moves the literal "tripwire-aggregates" tag string into aggregate-shape.ts so producer (cron) and consumer (page) import the same constant. 
--- .../api/cron/tripwire-build-stats/route.ts | 3 +- src/lib/tripwire/aggregate-shape.ts | 4 + src/lib/tripwire/aggregates.test.ts | 79 ++++++++++++------- src/lib/tripwire/aggregates.ts | 39 ++++++--- src/lib/tripwire/ingest.ts | 26 ++++-- 5 files changed, 99 insertions(+), 52 deletions(-) diff --git a/src/app/api/cron/tripwire-build-stats/route.ts b/src/app/api/cron/tripwire-build-stats/route.ts index 9b05b2e..4965275 100644 --- a/src/app/api/cron/tripwire-build-stats/route.ts +++ b/src/app/api/cron/tripwire-build-stats/route.ts @@ -8,6 +8,7 @@ import { NextResponse, type NextRequest } from "next/server" import { revalidateTag } from "next/cache" import { buildAggregates, publishAggregates } from "@/lib/tripwire/stats" +import { STATS_BLOB_TAG } from "@/lib/tripwire/aggregate-shape" import { checkCronAuth } from "@/lib/cron-helpers" import { log } from "@/lib/log" @@ -34,7 +35,7 @@ export async function GET(req: NextRequest): Promise { cronLog.info({ step: "publish_start" }) await publishAggregates(aggregates) - revalidateTag("tripwire-aggregates", "max") + revalidateTag(STATS_BLOB_TAG, "max") cronLog.info({ step: "publish_done", elapsed_ms: Date.now() - startedAt }) return NextResponse.json({ diff --git a/src/lib/tripwire/aggregate-shape.ts b/src/lib/tripwire/aggregate-shape.ts index 5ebb552..9fd0cde 100644 --- a/src/lib/tripwire/aggregate-shape.ts +++ b/src/lib/tripwire/aggregate-shape.ts @@ -8,6 +8,10 @@ // before this split. export const STATS_BLOB_KEY = "stats/tripwire-aggregates.json" +// Next.js fetch-cache tag. The page-side loader fetches with this tag; +// the build-stats cron calls revalidateTag after a successful publish, +// so warm pages flip to fresh aggregates without polling on a TTL. 
+export const STATS_BLOB_TAG = "tripwire-aggregates" export const DEFAULT_TOP_PATHS = 100 export interface Aggregates { diff --git a/src/lib/tripwire/aggregates.test.ts b/src/lib/tripwire/aggregates.test.ts index e77cdf1..f88b3ae 100644 --- a/src/lib/tripwire/aggregates.test.ts +++ b/src/lib/tripwire/aggregates.test.ts @@ -1,7 +1,7 @@ // src/lib/tripwire/aggregates.test.ts import { describe, test, expect, beforeEach, mock } from "bun:test" import * as blob from "@vercel/blob" -import type { Aggregates } from "@/lib/tripwire/aggregate-shape" +import { STATS_BLOB_TAG, type Aggregates } from "@/lib/tripwire/aggregate-shape" const SAMPLE: Aggregates = { generatedAt: "2026-05-02T00:00:00.000Z", @@ -21,53 +21,67 @@ const SAMPLE: Aggregates = { byAsn: [], } -interface GetCall { - pathname: string - options: Record -} -const getCalls: GetCall[] = [] -type GetMode = "ok" | "bad-status" -let getMode: GetMode = "ok" +const FAKE_URL = "https://store.private.blob.vercel-storage.com/stats/tripwire-aggregates.json" + +interface HeadCall { pathname: string } +interface FetchCall { url: string; init: RequestInit | undefined } + +const headCalls: HeadCall[] = [] +const fetchCalls: FetchCall[] = [] +type FetchMode = "ok" | "bad-status" +let fetchMode: FetchMode = "ok" mock.module("@vercel/blob", () => ({ ...blob, - get: async (pathname: string, options: Record) => { - getCalls.push({ pathname, options }) - if (getMode === "bad-status") return { stream: null, statusCode: 404 } - const text = JSON.stringify(SAMPLE) - const stream = new ReadableStream({ - start(c) { - c.enqueue(new TextEncoder().encode(text)) - c.close() - }, - }) - return { stream, statusCode: 200 } + head: async (pathname: string) => { + headCalls.push({ pathname }) + return { url: FAKE_URL, pathname } }, })) +const realFetch = globalThis.fetch +globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => { + fetchCalls.push({ url: String(input), init }) + if (fetchMode === "bad-status") { + return new 
Response("nope", { status: 404, statusText: "Not Found" }) + } + return new Response(JSON.stringify(SAMPLE), { + status: 200, + headers: { "content-type": "application/json" }, + }) +}) as typeof fetch + +process.env.BLOB_READ_WRITE_TOKEN = "vercel_blob_rw_test_token" + const { getAggregates, _resetAggregatesCacheForTests } = await import("./aggregates") beforeEach(() => { _resetAggregatesCacheForTests() - getCalls.length = 0 - getMode = "ok" + headCalls.length = 0 + fetchCalls.length = 0 + fetchMode = "ok" }) describe("getAggregates", () => { test("cache miss fetches and parses the blob", async () => { const result = await getAggregates() expect(result).toEqual(SAMPLE) - expect(getCalls).toHaveLength(1) - expect(getCalls[0].pathname).toBe("stats/tripwire-aggregates.json") - expect(getCalls[0].options).toMatchObject({ access: "private" }) + expect(headCalls).toHaveLength(1) + expect(headCalls[0].pathname).toBe("stats/tripwire-aggregates.json") + expect(fetchCalls).toHaveLength(1) + expect(fetchCalls[0].url).toBe(FAKE_URL) + const headers = new Headers(fetchCalls[0].init?.headers) + expect(headers.get("authorization")).toBe("Bearer vercel_blob_rw_test_token") + const next = (fetchCalls[0].init as { next?: { tags?: string[] } } | undefined)?.next + expect(next?.tags).toEqual([STATS_BLOB_TAG]) }) test("cache hit within TTL skips the fetch", async () => { await getAggregates() - expect(getCalls).toHaveLength(1) + expect(fetchCalls).toHaveLength(1) const result = await getAggregates() expect(result).toEqual(SAMPLE) - expect(getCalls).toHaveLength(1) + expect(fetchCalls).toHaveLength(1) }) test("expired TTL triggers a fresh fetch", async () => { @@ -76,17 +90,22 @@ describe("getAggregates", () => { Date.now = () => now try { await getAggregates() - expect(getCalls).toHaveLength(1) + expect(fetchCalls).toHaveLength(1) now += 2 * 60 * 1000 + 1 await getAggregates() - expect(getCalls).toHaveLength(2) + expect(fetchCalls).toHaveLength(2) } finally { Date.now = realNow } }) - 
test("throws when get() returns a non-200 status", async () => { - getMode = "bad-status" + test("throws when fetch returns a non-200 status", async () => { + fetchMode = "bad-status" await expect(getAggregates()).rejects.toThrow(/status 404/) }) }) + +// Restore real fetch so other test files aren't affected. +process.on("beforeExit", () => { + globalThis.fetch = realFetch +}) diff --git a/src/lib/tripwire/aggregates.ts b/src/lib/tripwire/aggregates.ts index cef527a..e727f5d 100644 --- a/src/lib/tripwire/aggregates.ts +++ b/src/lib/tripwire/aggregates.ts @@ -1,17 +1,26 @@ // src/lib/tripwire/aggregates.ts // // Page-side analytics blob loader. The build-stats cron republishes -// stats/tripwire-aggregates.json every ~15 minutes. We hold the parsed -// JSON in a module-level singleton with a 2-minute TTL so a warm -// Fluid Compute instance only pays for the blob fetch once per window. -// On TTL expiry the next request triggers a fresh fetch. +// stats/tripwire-aggregates.json every ~15 minutes and calls +// revalidateTag(STATS_BLOB_TAG) right after, so the page-side fetch is +// served from Next's data cache until that exact moment of invalidation. +// +// We also keep a 2-minute module-level singleton in front of the fetch: +// the data cache lives in the regional edge, the singleton lives in the +// running instance. The singleton absorbs bursty page traffic on a warm +// instance without crossing the network at all. Stale data is fine for +// up to 2 minutes — the cron only runs every 15. // // On any fetch error we throw — `src/app/x/tripwire/error.tsx` surfaces // a retry button. We deliberately don't fall back to stale data; a hard // failure is better than silently lying about freshness. 
-import { get } from "@vercel/blob" -import { STATS_BLOB_KEY, type Aggregates } from "@/lib/tripwire/aggregate-shape" +import { head } from "@vercel/blob" +import { + STATS_BLOB_KEY, + STATS_BLOB_TAG, + type Aggregates, +} from "@/lib/tripwire/aggregate-shape" const TTL_MS = 2 * 60 * 1000 @@ -21,14 +30,18 @@ export async function getAggregates(): Promise { if (cached && Date.now() - cached.fetchedAt < TTL_MS) { return cached.data } - const file = await get(STATS_BLOB_KEY, { access: "private" }) - if (!file || file.statusCode !== 200) { - throw new Error( - `blob get failed (status ${file?.statusCode ?? "no response"})`, - ) + const token = process.env.BLOB_READ_WRITE_TOKEN + if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") + + const meta = await head(STATS_BLOB_KEY) + const res = await fetch(meta.url, { + headers: { authorization: `Bearer ${token}` }, + next: { tags: [STATS_BLOB_TAG] }, + }) + if (!res.ok) { + throw new Error(`blob fetch failed (status ${res.status} ${res.statusText})`) } - const text = await new Response(file.stream).text() - const data = JSON.parse(text) as Aggregates + const data = (await res.json()) as Aggregates cached = { data, fetchedAt: Date.now() } return data } diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index 59d48b9..f849182 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -11,7 +11,7 @@ // Pure library: no console.log, no process.exit. Callers (the CLI script // and the cron route) decide how to log and surface results. 
-import { list, get } from "@vercel/blob" +import { list } from "@vercel/blob" import { inArray } from "drizzle-orm" import { getDb, schema } from "@/db" import { log } from "@/lib/log" @@ -98,20 +98,30 @@ async function fetchEvent( url: string, log: (e: IngestLogEvent) => void, ): Promise { + const token = process.env.BLOB_READ_WRITE_TOKEN + if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") + + // Direct fetch instead of @vercel/blob's get(): get() returns + // response.body and lets the Response go out of scope, which under + // Bun on Vercel can leave the body stream stuck waiting for EOF. + // Each event JSON is read once, so no caching. const t0 = Date.now() - ilog.debug({ step: "fetch_event.get_start", url }) - const file = await get(url, { access: "private" }) + ilog.debug({ step: "fetch_event.fetch_start", url }) + const res = await fetch(url, { + headers: { authorization: `Bearer ${token}` }, + cache: "no-store", + }) ilog.debug({ - step: "fetch_event.get_done", + step: "fetch_event.fetch_done", elapsed_ms: Date.now() - t0, - status: file?.statusCode ?? null, + status: res.status, }) - if (!file || file.statusCode !== 200) { - log({ step: "fetch_event.bad_status", url, statusCode: file?.statusCode ?? null }) + if (!res.ok) { + log({ step: "fetch_event.bad_status", url, statusCode: res.status }) return null } const t1 = Date.now() - const text = await new Response(file.stream).text() + const text = await res.text() ilog.debug({ step: "fetch_event.drain_done", elapsed_ms: Date.now() - t1, From e5265b860bc6c892e4e8ad98285313ed748b5593 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 18:35:06 +0000 Subject: [PATCH 6/7] fix(tripwire): bypass blob SDK list() in ingest The ingest cron was hanging at list.page_start with no progress: same root cause as the get() and head() drain hangs. 
requestApi inside @vercel/blob ends with await apiResponse.json(), which under Bun on Vercel leaves the body stream stuck waiting for EOF after the SDK's internal Response goes out of scope. Inline a direct fetch against the public list endpoint with the same auth + x-api-version headers the SDK sends. The Response stays in scope across .json() and the call completes. --- src/lib/tripwire/ingest.ts | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index f849182..2766d38 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -11,7 +11,6 @@ // Pure library: no console.log, no process.exit. Callers (the CLI script // and the cron route) decide how to log and surface results. -import { list } from "@vercel/blob" import { inArray } from "drizzle-orm" import { getDb, schema } from "@/db" import { log } from "@/lib/log" @@ -61,6 +60,35 @@ function idFromPathname(pathname: string): string | null { return match ? match[2] : null } +interface BlobListPage { + blobs: Array<{ pathname: string; url: string; size: number; uploadedAt: string }> + cursor?: string + hasMore: boolean +} + +// Direct call to Vercel Blob's list API. We bypass @vercel/blob's list() +// for the same reason we bypass get(): the SDK ends in `apiResponse.json()` +// after the Response object goes out of scope, which under Bun on Vercel +// can leave the body stream stuck waiting for EOF. By keeping our own +// Response in scope across the .json() drain, the request completes. 
+async function listEventsPage(cursor: string | undefined): Promise { + const token = process.env.BLOB_READ_WRITE_TOKEN + if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") + const params = new URLSearchParams({ prefix: "events/" }) + if (cursor) params.set("cursor", cursor) + const res = await fetch(`https://vercel.com/api/blob/?${params}`, { + headers: { + authorization: `Bearer ${token}`, + "x-api-version": "12", + }, + cache: "no-store", + }) + if (!res.ok) { + throw new Error(`blob list failed: ${res.status} ${res.statusText}`) + } + return (await res.json()) as BlobListPage +} + async function listAllBlobs( log: (e: IngestLogEvent) => void, ): Promise<{ refs: BlobRef[]; unrecognized: number }> { @@ -72,7 +100,7 @@ async function listAllBlobs( page++ const t0 = Date.now() ilog.debug({ step: "list.page_start", page, cursor: cursor ?? null }) - const result = await list({ prefix: "events/", cursor }) + const result = await listEventsPage(cursor) ilog.debug({ step: "list.page_done", page, From c1cf35f978d4b0eaa6224a0b77430d80509ad879 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 18:38:51 +0000 Subject: [PATCH 7/7] feat(tripwire): bound ingest list to the trailing 2 UTC days The cron was listing the entire events/ prefix every 5 minutes, which grows linearly with the lifetime event count. Switch to listing today + yesterday by exact UTC-date prefix (events/<YYYY-MM-DD>/), bounded by INGEST_WINDOW_DAYS=2. Events older than the window are not auto-ingested by the cron path; the CLI script (scripts/tripwire/ingest-events.ts) still walks the full events/ prefix and can backfill manually if a longer outage happens. 
--- src/lib/tripwire/ingest.test.ts | 30 ++++++++++++++ src/lib/tripwire/ingest.ts | 71 +++++++++++++++++++++------------ 2 files changed, 75 insertions(+), 26 deletions(-) create mode 100644 src/lib/tripwire/ingest.test.ts diff --git a/src/lib/tripwire/ingest.test.ts b/src/lib/tripwire/ingest.test.ts new file mode 100644 index 0000000..9310641 --- /dev/null +++ b/src/lib/tripwire/ingest.test.ts @@ -0,0 +1,30 @@ +// src/lib/tripwire/ingest.test.ts +import { describe, test, expect } from "bun:test" +import { recentDatePrefixes } from "./ingest" + +describe("recentDatePrefixes", () => { + test("returns today and yesterday in UTC, today first", () => { + const now = new Date("2026-05-03T12:34:56.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-05-03/", + "events/2026-05-02/", + ]) + }) + + test("crosses month boundary correctly", () => { + const now = new Date("2026-06-01T00:30:00.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-06-01/", + "events/2026-05-31/", + ]) + }) + + test("uses UTC, not local time, just before midnight UTC", () => { + // Late on the 3rd UTC -> today=03, yesterday=02 regardless of host TZ. + const now = new Date("2026-05-03T23:59:59.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-05-03/", + "events/2026-05-02/", + ]) + }) +}) diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index 2766d38..d1bf0d6 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -71,10 +71,10 @@ interface BlobListPage { // after the Response object goes out of scope, which under Bun on Vercel // can leave the body stream stuck waiting for EOF. By keeping our own // Response in scope across the .json() drain, the request completes. 
-async function listEventsPage(cursor: string | undefined): Promise { +async function listBlobsPage(prefix: string, cursor: string | undefined): Promise { const token = process.env.BLOB_READ_WRITE_TOKEN if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") - const params = new URLSearchParams({ prefix: "events/" }) + const params = new URLSearchParams({ prefix }) if (cursor) params.set("cursor", cursor) const res = await fetch(`https://vercel.com/api/blob/?${params}`, { headers: { @@ -89,36 +89,55 @@ async function listEventsPage(cursor: string | undefined): Promise return (await res.json()) as BlobListPage } +// Bound the listing to the trailing INGEST_WINDOW_DAYS UTC dates. Cron runs +// every 5 minutes, so a 2-day window leaves 24h+ of slack against any cron +// outage. Events older than the window won't be auto-ingested by the cron; +// the CLI script (scripts/tripwire/ingest-events.ts) still walks the full +// events/ prefix and can backfill manually if a longer outage happens. +const INGEST_WINDOW_DAYS = 2 + +export function recentDatePrefixes(now: Date): string[] { + const out: string[] = [] + for (let i = 0; i < INGEST_WINDOW_DAYS; i++) { + const d = new Date(now.getTime() - i * 24 * 60 * 60 * 1000) + out.push(`events/${d.toISOString().slice(0, 10)}/`) + } + return out +} + async function listAllBlobs( log: (e: IngestLogEvent) => void, ): Promise<{ refs: BlobRef[]; unrecognized: number }> { const refs: BlobRef[] = [] let unrecognized = 0 - let cursor: string | undefined - let page = 0 - do { - page++ - const t0 = Date.now() - ilog.debug({ step: "list.page_start", page, cursor: cursor ?? 
null }) - const result = await listEventsPage(cursor) - ilog.debug({ - step: "list.page_done", - page, - elapsed_ms: Date.now() - t0, - blobs: result.blobs.length, - has_cursor: Boolean(result.cursor), - }) - for (const blob of result.blobs) { - const id = idFromPathname(blob.pathname) - if (!id) { - unrecognized++ - log({ step: "list.unrecognized_blob", pathname: blob.pathname }) - continue + for (const prefix of recentDatePrefixes(new Date())) { + let cursor: string | undefined + let page = 0 + do { + page++ + const t0 = Date.now() + ilog.debug({ step: "list.page_start", prefix, page, cursor: cursor ?? null }) + const result = await listBlobsPage(prefix, cursor) + ilog.debug({ + step: "list.page_done", + prefix, + page, + elapsed_ms: Date.now() - t0, + blobs: result.blobs.length, + has_cursor: Boolean(result.cursor), + }) + for (const blob of result.blobs) { + const id = idFromPathname(blob.pathname) + if (!id) { + unrecognized++ + log({ step: "list.unrecognized_blob", pathname: blob.pathname }) + continue + } + refs.push({ pathname: blob.pathname, url: blob.url, id }) } - refs.push({ pathname: blob.pathname, url: blob.url, id }) - } - cursor = result.cursor - } while (cursor) + cursor = result.cursor + } while (cursor) + } return { refs, unrecognized } }