Skip to content

Commit 7245038

Browse files
committed
feat(supervisor): add RedisBackpressureSignalSource
Reads the backpressure verdict from a Redis key (written by the cluster-side aggregator). Malformed or wrong-shaped values are treated as unknown so the monitor fails open. Adds @internal/redis + @internal/testcontainers deps.
1 parent 331bf2e commit 7245038

4 files changed

Lines changed: 105 additions & 0 deletions

File tree

apps/supervisor/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"dependencies": {
1616
"@aws-sdk/client-ecr": "^3.839.0",
1717
"@internal/compute": "workspace:*",
18+
"@internal/redis": "workspace:*",
1819
"@kubernetes/client-node": "^1.0.0",
1920
"@trigger.dev/core": "workspace:*",
2021
"dockerode": "^4.0.6",
@@ -25,6 +26,7 @@
2526
"zod": "3.25.76"
2627
},
2728
"devDependencies": {
29+
"@internal/testcontainers": "workspace:*",
2830
"@types/dockerode": "^3.3.33"
2931
}
3032
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { redisTest } from "@internal/testcontainers";
2+
import { Redis } from "@internal/redis";
3+
import { describe, expect } from "vitest";
4+
import { RedisBackpressureSignalSource } from "./redisBackpressureSignalSource.js";
5+
6+
const KEY = "backpressure:test";
7+
8+
describe("RedisBackpressureSignalSource", () => {
9+
redisTest("returns null when the key is absent", async ({ redisOptions }) => {
10+
const redis = new Redis(redisOptions);
11+
try {
12+
const source = new RedisBackpressureSignalSource(redis, KEY);
13+
expect(await source.read()).toBeNull();
14+
} finally {
15+
await redis.quit();
16+
}
17+
});
18+
19+
redisTest("parses a valid engaged verdict", async ({ redisOptions }) => {
20+
const redis = new Redis(redisOptions);
21+
try {
22+
await redis.set(KEY, JSON.stringify({ engaged: true, ts: 1_700_000_000_000 }));
23+
const source = new RedisBackpressureSignalSource(redis, KEY);
24+
expect(await source.read()).toEqual({ engaged: true, ts: 1_700_000_000_000 });
25+
} finally {
26+
await redis.quit();
27+
}
28+
});
29+
30+
redisTest("parses a clear verdict", async ({ redisOptions }) => {
31+
const redis = new Redis(redisOptions);
32+
try {
33+
await redis.set(KEY, JSON.stringify({ engaged: false }));
34+
const source = new RedisBackpressureSignalSource(redis, KEY);
35+
expect(await source.read()).toEqual({ engaged: false });
36+
} finally {
37+
await redis.quit();
38+
}
39+
});
40+
41+
redisTest("returns null for malformed JSON (fail-open)", async ({ redisOptions }) => {
42+
const redis = new Redis(redisOptions);
43+
try {
44+
await redis.set(KEY, "not json {");
45+
const source = new RedisBackpressureSignalSource(redis, KEY);
46+
expect(await source.read()).toBeNull();
47+
} finally {
48+
await redis.quit();
49+
}
50+
});
51+
52+
redisTest("returns null for valid JSON of the wrong shape (fail-open)", async ({ redisOptions }) => {
53+
const redis = new Redis(redisOptions);
54+
try {
55+
await redis.set(KEY, JSON.stringify({ foo: "bar" }));
56+
const source = new RedisBackpressureSignalSource(redis, KEY);
57+
expect(await source.read()).toBeNull();
58+
} finally {
59+
await redis.quit();
60+
}
61+
});
62+
});
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import type { Redis } from "@internal/redis";
2+
import { z } from "zod";
3+
import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js";
4+
5+
const VerdictSchema = z.object({
6+
engaged: z.boolean(),
7+
ts: z.number().optional(),
8+
});
9+
10+
/** Reads the backpressure verdict from a Redis key written by the cluster-side aggregator. */
11+
export class RedisBackpressureSignalSource implements BackpressureSignalSource {
12+
constructor(
13+
private readonly redis: Redis,
14+
private readonly key: string
15+
) {}
16+
17+
async read(): Promise<BackpressureVerdict | null> {
18+
const raw = await this.redis.get(this.key);
19+
if (raw === null) {
20+
return null;
21+
}
22+
23+
// A malformed or wrong-shaped value is treated as unknown (null) so the
24+
// monitor fails open rather than acting on garbage.
25+
let json: unknown;
26+
try {
27+
json = JSON.parse(raw);
28+
} catch {
29+
return null;
30+
}
31+
32+
const parsed = VerdictSchema.safeParse(json);
33+
return parsed.success ? parsed.data : null;
34+
}
35+
}

pnpm-lock.yaml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)