Skip to content

Commit 9a3cf7b

Browse files
committed
feat(supervisor): wire dequeue backpressure into preDequeue
Gate dequeues on the backpressure verdict via the existing preDequeue hook, on all paths including k8s (where the resource monitor is a no-op). Construct the Redis-backed monitor only when TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED is set; require a Redis host when enabled. Off by default - no Redis client, no effect.
1 parent 7245038 commit 9a3cf7b

3 files changed

Lines changed: 79 additions & 7 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: supervisor
3+
type: feature
4+
---
5+
6+
Add opt-in dequeue backpressure to the supervisor. When enabled, the supervisor reads a verdict from Redis and pauses dequeuing while the worker cluster is saturated, then resumes once capacity is available. Disabled by default - no behavior change for existing deployments.

apps/supervisor/src/env.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,24 @@ const Env = z
4747
TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms)
4848
TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate)
4949

50+
// Dequeue backpressure - off by default. When enabled, the supervisor reads a
51+
// verdict from Redis (written by the cluster-side aggregator) and pauses dequeues
52+
// while the worker cluster can't schedule pods. Disabled = total no-op: no Redis
53+
// client is created, no reads happen, and the dequeue loop is unaffected.
54+
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: BoolEnv.default(false),
55+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY: z.string().default("engine:dequeue:backpressure"),
56+
TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS: z.coerce.number().int().positive().default(1000),
57+
TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS: z.coerce
58+
.number()
59+
.int()
60+
.positive()
61+
.default(15_000), // Stale verdict → fail-open (treat as not engaged)
62+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: z.string().optional(),
63+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT: z.coerce.number().int().optional(),
64+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(),
65+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(),
66+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false),
67+
5068
// Optional services
5169
TRIGGER_WARM_START_URL: z.string().optional(),
5270
TRIGGER_CHECKPOINT_URL: z.string().optional(),
@@ -281,6 +299,14 @@ const Env = z
281299
path: ["TRIGGER_WORKLOAD_API_DOMAIN"],
282300
});
283301
}
302+
if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) {
303+
ctx.addIssue({
304+
code: z.ZodIssueCode.custom,
305+
message:
306+
"TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST is required when TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED is true",
307+
path: ["TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST"],
308+
});
309+
}
284310
})
285311
.transform((data) => ({
286312
...data,

apps/supervisor/src/index.ts

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import { FailedPodHandler } from "./services/failedPodHandler.js";
2828
import { getWorkerToken } from "./workerToken.js";
2929
import { OtlpTraceService } from "./services/otlpTraceService.js";
3030
import { extractTraceparent, getRestoreRunnerId } from "./util.js";
31+
import { createRedisClient } from "@internal/redis";
32+
import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js";
33+
import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js";
3134
import {
3235
fromContext,
3336
recordPhaseSince,
@@ -54,6 +57,7 @@ class ManagedSupervisor {
5457
private readonly podCleaner?: PodCleaner;
5558
private readonly failedPodHandler?: FailedPodHandler;
5659
private readonly tracing?: OtlpTraceService;
60+
private readonly backpressureMonitor?: BackpressureMonitor;
5761

5862
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
5963
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
@@ -181,6 +185,38 @@ class ManagedSupervisor {
181185
);
182186
}
183187

188+
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) {
189+
const backpressureRedis = createRedisClient(
190+
{
191+
host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST,
192+
port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT,
193+
username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME,
194+
password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD,
195+
...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }),
196+
},
197+
{
198+
onError: (error) =>
199+
this.logger.error("Backpressure redis error", { error: error.message }),
200+
}
201+
);
202+
203+
this.backpressureMonitor = new BackpressureMonitor({
204+
enabled: true,
205+
source: new RedisBackpressureSignalSource(
206+
backpressureRedis,
207+
env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY
208+
),
209+
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
210+
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
211+
});
212+
213+
this.logger.log("🛑 Dequeue backpressure enabled", {
214+
key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY,
215+
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
216+
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
217+
});
218+
}
219+
184220
this.workerSession = new SupervisorSession({
185221
workerToken: getWorkerToken(),
186222
apiUrl: env.TRIGGER_API_URL,
@@ -206,13 +242,12 @@ class ManagedSupervisor {
206242
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
207243
sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS,
208244
preDequeue: async () => {
209-
if (!env.RESOURCE_MONITOR_ENABLED) {
210-
return {};
211-
}
245+
// Synchronous, hot-path-safe cached read; undefined when backpressure is disabled.
246+
const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false;
212247

213-
if (this.isKubernetes) {
214-
// Not used in k8s for now
215-
return {};
248+
if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) {
249+
// Resource monitor is not used in k8s; backpressure is the only gate there.
250+
return { skipDequeue: skipForBackpressure };
216251
}
217252

218253
const resources = await this.resourceMonitor.getNodeResources();
@@ -222,7 +257,10 @@ class ManagedSupervisor {
222257
cpu: resources.cpuAvailable,
223258
memory: resources.memoryAvailable,
224259
},
225-
skipDequeue: resources.cpuAvailable < 0.25 || resources.memoryAvailable < 0.25,
260+
skipDequeue:
261+
skipForBackpressure ||
262+
resources.cpuAvailable < 0.25 ||
263+
resources.memoryAvailable < 0.25,
226264
};
227265
},
228266
preSkip: async () => {
@@ -552,6 +590,7 @@ class ManagedSupervisor {
552590
this.logger.log("Starting up");
553591

554592
// Optional services
593+
this.backpressureMonitor?.start();
555594
await this.podCleaner?.start();
556595
await this.failedPodHandler?.start();
557596
await this.metricsServer?.start();
@@ -576,6 +615,7 @@ class ManagedSupervisor {
576615
await this.workerSession.stop();
577616

578617
// Optional services
618+
this.backpressureMonitor?.stop();
579619
await this.podCleaner?.stop();
580620
await this.failedPodHandler?.stop();
581621
await this.metricsServer?.stop();

0 commit comments

Comments
 (0)