Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions backend/docs/staleness-detection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Staleness Detection

## Overview

The staleness detector tracks whether core data sources and derived metrics are updating
within expected windows. It compares the most recent timestamp in each table against
its freshness thresholds and reports a status of `fresh`, `warning`, `stale`, or `missing`.

## Rules

| Key | Table | Time Column | Type | Expected | Warning | Critical |
| --- | --- | --- | --- | --- | --- | --- |
| `prices` | `prices` | `time` | source | 30s | 2m | 5m |
| `liquidity_snapshots` | `liquidity_snapshots` | `time` | source | 5m | 15m | 30m |
| `health_scores` | `health_scores` | `time` | derived | 5m | 15m | 30m |
| `verification_results` | `verification_results` | `verified_at` | source | 5m | 15m | 30m |
| `bridge_volume_stats` | `bridge_volume_stats` | `stat_date` | derived | 24h | 36h | 48h |
| `external_dependency_checks` | `external_dependency_checks` | `checked_at` | source | 2m | 5m | 10m |

## Scheduled Checks

The job queue runs `staleness-detection` every 5 minutes. Any warnings or stale states
are logged, and the API exposes the same alert payload.

## API Endpoints

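- `GET /api/v1/freshness`
  - Returns the current snapshot across all sources.
  - Query params: `includeHistory` (boolean, default `false`), `historyLimit` (integer, 2-50, default 10).
- `GET /api/v1/freshness/:source`
  - Returns detail for a single source, including recent history.
  - Query params: `historyLimit` (integer, 2-50, default 10).
  - Responds with `404` when the source is not found.
- `GET /api/v1/freshness/:source/trend`
  - Returns trend output and recent intervals for a single source.
  - Query params: `historyLimit` (integer, 2-50, default 10).
  - Responds with `404` when the source is not found.
- `GET /api/v1/freshness/alerts`
  - Returns warning/stale/missing entries suitable for alerting.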

## Updating Rules

Rules live in `backend/src/config/stalenessRules.ts`. Adjust thresholds there to match
new job schedules or ingestion intervals.
169 changes: 169 additions & 0 deletions backend/src/api/routes/freshness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
import { stalenessDetectionService } from "../../services/stalenessDetection.service.js";

/** Query-string parameters accepted by the snapshot route (`GET /`). */
interface SnapshotQuery {
/** When true, the snapshot is requested from the service with history included. */
includeHistory?: boolean;
/** History limit forwarded to the service; the route schema bounds it to 2-50. */
historyLimit?: number;
}

/** Query-string parameters accepted by the per-source routes (`/:source`, `/:source/trend`). */
interface SourceQuery {
/** History limit forwarded to the service; the route schema bounds it to 2-50. */
historyLimit?: number;
}

/**
 * Coerces a raw `historyLimit` value into an integer within [2, 50].
 *
 * @param value - Raw value, typically read from a query string.
 * @param fallback - Returned unchanged when `value` carries no usable number.
 * @returns The truncated and clamped limit, or `fallback`.
 */
function normalizeHistoryLimit(value: unknown, fallback: number): number {
  // Number() turns null, "", whitespace, booleans and [] into 0 or 1, which
  // would then be clamped to 2. Those inputs mean "no value", so they take
  // the fallback instead of silently becoming the minimum.
  if (typeof value !== "number" && typeof value !== "string") return fallback;
  if (typeof value === "string" && value.trim() === "") return fallback;

  const parsed = Number(value);
  if (!Number.isFinite(parsed)) return fallback;

  // Drop any fractional part first, then clamp into the supported range.
  return Math.min(Math.max(Math.trunc(parsed), 2), 50);
}

/**
 * Registers the freshness routes on the given Fastify instance:
 *
 * - `GET /`              snapshot across all monitored sources
 * - `GET /:source`       detail (with history) for one source
 * - `GET /:source/trend` trend-focused projection of the same detail
 * - `GET /alerts`        current alerts plus a response timestamp
 *
 * Paths are relative to whatever prefix the plugin is registered under.
 */
export async function freshnessRoutes(server: FastifyInstance) {
  type SourceRouteRequest = FastifyRequest<{
    Params: { source: string };
    Querystring: SourceQuery;
  }>;

  // Each call returns a fresh object so no schema instance is shared between routes.
  const historyQuerySchema = (extraProperties: Record<string, unknown> = {}) => ({
    type: "object",
    properties: {
      ...extraProperties,
      historyLimit: { type: "integer", minimum: 2, maximum: 50, default: 10 },
    },
  });

  const sourceParamsSchema = () => ({
    type: "object",
    required: ["source"],
    properties: {
      source: { type: "string" },
    },
  });

  // Shared by both per-source routes: resolve the history limit and fetch the detail.
  const loadSourceDetail = (request: SourceRouteRequest) => {
    const rawLimit = request.query.historyLimit;
    const historyLimit = rawLimit ? normalizeHistoryLimit(rawLimit, 10) : 10;

    return stalenessDetectionService.getSourceDetail(request.params.source, {
      includeHistory: true,
      historyLimit,
    });
  };

  server.get<{ Querystring: SnapshotQuery }>(
    "/",
    {
      schema: {
        tags: ["Freshness"],
        summary: "Get freshness snapshot for monitored sources",
        querystring: historyQuerySchema({
          includeHistory: { type: "boolean", default: false },
        }),
        response: {
          200: { type: "object", additionalProperties: true },
        },
      },
    },
    async (request: FastifyRequest<{ Querystring: SnapshotQuery }>) => {
      const { includeHistory: rawIncludeHistory, historyLimit: rawLimit } = request.query;

      // Accepts both a real boolean and its string form.
      const includeHistory = String(rawIncludeHistory ?? "false") === "true";

      // NOTE(review): the schema above declares `default: 10` for historyLimit.
      // If the server's validator applies schema defaults (Fastify's stock Ajv
      // setup does), `rawLimit` is always set and the two fallback branches
      // below are never taken - confirm whether the `2` fallback is still intended.
      let historyLimit: number;
      if (rawLimit) {
        historyLimit = normalizeHistoryLimit(rawLimit, 10);
      } else if (includeHistory) {
        historyLimit = 10;
      } else {
        historyLimit = 2;
      }

      return stalenessDetectionService.getSnapshot({
        includeHistory,
        historyLimit,
      });
    }
  );

  server.get<{ Params: { source: string }; Querystring: SourceQuery }>(
    "/:source",
    {
      schema: {
        tags: ["Freshness"],
        summary: "Get freshness detail for a specific source",
        params: sourceParamsSchema(),
        querystring: historyQuerySchema(),
        response: {
          200: { type: "object", additionalProperties: true },
          404: { $ref: "Error#" },
        },
      },
    },
    async (request: SourceRouteRequest, reply: FastifyReply) => {
      const detail = await loadSourceDetail(request);
      if (detail) {
        return detail;
      }

      return reply.status(404).send({ error: "Source not found" });
    }
  );

  server.get<{ Params: { source: string }; Querystring: SourceQuery }>(
    "/:source/trend",
    {
      schema: {
        tags: ["Freshness"],
        summary: "Get freshness trend data for a specific source",
        params: sourceParamsSchema(),
        querystring: historyQuerySchema(),
      },
    },
    async (request: SourceRouteRequest, reply: FastifyReply) => {
      const detail = await loadSourceDetail(request);
      if (!detail) {
        return reply.status(404).send({ error: "Source not found" });
      }

      // Project the full detail down to the trend-related fields only.
      const { key, label, trend, expectedIntervalMs, lastUpdated } = detail;
      return {
        key,
        label,
        trend,
        expectedIntervalMs,
        lastUpdated,
        recentIntervalsMs: detail.recentIntervalsMs ?? [],
        history: detail.history ?? [],
      };
    }
  );

  server.get(
    "/alerts",
    {
      schema: {
        tags: ["Freshness"],
        summary: "Get freshness alerts",
        response: {
          200: {
            type: "object",
            properties: {
              alerts: { type: "array", items: { type: "object", additionalProperties: true } },
              timestamp: { type: "string" },
            },
          },
        },
      },
    },
    // The timestamp is taken after the alerts have been fetched.
    async () => ({
      alerts: await stalenessDetectionService.getAlerts(),
      timestamp: new Date().toISOString(),
    })
  );
}
2 changes: 2 additions & 0 deletions backend/src/api/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import { eventSubscriptionFilterRoutes } from "./eventSubscriptionFilter.routes.
import { maintenanceRoutes } from "./maintenance.js";
import { notificationTemplatesRoutes } from "./notificationTemplates.js";
import { archivedDataBrowserRoutes } from "./archivedDataBrowser.routes.js";
import { freshnessRoutes } from "./freshness.js";

export async function registerRoutes(server: FastifyInstance) {
server.register(assetsRoutes, { prefix: "/api/v1/assets" });
Expand Down Expand Up @@ -114,6 +115,7 @@ export async function registerRoutes(server: FastifyInstance) {
server.register(externalRateLimitMetricsRoutes, {
prefix: "/api/v1/metrics/external-rate-limits",
});
server.register(freshnessRoutes, { prefix: "/api/v1/freshness" });
server.register(eventSubscriptionFilterRoutes, {
prefix: "/api/v1/event-subscriptions",
});
Expand Down
85 changes: 85 additions & 0 deletions backend/src/config/stalenessRules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/** Classifies a monitored table as a core data source (`source`) or a derived metric (`derived`). */
export type StalenessSourceType = "source" | "derived";

/**
 * Freshness rule for one monitored table: where its timestamps live and the
 * time thresholds that apply to it.
 */
export interface StalenessRule {
/** Identifier of the rule (presumably what the `:source` route parameter is matched against - confirm in the service). */
key: string;
/** Short human-readable name. */
label: string;
/** Longer explanation of what the table contains. */
description: string;
/** Name of the table being monitored. */
table: string;
/** Column in `table` that holds the timestamp used for the freshness check. */
timeColumn: string;
/** Whether the table is a core data source or a derived metric. */
sourceType: StalenessSourceType;
/** Expected time between updates, in milliseconds. */
expectedIntervalMs: number;
/** Warning threshold, in milliseconds (how it maps to a status is decided outside this file). */
warnAfterMs: number;
/** Critical threshold, in milliseconds (how it maps to a status is decided outside this file). */
criticalAfterMs: number;
}

/** Converts a duration in minutes to milliseconds. */
function minutes(value: number): number {
  return value * 60 * 1000;
}

/** Converts a duration in hours to milliseconds. */
function hours(value: number): number {
  const msPerHour = minutes(60);
  return msPerHour * value;
}

/**
 * Freshness rules for every monitored table, one entry per table.
 * All durations are in milliseconds; `minutes()` and `hours()` do the conversion.
 */
export const STALENESS_RULES: StalenessRule[] = [
{
key: "prices",
label: "Price observations",
description: "Aggregated price samples stored in the prices hypertable.",
table: "prices",
timeColumn: "time",
sourceType: "source",
expectedIntervalMs: 30_000,
warnAfterMs: minutes(2),
criticalAfterMs: minutes(5),
},
{
key: "liquidity_snapshots",
label: "Liquidity snapshots",
description: "Per-DEX liquidity snapshots used for analytics rollups.",
table: "liquidity_snapshots",
timeColumn: "time",
sourceType: "source",
expectedIntervalMs: minutes(5),
warnAfterMs: minutes(15),
criticalAfterMs: minutes(30),
},
{
key: "health_scores",
label: "Health scores",
description: "Composite asset health scores produced by scheduled jobs.",
table: "health_scores",
timeColumn: "time",
sourceType: "derived",
expectedIntervalMs: minutes(5),
warnAfterMs: minutes(15),
criticalAfterMs: minutes(30),
},
{
key: "verification_results",
label: "Bridge verification results",
description: "Supply verification results captured for bridged assets.",
table: "verification_results",
timeColumn: "verified_at",
sourceType: "source",
expectedIntervalMs: minutes(5),
warnAfterMs: minutes(15),
criticalAfterMs: minutes(30),
},
{
key: "bridge_volume_stats",
label: "Bridge volume rollups",
description: "Daily rollups that feed protocol and asset analytics.",
table: "bridge_volume_stats",
// NOTE(review): `stat_date` looks like a calendar date rather than a timestamp;
// confirm how the detector measures age against the hour-based thresholds below.
timeColumn: "stat_date",
sourceType: "derived",
expectedIntervalMs: hours(24),
warnAfterMs: hours(36),
criticalAfterMs: hours(48),
},
{
key: "external_dependency_checks",
label: "External dependency checks",
description: "Health checks for upstream providers (RPCs, APIs).",
table: "external_dependency_checks",
timeColumn: "checked_at",
sourceType: "source",
expectedIntervalMs: minutes(2),
warnAfterMs: minutes(5),
criticalAfterMs: minutes(10),
},
];
Loading
Loading