Protects downstream services from cascading failures. When a service starts failing, the circuit opens and jobs are requeued or dead-lettered instead of hammering the failing service.
```bash
npm install @psyqueue/plugin-circuit-breaker
```
```ts
import { circuitBreaker } from '@psyqueue/plugin-circuit-breaker'

q.use(circuitBreaker({
  breakers: {
    'payment-api': {
      failureThreshold: 5,
      failureWindow: 30_000,
      resetTimeout: 60_000,
      halfOpenRequests: 3,
      onOpen: 'requeue',
    },
    'email-service': {
      failureThreshold: 10,
      failureWindow: 60_000,
      resetTimeout: 120_000,
      halfOpenRequests: 2,
      onOpen: 'fail',
    },
  },
}))
```
| Breaker Option | Type | Default | Description |
| --- | --- | --- | --- |
| failureThreshold | number | required | Number of failures within the window to trip the breaker |
| failureWindow | number | required | Time window in ms for counting failures |
| resetTimeout | number | required | Time in ms to wait before transitioning OPEN to HALF_OPEN |
| halfOpenRequests | number | required | Successful requests in HALF_OPEN needed to close the breaker |
| onOpen | 'requeue' \| 'fail' | 'requeue' | Action when circuit is open |
Wrap calls to external services with ctx.breaker():
```ts
q.handle('payment.charge', async (ctx) => {
  const { customerId, amount } = ctx.job.payload as any
  const result = await ctx.breaker('payment-api', async () => {
    return await stripe.charges.create({
      customer: customerId,
      amount,
    })
  })
  return result
})
```
```
CLOSED ----[failure threshold reached]----> OPEN
   ^                                          |
   |                               [resetTimeout elapsed]
   |                                          |
   +------[halfOpenRequests succeed]------ HALF_OPEN
                                              |
                                        [any failure]
                                              |
                                            OPEN
```
| State | Behavior |
| --- | --- |
| CLOSED | Normal operation. Failures are counted. |
| OPEN | All calls short-circuit. Jobs are requeued or dead-lettered based on onOpen. |
| HALF_OPEN | Limited requests are allowed through. Success closes the breaker; failure reopens it. |
| Event | Data | When |
| --- | --- | --- |
| circuit:open | { name } | Breaker tripped open |
| circuit:half-open | { name } | Breaker transitioned to half-open |
| circuit:close | { name } | Breaker recovered and closed |
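These events are useful for alerting. A minimal sketch, assuming the queue instance exposes a Node-style q.on(event, handler) emitter (that subscription API is an assumption, not shown elsewhere in these docs):

```ts
// Sketch: assumes q emits plugin events via a Node-style on(event, handler) API
q.on('circuit:open', ({ name }) => {
  console.warn(`[circuit-breaker] ${name} opened; calls will short-circuit`)
})

q.on('circuit:close', ({ name }) => {
  console.info(`[circuit-breaker] ${name} recovered and closed`)
})
```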
```ts
const plugin = circuitBreaker({ breakers: { ... } })
q.use(plugin)

// Get breaker state
const breaker = plugin.getBreaker('payment-api')
breaker?.currentState // 'CLOSED' | 'OPEN' | 'HALF_OPEN'
```
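Breaker state can be polled for dashboards or health checks. A small sketch using only the getBreaker accessor shown above (the breaker names simply mirror the earlier configuration):

```ts
// Periodically log the state of each configured breaker
const breakerNames = ['payment-api', 'email-service']

setInterval(() => {
  for (const name of breakerNames) {
    const state = plugin.getBreaker(name)?.currentState ?? 'UNKNOWN'
    console.log(`[circuit-breaker] ${name}: ${state}`)
  }
}, 10_000)
```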
Adaptive load shedding based on system health signals. Automatically reduces throughput when the system is under pressure.
```bash
npm install @psyqueue/plugin-backpressure
```
```ts
import { backpressure } from '@psyqueue/plugin-backpressure'

q.use(backpressure({
  signals: {
    queueDepth: { pressure: 1000, critical: 5000 },
    errorRate: { pressure: 0.1, critical: 0.5 },
  },
  actions: {
    pressure: ['reduce-concurrency'],
    critical: ['pause-enqueue'],
  },
  recovery: {
    cooldown: 30_000,
    stepUp: 2,
  },
}))
```
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| signals.queueDepth.pressure | number | - | Queue depth threshold for PRESSURE state |
| signals.queueDepth.critical | number | - | Queue depth threshold for CRITICAL state |
| signals.errorRate.pressure | number | - | Error rate threshold (0-1) for PRESSURE |
| signals.errorRate.critical | number | - | Error rate threshold (0-1) for CRITICAL |
| actions.pressure | string[] | [] | Actions to take in PRESSURE state |
| actions.critical | string[] | [] | Actions to take in CRITICAL state |
| onPressure | Function | - | Custom handler for PRESSURE state |
| onCritical | Function | - | Custom handler for CRITICAL state |
| recovery.cooldown | number | 0 | Time in ms to wait before allowing recovery |
| recovery.stepUp | number | 10 | Concurrency increase per recovery step |
| State | Meaning |
| --- | --- |
| HEALTHY | System operating normally |
| PRESSURE | System under load; throughput is reduced |
| CRITICAL | System overloaded; aggressive measures are taken |
```ts
q.use(backpressure({
  signals: { queueDepth: { pressure: 500, critical: 2000 } },
  onPressure: async ({ setConcurrency }) => {
    await setConcurrency(5) // reduce to 5 concurrent workers
  },
  onCritical: async ({ setConcurrency }) => {
    await setConcurrency(1) // reduce to 1 concurrent worker
  },
}))
```
The plugin exposes methods for feeding it system metrics:
```ts
const bp = backpressure({ signals: { ... } })
q.use(bp)

// Feed metrics from your monitoring system
// (getQueueDepth and getRecentErrorRate are placeholders for your own metric sources)
setInterval(async () => {
  await bp.updateMetrics({
    queueDepth: await getQueueDepth(),
    errorRate: getRecentErrorRate(),
  })
}, 5000)
```
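getRecentErrorRate() above is a placeholder. If you do not already track an error rate, a simple sliding-window tracker in plain TypeScript (illustrative only, not part of the plugin) could look like this:

```ts
// Illustrative sliding-window error-rate tracker (not part of @psyqueue/plugin-backpressure)
class ErrorRateTracker {
  private samples: { at: number; failed: boolean }[] = []

  constructor(private windowMs = 60_000) {}

  record(failed: boolean) {
    const now = Date.now()
    this.samples.push({ at: now, failed })
    // Drop samples that fell out of the window
    this.samples = this.samples.filter((s) => now - s.at <= this.windowMs)
  }

  rate(): number {
    if (this.samples.length === 0) return 0
    const failures = this.samples.filter((s) => s.failed).length
    return failures / this.samples.length
  }
}

const tracker = new ErrorRateTracker()
// Call tracker.record(true) on failure and tracker.record(false) on success in your handlers, then:
const getRecentErrorRate = () => tracker.rate()
```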
| Event | Data | When |
| --- | --- | --- |
| backpressure:pressure | { state, jobId? } | Entered PRESSURE state |
| backpressure:critical | { state, jobId? } | Entered CRITICAL state |
| backpressure:healthy | { state } | Recovered to HEALTHY |
Deduplicates jobs using idempotency keys. Prevents duplicate processing when the same request is submitted multiple times.
```bash
npm install @psyqueue/plugin-exactly-once
```
```ts
import { exactlyOnce } from '@psyqueue/plugin-exactly-once'

q.use(exactlyOnce({
  window: '24h',
  onDuplicate: 'ignore',
  cleanupInterval: '1h',
}))
```
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| window | string | '24h' | Dedup window. Formats: '500ms', '30s', '5m', '1h', '7d' |
| onDuplicate | 'ignore' \| 'reject' | 'ignore' | Action when a duplicate is detected |
| cleanupInterval | string | '1h' | How often to prune expired dedup keys |
Depends on: backend
Add an idempotencyKey when enqueuing:
```ts
// First call: job is created
const id1 = await q.enqueue('payment.charge', { amount: 4999 }, {
  idempotencyKey: 'order-789-charge',
})

// Second call with same key within window:
const id2 = await q.enqueue('payment.charge', { amount: 4999 }, {
  idempotencyKey: 'order-789-charge',
})

// With 'ignore' mode: id2 === id1 (returns original job ID, no new job created)
// With 'reject' mode: throws DuplicateJobError
```
| Mode | Behavior |
| --- | --- |
| ignore | Returns the original job ID. The caller sees success. No new job is created. |
| reject | Throws a DuplicateJobError with the original job ID. |
```ts
import { DuplicateJobError } from '@psyqueue/core'

try {
  await q.enqueue('task', payload, { idempotencyKey: 'key-123' })
} catch (err) {
  if (err instanceof DuplicateJobError) {
    console.log(`Duplicate! Original job: ${err.existingJobId}`)
  }
}
```
| Format | Example | Duration |
| --- | --- | --- |
| Milliseconds | '500ms' | 500 ms |
| Seconds | '30s' | 30 seconds |
| Minutes | '5m' | 5 minutes |
| Hours | '24h' | 24 hours |
| Days | '7d' | 7 days |
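These strings map to milliseconds in the obvious way. A minimal parser with the same semantics (a sketch to make the format concrete, not the plugin's own code):

```ts
// Illustrative parser for the duration format above ('500ms', '30s', '5m', '24h', '7d')
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
}

function parseDuration(input: string): number {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(input)
  if (!match) throw new Error(`Invalid duration: ${input}`)
  return Number(match[1]) * UNIT_MS[match[2]]
}

parseDuration('24h') // 86_400_000
```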
Write-ahead log (WAL) based recovery of orphaned jobs. If the process crashes mid-execution, jobs are automatically requeued on the next startup.
```bash
npm install @psyqueue/plugin-crash-recovery
```
```ts
import { crashRecovery } from '@psyqueue/plugin-crash-recovery'

q.use(crashRecovery({
  walPath: './psyqueue.wal',
  autoRecover: true,
  onRecoverActiveJob: 'requeue',
  shutdownTimeout: 30_000,
}))
```
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| walPath | string | './psyqueue.wal' | Path to the write-ahead log file |
| autoRecover | boolean | true | Automatically recover orphaned jobs on start |
| onRecoverActiveJob | 'requeue' \| 'fail' \| Function | 'requeue' | What to do with jobs that were active during a crash |
| shutdownTimeout | number | 30000 | Max time to wait for in-flight jobs on shutdown (ms) |
Depends on: backend
- Before processing (guard phase): writes { jobId, status: 'active' } to the WAL.
- After processing (finalize phase): writes { jobId, status: 'completed' } to the WAL.
- On failure (finalize phase): writes { jobId, status: 'failed' } to the WAL.
- On start: reads the WAL and finds jobs marked active without a subsequent completed or failed entry. These are orphaned jobs from a crash.
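Conceptually, recovery is a replay of the log. A minimal sketch of orphan detection, assuming newline-delimited JSON entries shaped like the ones above (the plugin's actual on-disk format may differ):

```ts
import { readFileSync } from 'node:fs'

// Sketch: find jobs whose last WAL entry is 'active' (orphaned by a crash)
function findOrphanedJobs(walPath: string): string[] {
  const lastStatus = new Map<string, string>()

  for (const line of readFileSync(walPath, 'utf8').split('\n')) {
    if (!line.trim()) continue
    const entry = JSON.parse(line) as { jobId: string; status: string }
    lastStatus.set(entry.jobId, entry.status)
  }

  return [...lastStatus.entries()]
    .filter(([, status]) => status === 'active')
    .map(([jobId]) => jobId)
}
```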
| Strategy | Behavior |
| --- | --- |
| 'requeue' | Put the job back in the queue for retry |
| 'fail' | Nack without requeue (counts as a failure) |
| Custom function | Per-job decision based on job ID and attempt number |
```ts
q.use(crashRecovery({
  onRecoverActiveJob: async (job) => {
    if (job.attempt >= 3) return 'dead-letter'
    return 'requeue'
  },
}))
```
On q.stop(), the plugin closes the WAL file. The shutdownTimeout option controls how long to wait for in-flight jobs to complete before force-stopping.
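To get a clean shutdown (and therefore a clean WAL) in practice, stop the queue on process signals. A sketch, assuming q.stop() resolves once in-flight jobs have drained or shutdownTimeout has elapsed:

```ts
// Flush in-flight jobs and close the WAL before the process exits
process.on('SIGTERM', async () => {
  await q.stop()
  process.exit(0)
})

process.on('SIGINT', async () => {
  await q.stop()
  process.exit(0)
})
```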