Inject controlled failures into job processing for resilience testing. Validate that your retry logic, circuit breakers, and compensation handlers work correctly under adverse conditions.
npm install @psyqueue/plugin-chaos

import { chaosMode } from '@psyqueue/plugin-chaos'
q.use(chaosMode({
enabled: process.env.CHAOS_ENABLED === 'true',
scenarios: [
{
type: 'slowProcess',
config: { probability: 0.3, minDelay: 500, maxDelay: 5000 },
},
{
type: 'workerCrash',
config: { probability: 0.1, message: 'Simulated crash' },
},
{
type: 'duplicateDelivery',
config: { probability: 0.05, extraRuns: 1 },
},
],
}))

| Option | Type | Default | Description |
|---|---|---|---|
| enabled | boolean | required | Master switch. When false, no chaos is injected. |
| scenarios | ChaosScenarioEntry[] | required | List of chaos scenarios to apply. |

Adds a random delay before processing to simulate slow external services or network latency.
{
type: 'slowProcess',
config: {
probability: 0.3, // 30% of jobs affected
minDelay: 500, // Minimum delay in ms
maxDelay: 5000, // Maximum delay in ms
},
}

| Config | Type | Description |
|---|---|---|
| probability | number | Chance of triggering (0.0-1.0) |
| minDelay | number | Minimum added delay in ms |
| maxDelay | number | Maximum added delay in ms |

When triggered, the scenario sets ctx.state['chaos_slowProcess'] = true and stores the actual delay in ctx.state['chaos_delay'].
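The probability gate and delay range described above can be sketched in isolation. This is a minimal illustration, not the plugin's source: `pickChaosDelay` and `recordChaosState` are hypothetical helpers, and the uniform draw between minDelay and maxDelay is an assumption.

```typescript
// Hypothetical types and helpers for illustration; not the plugin's source.
type SlowProcessConfig = { probability: number; minDelay: number; maxDelay: number }

// Returns the delay to inject in ms, or null when the scenario does not trigger.
// `rng` must return values in [0, 1); it is injectable for deterministic tests.
function pickChaosDelay(
  config: SlowProcessConfig,
  rng: () => number = Math.random,
): number | null {
  if (rng() >= config.probability) return null
  // Assumption: the delay is drawn uniformly between minDelay and maxDelay.
  return Math.round(config.minDelay + rng() * (config.maxDelay - config.minDelay))
}

// Mirrors the two state keys described above.
function recordChaosState(state: Record<string, unknown>, delay: number | null): void {
  if (delay === null) return
  state['chaos_slowProcess'] = true
  state['chaos_delay'] = delay
}
```

Injecting the random source keeps the sketch testable: a fixed sequence of values makes the trigger decision and the chosen delay reproducible.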
Throws an error to simulate a worker crash.
{
type: 'workerCrash',
config: {
probability: 0.1,
message: 'Simulated OOM',
},
}

| Config | Type | Description |
|---|---|---|
| probability | number | Chance of triggering (0.0-1.0) |
| message | string | Error message (default: 'Chaos: simulated worker crash') |

Runs the handler multiple times to test idempotency.
{
type: 'duplicateDelivery',
config: {
probability: 0.05,
extraRuns: 1,
},
}

| Config | Type | Description |
|---|---|---|
| probability | number | Chance of triggering (0.0-1.0) |
| extraRuns | number | How many extra times to run the handler (default: 1) |

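A handler survives duplicate delivery only if repeated runs for the same job have no additional effect. The sketch below shows one common approach, deduplicating by job id. The `Job` shape and the `IdempotentLedger` class are hypothetical, and the in-memory set stands in for whatever durable store a real handler would need.

```typescript
// Hypothetical job shape; only `id` and `payload.amount` are assumed here.
type Job = { id: string; payload: { amount: number } }

class IdempotentLedger {
  // In-memory record of processed job ids (a real system would persist this).
  private readonly processed = new Set<string>()
  total = 0

  // Applies the job once; repeated deliveries of the same id are ignored.
  // Returns true when the job was applied, false when it was a duplicate.
  apply(job: Job): boolean {
    if (this.processed.has(job.id)) return false
    this.processed.add(job.id)
    this.total += job.payload.amount
    return true
  }
}
```

With extraRuns: 1, such a handler would see each affected job twice; the second call returns false and leaves the total unchanged.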
| Event | Data | When |
|---|---|---|
| chaos:enabled | { scenarios: string[] } | Plugin initialized with chaos enabled |

The enabled flag is the master switch. Set it to false in production:
q.use(chaosMode({
enabled: process.env.NODE_ENV !== 'production',
scenarios: [ /* ... */ ],
}))

Local-first job queue with background synchronization. When the remote backend is unreachable, jobs are buffered locally and synced when connectivity is restored.

npm install @psyqueue/plugin-offline-sync

import { offlineSync } from '@psyqueue/plugin-offline-sync'
q.use(offlineSync({
localPath: './local-queue.db',
sync: {
intervalMs: 30_000,
autoSync: true,
},
maxLocalJobs: 10_000,
}))

| Option | Type | Default | Description |
|---|---|---|---|
| localPath | string | required | Path to local SQLite buffer database |
| remote | string | 'backend' | Namespace of the remote backend |
| sync.intervalMs | number | - | Auto-sync interval in ms |
| sync.autoSync | boolean | true | Enable automatic background sync |
| maxLocalJobs | number | - | Maximum jobs to buffer locally |

- When a job is enqueued, it goes through the normal pipeline.
- If the remote backend's enqueue fails (network error, timeout, etc.), the guard middleware catches the error and buffers the job locally.
- A background timer periodically pushes locally-buffered jobs to the remote backend.
- On q.stop(), a final push attempt is made.
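The fallback-and-push flow above can be modeled with a small in-memory class. This is a sketch under stated assumptions, not the plugin's implementation: `LocalBuffer` is hypothetical, the `send` callback stands in for the remote backend's enqueue, the buffer is an array rather than SQLite, and rethrowing once maxLocalJobs is reached is a guess at the overflow behavior.

```typescript
// Hypothetical in-memory model; the plugin itself buffers to a local SQLite file.
type Job = { id: string; queue: string }

class LocalBuffer {
  private readonly pending: Job[] = []
  private readonly send: (job: Job) => void
  private readonly maxLocalJobs: number

  // `send` stands in for the remote backend's enqueue and throws when unreachable.
  constructor(send: (job: Job) => void, maxLocalJobs: number) {
    this.send = send
    this.maxLocalJobs = maxLocalJobs
  }

  // Tries the remote first; on failure the job is kept locally.
  enqueue(job: Job): 'remote' | 'buffered' {
    try {
      this.send(job)
      return 'remote'
    } catch (err) {
      // Assumption: once the buffer is full, the original error is rethrown.
      if (this.pending.length >= this.maxLocalJobs) throw err
      this.pending.push(job)
      return 'buffered'
    }
  }

  // Pushes buffered jobs in order and stops at the first failure.
  // Returns how many jobs were synced.
  push(): number {
    let synced = 0
    while (this.pending.length > 0) {
      const next = this.pending[0] as Job
      try {
        this.send(next)
      } catch {
        break // remote still unreachable; keep the rest buffered
      }
      this.pending.shift()
      synced++
    }
    return synced
  }

  unsyncedCount(): number {
    return this.pending.length
  }
}
```

In this model, the background timer and the final attempt on q.stop() would both simply call push().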
const api = q.getExposed('offline-sync') as any
// Manually push buffered jobs
const synced = await api.push()
console.log(`Synced ${synced} jobs`)
// Check how many jobs are buffered
const count = api.unsyncedCount()
// List locally-buffered jobs
const localJobs = api.listLocal()

| Event | Data | When |
|---|---|---|
| offline:fallback | { jobId, queue, error } | Job buffered locally after remote failure |
| offline:auto-synced | { count } | Auto-sync pushed buffered jobs |

- Mobile or edge applications with intermittent connectivity
- Local development with an unreliable network
- Disaster recovery: buffer jobs during backend outages
Batches multiple similar jobs into a single fused job. Reduces database writes and network calls for high-volume scenarios like notifications, analytics events, or webhook deliveries.
npm install @psyqueue/plugin-job-fusion

import { jobFusion } from '@psyqueue/plugin-job-fusion'
q.use(jobFusion({
rules: [
{
match: 'notification.send',
groupBy: (job) => job.payload.userId,
window: 5000, // 5-second batching window
maxBatch: 50, // Flush after 50 items
fuse: (jobs) => ({
userId: jobs[0].payload.userId,
messages: jobs.map(j => j.payload.message),
count: jobs.length,
}),
},
],
}))

interface FusionRule {
match: string // Job name to match
groupBy: (job: Job) => string // Group key function
window: number // Max time to wait before flushing (ms)
maxBatch: number // Max jobs per batch
fuse: (jobs: Job[]) => unknown // Combine job payloads into one
}

| Field | Type | Description |
|---|---|---|
| match | string | Job name to intercept |
| groupBy | (job: Job) => string | Returns a group key. Jobs with the same key are batched together. |
| window | number | Batching window in ms. Jobs are flushed after this duration. |
| maxBatch | number | Maximum batch size. The batch flushes immediately when this count is reached. |
| fuse | (jobs: Job[]) => unknown | Merge function. Receives all jobs in the batch and returns a single payload. |

- When a matching job is enqueued, the transform middleware intercepts it and adds it to a batch collector.
- The original enqueue is short-circuited (the job is NOT persisted individually).
- The batch flushes when either the window expires or maxBatch is reached.
- A fused job is created with the merged payload and enqueued to the backend.
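The batching steps above can be sketched as a small collector keyed by group. This is an illustration only: `BatchCollector` is hypothetical, time is passed in explicitly instead of using timers so the behavior is deterministic, and starting the window at the first job of each batch is an assumption.

```typescript
// Hypothetical collector for illustration; not the plugin's source.
type Job = { id: string; payload: unknown }
type Batch = { jobs: Job[]; openedAt: number }

class BatchCollector {
  private readonly batches = new Map<string, Batch>()
  private readonly windowMs: number
  private readonly maxBatch: number
  private readonly onFlush: (groupKey: string, jobs: Job[]) => void

  constructor(
    windowMs: number,
    maxBatch: number,
    onFlush: (groupKey: string, jobs: Job[]) => void,
  ) {
    this.windowMs = windowMs
    this.maxBatch = maxBatch
    this.onFlush = onFlush
  }

  // Adds a job to its group's batch; flushes immediately once maxBatch is reached.
  add(groupKey: string, job: Job, now: number): void {
    let batch = this.batches.get(groupKey)
    if (batch === undefined) {
      // Assumption: the window starts when the first job of a batch arrives.
      batch = { jobs: [], openedAt: now }
      this.batches.set(groupKey, batch)
    }
    batch.jobs.push(job)
    if (batch.jobs.length >= this.maxBatch) this.flush(groupKey)
  }

  // Flushes every batch whose window has elapsed at time `now`.
  flushExpired(now: number): void {
    this.batches.forEach((batch, key) => {
      if (now - batch.openedAt >= this.windowMs) this.flush(key)
    })
  }

  private flush(groupKey: string): void {
    const batch = this.batches.get(groupKey)
    if (batch === undefined) return
    this.batches.delete(groupKey)
    this.onFlush(groupKey, batch.jobs)
  }
}
```

The onFlush callback is where a rule's fuse function would run and the fused job would be enqueued.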
The fused job inherits properties from the batch:
- Priority: Maximum priority from all jobs in the batch
- Deadline: Earliest deadline from all jobs in the batch
- Timeout: Maximum timeout from all jobs in the batch
- Metadata: Includes fusion.originalJobIds, fusion.groupKey, fusion.count
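The inheritance rules above amount to a few reductions over the batch. The sketch below is hypothetical: the `BatchedJob` shape, numeric priorities where a larger number means higher priority, deadlines as epoch milliseconds, and flat dotted metadata keys are all assumptions made for illustration.

```typescript
// Hypothetical job shape; field names and units are assumptions.
type BatchedJob = { id: string; priority: number; timeout: number; deadline?: number }

// Derives the fused job's properties from the jobs in one batch.
function fusedJobProps(jobs: BatchedJob[], groupKey: string) {
  if (jobs.length === 0) throw new Error('cannot fuse an empty batch')
  // Only jobs that actually carry a deadline take part in the minimum.
  const deadlines = jobs
    .map((j) => j.deadline)
    .filter((d): d is number => d !== undefined)
  return {
    priority: Math.max(...jobs.map((j) => j.priority)), // maximum priority
    deadline: deadlines.length > 0 ? Math.min(...deadlines) : undefined, // earliest deadline
    timeout: Math.max(...jobs.map((j) => j.timeout)), // maximum timeout
    meta: {
      'fusion.originalJobIds': jobs.map((j) => j.id),
      'fusion.groupKey': groupKey,
      'fusion.count': jobs.length,
    },
  }
}
```

Taking the earliest deadline and the highest priority means the fused job is never treated as less urgent than any job it replaces.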

| Event | Data | When |
|---|---|---|
| job:fused | { fusedJobId, originalJobIds, groupKey, count } | Batch flushed and fused job enqueued |

q.use(jobFusion({
rules: [{
match: 'webhook.deliver',
groupBy: (job) => (job.payload as any).endpoint,
window: 2000,
maxBatch: 100,
fuse: (jobs) => ({
endpoint: (jobs[0]!.payload as any).endpoint,
events: jobs.map(j => (j.payload as any).event),
}),
}],
}))
// Handle the fused job
q.handle('webhook.deliver', async (ctx) => {
const { endpoint, events } = ctx.job.payload as any
// events is an array of all batched events
await fetch(endpoint, {
method: 'POST',
body: JSON.stringify({ events }),
})
})

Payload migration and validation for evolving job schemas. When your payload format changes, define migration functions between versions and the plugin handles the rest.

npm install @psyqueue/plugin-schema-versioning

import { schemaVersioning } from '@psyqueue/plugin-schema-versioning'
q.use(schemaVersioning())

No configuration options for the plugin itself. Versioning is configured per-handler.
Use schemaVersioning.versioned() to create a handler with version-aware processing:
import { schemaVersioning } from '@psyqueue/plugin-schema-versioning'
import { z } from 'zod'
q.use(schemaVersioning())
q.handle('email.send', schemaVersioning.versioned({
current: 2,
versions: {
1: {
schema: z.object({
to: z.string(),
body: z.string(),
}),
process: async (ctx) => {
// Handle v1 payloads
const { to, body } = ctx.job.payload as any
await sendEmail(to, 'No Subject', body)
},
migrate: (payload: any) => ({
to: payload.to,
subject: 'No Subject',
body: payload.body,
}),
},
2: {
schema: z.object({
to: z.string(),
subject: z.string(),
body: z.string(),
}),
process: async (ctx) => {
// Handle v2 payloads (current)
const { to, subject, body } = ctx.job.payload as any
await sendEmail(to, subject, body)
},
},
},
}))

interface VersionConfig {
schema: ZodSchema // Zod schema for validation
process: JobHandler // Handler for this version
migrate?: (payload: unknown) => unknown // Migrate FROM this version to the next
}

- Job is dequeued with schemaVersion: 1.
- Plugin detects current version is 2.
- Builds migration chain: v1.migrate transforms the payload.
- Validates the migrated payload against v2.schema.
- If valid, calls v2.process.
- If invalid, dead-letters with SCHEMA_MISMATCH.
Migrations are composable. If you're on version 3 and receive a version 1 payload:
v1 --[v1.migrate]--> v2 --[v2.migrate]--> v3
Each version's migrate function only needs to handle the transformation from that version to the next.
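Chaining works by applying each version's migrate in order until the current version is reached. The function below is a minimal sketch of that idea, not the plugin's code: `migrateToCurrent` and the `Versions` type are hypothetical, and throwing when a step has no migrate function is an assumption about how a gap in the chain would be handled.

```typescript
// Hypothetical types for illustration; not the plugin's source.
type Migration = (payload: unknown) => unknown
type Versions = Record<number, { migrate?: Migration }>

// Walks the chain from `from` up to `current`, one version at a time.
function migrateToCurrent(
  payload: unknown,
  from: number,
  current: number,
  versions: Versions,
): unknown {
  let result = payload
  for (let v = from; v < current; v++) {
    const migrate = versions[v]?.migrate
    // Assumption: a missing step is treated as an error rather than skipped.
    if (migrate === undefined) {
      throw new Error(`No migration defined from v${v} to v${v + 1}`)
    }
    result = migrate(result)
  }
  return result
}
```

A payload already at the current version passes through unchanged, because the loop body never runs.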
await q.enqueue('email.send', {
to: 'user@example.com',
subject: 'Hello',
body: 'World',
}, {
meta: { schemaVersion: 2 },
})

Jobs without a schemaVersion default to version 1.
Validation uses Zod schemas. If the payload does not match the current version's schema after migration, the job is dead-lettered with a SCHEMA_MISMATCH reason.