From 41623f8dd243fff38126803625c999ae2181a6d7 Mon Sep 17 00:00:00 2001 From: Yusufolosun Date: Mon, 1 Jun 2026 21:19:49 +0100 Subject: [PATCH] fix: add staleness detection service --- backend/docs/staleness-detection.md | 40 +++ backend/src/api/routes/freshness.ts | 169 ++++++++++++ backend/src/api/routes/index.ts | 2 + backend/src/config/stalenessRules.ts | 85 ++++++ .../services/stalenessDetection.service.ts | 247 ++++++++++++++++++ backend/src/workers/index.ts | 6 + backend/src/workers/stalenessDetection.job.ts | 18 ++ .../stalenessDetection.service.test.ts | 87 ++++++ 8 files changed, 654 insertions(+) create mode 100644 backend/docs/staleness-detection.md create mode 100644 backend/src/api/routes/freshness.ts create mode 100644 backend/src/config/stalenessRules.ts create mode 100644 backend/src/services/stalenessDetection.service.ts create mode 100644 backend/src/workers/stalenessDetection.job.ts create mode 100644 backend/tests/services/stalenessDetection.service.test.ts diff --git a/backend/docs/staleness-detection.md b/backend/docs/staleness-detection.md new file mode 100644 index 00000000..77a90626 --- /dev/null +++ b/backend/docs/staleness-detection.md @@ -0,0 +1,40 @@ +# Staleness Detection + +## Overview + +The staleness detector tracks whether core data sources and derived metrics are updating +within expected windows. It compares the most recent timestamp in each table against +its freshness thresholds and reports a status of `fresh`, `warning`, `stale`, or `missing`. + +## Rules + +| Key | Table | Time Column | Type | Expected | Warning | Critical | +| --- | --- | --- | --- | --- | --- | --- | +| `prices` | `prices` | `time` | source | 30s | 2m | 5m | +| `liquidity_snapshots` | `liquidity_snapshots` | `time` | source | 5m | 15m | 30m | +| `health_scores` | `health_scores` | `time` | derived | 5m | 15m | 30m | +| `verification_results` | `verification_results` | `verified_at` | source | 5m | 15m | 30m | +| `bridge_volume_stats` | `bridge_volume_stats` | `stat_date` | derived | 24h | 36h | 48h | +| `external_dependency_checks` | `external_dependency_checks` | `checked_at` | source | 2m | 5m | 10m | + +## Scheduled Checks + +The job queue runs `staleness-detection` every 5 minutes. Any warnings or stale states +are logged, and the API exposes the same alert payload. + +## API Endpoints + +- `GET /api/v1/freshness` + - Returns the current snapshot across all sources. + - Query params: `includeHistory` (boolean), `historyLimit` (2-50). +- `GET /api/v1/freshness/:source` + - Returns detail for a single source, including recent history. +- `GET /api/v1/freshness/:source/trend` + - Returns trend output and recent intervals for a single source. +- `GET /api/v1/freshness/alerts` + - Returns warning/stale/missing entries suitable for alerting. + +## Updating Rules + +Rules live in `backend/src/config/stalenessRules.ts`. Adjust thresholds there to match +new job schedules or ingestion intervals. diff --git a/backend/src/api/routes/freshness.ts b/backend/src/api/routes/freshness.ts new file mode 100644 index 00000000..16962851 --- /dev/null +++ b/backend/src/api/routes/freshness.ts @@ -0,0 +1,169 @@ +import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify"; +import { stalenessDetectionService } from "../../services/stalenessDetection.service.js"; + +interface SnapshotQuery { + includeHistory?: boolean; + historyLimit?: number; +} + +interface SourceQuery { + historyLimit?: number; +} + +function normalizeHistoryLimit(value: unknown, fallback: number): number { + const parsed = Number(value); + if (!Number.isFinite(parsed)) return fallback; + return Math.min(Math.max(Math.trunc(parsed), 2), 50); +} + +export async function freshnessRoutes(server: FastifyInstance) { + server.get<{ Querystring: SnapshotQuery }>( + "/", + { + schema: { + tags: ["Freshness"], + summary: "Get freshness snapshot for monitored sources", + querystring: { + type: "object", + properties: { + includeHistory: { type: "boolean", default: false }, + historyLimit: { type: "integer", minimum: 2, maximum: 50, default: 10 }, + }, + }, + response: { + 200: { type: "object", additionalProperties: true }, + }, + }, + }, + async (request: FastifyRequest<{ Querystring: SnapshotQuery }>) => { + const includeHistory = String(request.query.includeHistory ?? "false") === "true"; + const historyLimit = request.query.historyLimit + ? normalizeHistoryLimit(request.query.historyLimit, 10) + : includeHistory + ? 10 + : 2; + + return stalenessDetectionService.getSnapshot({ + includeHistory, + historyLimit, + }); + } + ); + + server.get<{ Params: { source: string }; Querystring: SourceQuery }>( + "/:source", + { + schema: { + tags: ["Freshness"], + summary: "Get freshness detail for a specific source", + params: { + type: "object", + required: ["source"], + properties: { + source: { type: "string" }, + }, + }, + querystring: { + type: "object", + properties: { + historyLimit: { type: "integer", minimum: 2, maximum: 50, default: 10 }, + }, + }, + response: { + 200: { type: "object", additionalProperties: true }, + 404: { $ref: "Error#" }, + }, + }, + }, + async ( + request: FastifyRequest<{ Params: { source: string }; Querystring: SourceQuery }>, + reply: FastifyReply + ) => { + const historyLimit = request.query.historyLimit + ? normalizeHistoryLimit(request.query.historyLimit, 10) + : 10; + const detail = await stalenessDetectionService.getSourceDetail(request.params.source, { + includeHistory: true, + historyLimit, + }); + + if (!detail) { + return reply.status(404).send({ error: "Source not found" }); + } + + return detail; + } + ); + + server.get<{ Params: { source: string }; Querystring: SourceQuery }>( + "/:source/trend", + { + schema: { + tags: ["Freshness"], + summary: "Get freshness trend data for a specific source", + params: { + type: "object", + required: ["source"], + properties: { + source: { type: "string" }, + }, + }, + querystring: { + type: "object", + properties: { + historyLimit: { type: "integer", minimum: 2, maximum: 50, default: 10 }, + }, + }, + }, + }, + async ( + request: FastifyRequest<{ Params: { source: string }; Querystring: SourceQuery }>, + reply: FastifyReply + ) => { + const historyLimit = request.query.historyLimit + ? normalizeHistoryLimit(request.query.historyLimit, 10) + : 10; + const detail = await stalenessDetectionService.getSourceDetail(request.params.source, { + includeHistory: true, + historyLimit, + }); + + if (!detail) { + return reply.status(404).send({ error: "Source not found" }); + } + + return { + key: detail.key, + label: detail.label, + trend: detail.trend, + expectedIntervalMs: detail.expectedIntervalMs, + lastUpdated: detail.lastUpdated, + recentIntervalsMs: detail.recentIntervalsMs ?? [], + history: detail.history ?? [], + }; + } + ); + + server.get( + "/alerts", + { + schema: { + tags: ["Freshness"], + summary: "Get freshness alerts", + response: { + 200: { + type: "object", + properties: { + alerts: { type: "array", items: { type: "object", additionalProperties: true } }, + timestamp: { type: "string" }, + }, + }, + }, + }, + }, + async () => { + const alerts = await stalenessDetectionService.getAlerts(); + return { alerts, timestamp: new Date().toISOString() }; + } + ); +} diff --git a/backend/src/api/routes/index.ts b/backend/src/api/routes/index.ts index 65f6991a..2d909129 100644 --- a/backend/src/api/routes/index.ts +++ b/backend/src/api/routes/index.ts @@ -49,6 +49,7 @@ import { eventSubscriptionFilterRoutes } from "./eventSubscriptionFilter.routes. import { maintenanceRoutes } from "./maintenance.js"; import { notificationTemplatesRoutes } from "./notificationTemplates.js"; import { archivedDataBrowserRoutes } from "./archivedDataBrowser.routes.js"; +import { freshnessRoutes } from "./freshness.js"; export async function registerRoutes(server: FastifyInstance) { server.register(assetsRoutes, { prefix: "/api/v1/assets" }); @@ -114,6 +115,7 @@ export async function registerRoutes(server: FastifyInstance) { server.register(externalRateLimitMetricsRoutes, { prefix: "/api/v1/metrics/external-rate-limits", }); + server.register(freshnessRoutes, { prefix: "/api/v1/freshness" }); server.register(eventSubscriptionFilterRoutes, { prefix: "/api/v1/event-subscriptions", }); diff --git a/backend/src/config/stalenessRules.ts b/backend/src/config/stalenessRules.ts new file mode 100644 index 00000000..99515127 --- /dev/null +++ b/backend/src/config/stalenessRules.ts @@ -0,0 +1,85 @@ +export type StalenessSourceType = "source" | "derived"; + +export interface StalenessRule { + key: string; + label: string; + description: string; + table: string; + timeColumn: string; + sourceType: StalenessSourceType; + expectedIntervalMs: number; + warnAfterMs: number; + criticalAfterMs: number; +} + +const minutes = (value: number) => value * 60 * 1000; +const hours = (value: number) => minutes(60) * value; + +export const STALENESS_RULES: StalenessRule[] = [ + { + key: "prices", + label: "Price observations", + description: "Aggregated price samples stored in the prices hypertable.", + table: "prices", + timeColumn: "time", + sourceType: "source", + expectedIntervalMs: 30_000, + warnAfterMs: minutes(2), + criticalAfterMs: minutes(5), + }, + { + key: "liquidity_snapshots", + label: "Liquidity snapshots", + description: "Per-DEX liquidity snapshots used for analytics rollups.", + table: "liquidity_snapshots", + timeColumn: "time", + sourceType: "source", + expectedIntervalMs: minutes(5), + warnAfterMs: minutes(15), + criticalAfterMs: minutes(30), + }, + { + key: "health_scores", + label: "Health scores", + description: "Composite asset health scores produced by scheduled jobs.", + table: "health_scores", + timeColumn: "time", + sourceType: "derived", + expectedIntervalMs: minutes(5), + warnAfterMs: minutes(15), + criticalAfterMs: minutes(30), + }, + { + key: "verification_results", + label: "Bridge verification results", + description: "Supply verification results captured for bridged assets.", + table: "verification_results", + timeColumn: "verified_at", + sourceType: "source", + expectedIntervalMs: minutes(5), + warnAfterMs: minutes(15), + criticalAfterMs: minutes(30), + }, + { + key: "bridge_volume_stats", + label: "Bridge volume rollups", + description: "Daily rollups that feed protocol and asset analytics.", + table: "bridge_volume_stats", + timeColumn: "stat_date", + sourceType: "derived", + expectedIntervalMs: hours(24), + warnAfterMs: hours(36), + criticalAfterMs: hours(48), + }, + { + key: "external_dependency_checks", + label: "External dependency checks", + description: "Health checks for upstream providers (RPCs, APIs).", + table: "external_dependency_checks", + timeColumn: "checked_at", + sourceType: "source", + expectedIntervalMs: minutes(2), + warnAfterMs: minutes(5), + criticalAfterMs: minutes(10), + }, +]; diff --git a/backend/src/services/stalenessDetection.service.ts b/backend/src/services/stalenessDetection.service.ts new file mode 100644 index 00000000..d7eec46c --- /dev/null +++ b/backend/src/services/stalenessDetection.service.ts @@ -0,0 +1,247 @@ +import { getDatabase } from "../database/connection.js"; +import { logger } from "../utils/logger.js"; +import { STALENESS_RULES, type StalenessRule } from "../config/stalenessRules.js"; + +export type FreshnessStatus = "fresh" | "warning" | "stale" | "missing"; +export type TrendDirection = "improving" | "stable" | "deteriorating" | "unknown"; + +export interface FreshnessSourceSnapshot { + key: string; + label: string; + description: string; + sourceType: "source" | "derived"; + status: FreshnessStatus; + lastUpdated: string | null; + ageMs: number | null; + expectedIntervalMs: number; + warnAfterMs: number; + criticalAfterMs: number; + trend: TrendDirection; + recentIntervalsMs?: number[]; + history?: string[]; + error?: string; +} + +export interface FreshnessSnapshot { + status: FreshnessStatus; + timestamp: string; + sources: FreshnessSourceSnapshot[]; +} + +export interface FreshnessAlert { + key: string; + label: string; + severity: "warning" | "critical"; + status: FreshnessStatus; + message: string; + lastUpdated: string | null; + ageMs: number | null; + thresholdMs: number; + timestamp: string; +} + +export class StalenessDetectionService { + private readonly db = getDatabase(); + private readonly rules = STALENESS_RULES; + + async getSnapshot(options?: { + includeHistory?: boolean; + historyLimit?: number; + }): Promise { + const includeHistory = options?.includeHistory ?? false; + const historyLimit = Math.max(2, options?.historyLimit ?? 10); + + const sources = await Promise.all( + this.rules.map((rule) => + this.buildSourceSnapshot(rule, { includeHistory, historyLimit }) + ) + ); + + const statuses = sources.map((source) => source.status); + const status = this.rollupStatus(statuses); + + return { + status, + timestamp: new Date().toISOString(), + sources, + }; + } + + async getSourceDetail( + key: string, + options?: { includeHistory?: boolean; historyLimit?: number } + ): Promise { + const rule = this.rules.find((r) => r.key === key); + if (!rule) return null; + + const includeHistory = options?.includeHistory ?? true; + const historyLimit = Math.max(2, options?.historyLimit ?? 10); + return this.buildSourceSnapshot(rule, { includeHistory, historyLimit }); + } + + async getAlerts(): Promise { + const snapshot = await this.getSnapshot({ includeHistory: false }); + return this.buildAlerts(snapshot); + } + + async runScheduledCheck(): Promise<{ snapshot: FreshnessSnapshot; alerts: FreshnessAlert[] }> { + const snapshot = await this.getSnapshot({ includeHistory: false }); + const alerts = this.buildAlerts(snapshot); + + if (alerts.length > 0) { + logger.warn({ alertCount: alerts.length, alerts }, "Staleness checks detected issues"); + } else { + logger.info("Staleness checks are healthy"); + } + + return { snapshot, alerts }; + } + + private async buildSourceSnapshot( + rule: StalenessRule, + options: { includeHistory: boolean; historyLimit: number } + ): Promise { + try { + const timestamps = await this.getRecentTimestamps(rule, options.historyLimit); + const latest = timestamps[0] ?? null; + const ageMs = latest ? Date.now() - latest.getTime() : null; + const status = this.evaluateStatus(rule, ageMs); + const trend = this.calculateTrend(rule, timestamps); + const intervals = this.calculateIntervals(timestamps); + + return { + key: rule.key, + label: rule.label, + description: rule.description, + sourceType: rule.sourceType, + status, + lastUpdated: latest ? latest.toISOString() : null, + ageMs, + expectedIntervalMs: rule.expectedIntervalMs, + warnAfterMs: rule.warnAfterMs, + criticalAfterMs: rule.criticalAfterMs, + trend, + recentIntervalsMs: options.includeHistory ? intervals : undefined, + history: options.includeHistory ? timestamps.map((t) => t.toISOString()) : undefined, + }; + } catch (error) { + logger.error({ error, rule: rule.key }, "Failed to evaluate freshness rule"); + return { + key: rule.key, + label: rule.label, + description: rule.description, + sourceType: rule.sourceType, + status: "missing", + lastUpdated: null, + ageMs: null, + expectedIntervalMs: rule.expectedIntervalMs, + warnAfterMs: rule.warnAfterMs, + criticalAfterMs: rule.criticalAfterMs, + trend: "unknown", + error: error instanceof Error ? error.message : "Unknown error", + }; + } + } + + private async getRecentTimestamps(rule: StalenessRule, limit: number): Promise { + const rows = await this.db(rule.table) + .select(rule.timeColumn) + .orderBy(rule.timeColumn, "desc") + .limit(limit); + + return rows + .map((row: Record) => this.normalizeTimestamp(row[rule.timeColumn])) + .filter((value): value is Date => value !== null); + } + + private normalizeTimestamp(value: unknown): Date | null { + if (!value) return null; + if (value instanceof Date) return value; + + const parsed = new Date(String(value)); + return Number.isNaN(parsed.getTime()) ? null : parsed; + } + + private evaluateStatus(rule: StalenessRule, ageMs: number | null): FreshnessStatus { + if (ageMs === null) return "missing"; + if (ageMs <= rule.warnAfterMs) return "fresh"; + if (ageMs <= rule.criticalAfterMs) return "warning"; + return "stale"; + } + + private calculateTrend(rule: StalenessRule, timestamps: Date[]): TrendDirection { + if (timestamps.length < 2) return "unknown"; + + const intervals = this.calculateIntervals(timestamps); + if (intervals.length === 0) return "unknown"; + + const latestInterval = intervals[0]; + const baseline = intervals.length > 1 ? intervals[1] : rule.expectedIntervalMs; + + if (latestInterval >= baseline * 1.2) return "deteriorating"; + if (latestInterval <= baseline * 0.8) return "improving"; + return "stable"; + } + + private calculateIntervals(timestamps: Date[]): number[] { + const intervals: number[] = []; + + for (let i = 0; i < timestamps.length - 1; i += 1) { + const delta = timestamps[i].getTime() - timestamps[i + 1].getTime(); + if (delta > 0) { + intervals.push(delta); + } + } + + return intervals; + } + + private rollupStatus(statuses: FreshnessStatus[]): FreshnessStatus { + if (statuses.length === 0) return "missing"; + if (statuses.every((status) => status === "missing")) return "missing"; + if (statuses.includes("stale")) return "stale"; + if (statuses.includes("warning")) return "warning"; + return "fresh"; + } + + private buildAlerts(snapshot: FreshnessSnapshot): FreshnessAlert[] { + const now = new Date().toISOString(); + + return snapshot.sources + .filter((source) => source.status === "warning" || source.status === "stale" || source.status === "missing") + .map((source) => { + const severity = source.status === "warning" ? "warning" : "critical"; + const thresholdMs = source.status === "warning" ? source.warnAfterMs : source.criticalAfterMs; + const message = this.formatAlertMessage(source, thresholdMs); + + return { + key: source.key, + label: source.label, + severity, + status: source.status, + message, + lastUpdated: source.lastUpdated, + ageMs: source.ageMs, + thresholdMs, + timestamp: now, + }; + }); + } + + private formatAlertMessage(source: FreshnessSourceSnapshot, thresholdMs: number): string { + if (!source.lastUpdated) { + return `No recent updates detected for ${source.label}`; + } + + const age = source.ageMs ?? 0; + return `${source.label} last updated ${this.formatDuration(age)} ago (threshold ${this.formatDuration(thresholdMs)})`; + } + + private formatDuration(ms: number): string { + if (ms < 60_000) return `${Math.max(1, Math.round(ms / 1000))}s`; + if (ms < 60 * 60_000) return `${Math.round(ms / 60_000)}m`; + return `${Math.round(ms / (60 * 60_000))}h`; + } +} + +export const stalenessDetectionService = new StalenessDetectionService(); diff --git a/backend/src/workers/index.ts b/backend/src/workers/index.ts index dfd8dd97..0a926bcd 100644 --- a/backend/src/workers/index.ts +++ b/backend/src/workers/index.ts @@ -9,6 +9,7 @@ import { processDigestScheduler } from "./digestScheduler.worker.js"; import { processMetadataSync } from "./metadataSync.job.js"; import { processExternalDependencyMonitor } from "./externalDependencyMonitor.job.js"; import { processReconciliation } from "./reconciliation.job.js"; +import { processStalenessDetection } from "./stalenessDetection.job.js"; import { logger } from "../utils/logger.js"; import { initSupplyVerificationJob } from "../jobs/supplyVerification.job.js"; import { runAuditRetentionJob } from "../jobs/auditRetention.job.js"; @@ -59,6 +60,9 @@ export async function initJobSystem() { case "external-dependency-monitor": await processExternalDependencyMonitor(job); break; + case "staleness-detection": + await processStalenessDetection(job); + break; case "reconciliation": await processReconciliation(job as any); break; @@ -130,6 +134,8 @@ export async function initJobSystem() { // External dependency checks: every 2 minutes await jobQueue.addRepeatableJob("external-dependency-monitor", {}, "*/2 * * * *"); + // Staleness detection: every 5 minutes + await jobQueue.addRepeatableJob("staleness-detection", {}, "*/5 * * * *"); // reconciliation: per-asset, every hour (top of hour) // Note: This uses the queue helper for retry/backoff defaults. for (const assetCode of ["USDC", "EURC"]) { diff --git a/backend/src/workers/stalenessDetection.job.ts b/backend/src/workers/stalenessDetection.job.ts new file mode 100644 index 00000000..519374c4 --- /dev/null +++ b/backend/src/workers/stalenessDetection.job.ts @@ -0,0 +1,18 @@ +import { Job } from "bullmq"; +import { stalenessDetectionService } from "../services/stalenessDetection.service.js"; +import { logger } from "../utils/logger.js"; + +export async function processStalenessDetection(job: Job): Promise { + logger.info({ jobId: job.id }, "Starting staleness detection job"); + + try { + const result = await stalenessDetectionService.runScheduledCheck(); + logger.info( + { jobId: job.id, status: result.snapshot.status, alerts: result.alerts.length }, + "Staleness detection job completed" + ); + } catch (error) { + logger.error({ jobId: job.id, error }, "Staleness detection job failed"); + throw error; + } +} diff --git a/backend/tests/services/stalenessDetection.service.test.ts b/backend/tests/services/stalenessDetection.service.test.ts new file mode 100644 index 00000000..efbbf696 --- /dev/null +++ b/backend/tests/services/stalenessDetection.service.test.ts @@ -0,0 +1,87 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { StalenessDetectionService } from "../../src/services/stalenessDetection.service.js"; + +const NOW = new Date("2026-06-01T00:00:00Z"); +const tableData = new Map>>(); + +const createBuilder = (rows: Array>) => ({ + select: vi.fn().mockReturnThis(), + orderBy: vi.fn().mockReturnThis(), + limit: vi.fn().mockImplementation((limit: number) => rows.slice(0, limit)), +}); + +const mockKnex = (table: string) => createBuilder(tableData.get(table) ?? []); + +vi.mock("../../src/database/connection.js", () => ({ + getDatabase: () => mockKnex, +})); + +vi.mock("../../src/utils/logger.js", () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +vi.mock("../../src/config/stalenessRules.js", () => ({ + STALENESS_RULES: [ + { + key: "prices", + label: "Prices", + description: "Price data", + table: "prices", + timeColumn: "time", + sourceType: "source", + expectedIntervalMs: 30_000, + warnAfterMs: 120_000, + criticalAfterMs: 300_000, + }, + ], +})); + +describe("StalenessDetectionService", () => { + let service: StalenessDetectionService; + + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(NOW); + tableData.set("prices", []); + service = new StalenessDetectionService(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("marks data as stale when beyond the critical threshold", async () => { + tableData.set("prices", [{ time: new Date(NOW.getTime() - 600_000) }]); + + const detail = await service.getSourceDetail("prices", { includeHistory: true }); + + expect(detail?.status).toBe("stale"); + expect(detail?.ageMs).toBe(600_000); + }); + + it("returns a critical alert when data is missing", async () => { + const alerts = await service.getAlerts(); + + expect(alerts).toHaveLength(1); + expect(alerts[0].severity).toBe("critical"); + expect(alerts[0].key).toBe("prices"); + }); + + it("reports stable trend when update intervals are consistent", async () => { + tableData.set("prices", [ + { time: new Date(NOW.getTime()) }, + { time: new Date(NOW.getTime() - 30_000) }, + { time: new Date(NOW.getTime() - 60_000) }, + ]); + + const detail = await service.getSourceDetail("prices", { includeHistory: true }); + + expect(detail?.trend).toBe("stable"); + }); +});