This guide walks you through installing PsyQueue, creating your first job, adding retries, scheduling delayed work, and upgrading backends.
PsyQueue requires Node.js 20+ and ships as ES modules (ESM).
```bash
# Core package + SQLite backend (zero-infra)
npm install @psyqueue/core @psyqueue/backend-sqlite
```

If you use pnpm or yarn:

```bash
pnpm add @psyqueue/core @psyqueue/backend-sqlite
# or
yarn add @psyqueue/core @psyqueue/backend-sqlite
```

Create a file `worker.ts`:

```ts
import { PsyQueue } from '@psyqueue/core'
import { sqlite } from '@psyqueue/backend-sqlite'

// Stand-in for your real email integration
async function sendEmail(to: string, subject: string): Promise<void> {
  console.log(`(pretend) sending "${subject}" to ${to}`)
}
// 1. Create the queue with a SQLite backend
const q = new PsyQueue()
q.use(sqlite({ path: './jobs.db' }))
// 2. Register a job handler
q.handle('email.send', async (ctx) => {
  const { to, subject } = ctx.job.payload as { to: string; subject: string }
  ctx.log.info(`Sending email to ${to}: ${subject}`)
  // Your email sending logic here
  await sendEmail(to, subject)
  return { sent: true, to }
})
// 3. Start the queue
await q.start()
// 4. Enqueue a job
const jobId = await q.enqueue('email.send', {
  to: 'alice@example.com',
  subject: 'Welcome to our platform',
})
console.log(`Enqueued job: ${jobId}`)
// 5. Process it
const processed = await q.processNext('email.send')
console.log(`Job processed: ${processed}`) // true
// 6. Graceful shutdown
await q.stop()
```

Key points:
- The `queue` (second argument) defaults to the job name if not specified.
- `processNext` dequeues one job, runs the handler, and acks/nacks automatically.
- The `ctx.log` logger prefixes messages with the job ID.
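To run the example, use any TypeScript runner you like -- for instance `npx tsx worker.ts` (assuming `tsx` is available), or compile with `tsc` and run the output with `node`.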
For production use, `startWorker()` launches a continuous worker pool that automatically dequeues and processes jobs. It uses blocking reads (`BRPOPLPUSH`) for Redis backends and polling for SQLite/Postgres.

```ts
import { PsyQueue } from '@psyqueue/core'
import { redis } from '@psyqueue/backend-redis'
const q = new PsyQueue()
q.use(redis({ host: 'localhost', port: 6379 }))
q.handle('email.send', async (ctx) => {
  const { to, subject } = ctx.job.payload as { to: string; subject: string }
  await sendEmail(to, subject) // same helper as in the first example
  return { sent: true }
})
await q.start()
// Start a worker pool with 10 concurrent handlers
q.startWorker('email.send', {
  concurrency: 10,    // Max parallel handlers (default: 1)
  pollInterval: 50,   // Poll interval for non-blocking backends in ms (default: 50)
  blockTimeout: 5000, // Blocking dequeue timeout in ms (default: 5000)
  batchSize: 20,      // Jobs to pre-fetch per cycle (default: 2x concurrency)
})
// Enqueue jobs -- the worker picks them up automatically
await q.enqueue('email.send', { to: 'alice@example.com', subject: 'Hello' })
// Graceful shutdown (stops all workers, then the queue)
await q.stop()
```

When to use which:
| Method | Use Case |
|---|---|
| `processNext(queue)` | Tests, scripts, one-off processing, simple loops |
| `startWorker(queue, opts)` | Production services, high-throughput processing |
`startWorker()` manages the dequeue loop for you -- it uses a single loop with semaphore-controlled concurrency, pre-fetches jobs into a local buffer to reduce Redis round-trips, and uses `ackAndFetch` fusion when available to ack the current job and dequeue the next in a single Redis call.
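To make the concurrency model concrete, here is a minimal sketch of that pattern -- one dequeue loop gated by a counting semaphore. This is illustrative only, not PsyQueue's actual source; `Job`, `dequeue`, and `run` are stand-ins for the real internals:

```ts
type Job = { id: string; name: string; payload: unknown }

// Minimal counting semaphore: acquire() waits for a permit, release() returns one
class Semaphore {
  private waiters: Array<() => void> = []
  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--
      return
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve))
  }

  release(): void {
    const next = this.waiters.shift()
    if (next) next() // hand the permit directly to a waiter
    else this.permits++
  }
}

async function workerLoop(opts: {
  dequeue: () => Promise<Job | null> // blocking read (Redis) or poll (SQL backends)
  run: (job: Job) => Promise<void>   // invoke the handler, then ack/nack
  concurrency: number
  isRunning: () => boolean
}): Promise<void> {
  const sem = new Semaphore(opts.concurrency)
  while (opts.isRunning()) {
    await sem.acquire() // wait until a handler slot is free
    const job = await opts.dequeue()
    if (!job) {
      sem.release() // nothing due; free the slot and loop again
      continue
    }
    // Don't await: let the handler run while the loop keeps dequeuing
    void opts.run(job).finally(() => sem.release())
  }
}
```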
PsyQueue retries failed jobs automatically. The default is 3 retries with exponential backoff.

```ts
// Override retry settings per job
const jobId = await q.enqueue('payment.charge', {
  customerId: 'cus_123',
  amount: 4999,
}, {
  maxRetries: 5,          // retry up to 5 times
  backoff: 'exponential', // exponential, linear, or fixed
  backoffBase: 2000,      // start at 2 seconds
  backoffCap: 60_000,     // max delay: 60 seconds
  backoffJitter: true,    // +/- 25% jitter (default: true)
  timeout: 10_000,        // fail if handler takes > 10s
})
```
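As a rough guide to the resulting delays, exponential backoff with jitter is conventionally computed like this (a sketch of the schedule implied by the options above, not PsyQueue's internals -- the exact formula may differ):

```ts
// Delay before retry N (1-based) for the settings above: ~2s, 4s, 8s, ...
// capped at 60s, each value multiplied by a random +/- 25% jitter factor
function backoffDelay(attempt: number, base = 2000, cap = 60_000, jitter = true): number {
  const raw = Math.min(cap, base * 2 ** (attempt - 1))
  if (!jitter) return raw
  const factor = 0.75 + Math.random() * 0.5 // uniform in [0.75, 1.25)
  return Math.round(raw * factor)
}
```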
PsyQueue automatically classifies errors to decide retry behavior:

| Category | Retryable | Detection |
|---|---|---|
| `transient` | Yes | `timeout`, `ECONNREFUSED`, `ECONNRESET` in message |
| `rate-limit` | Yes | `rate limit` in message |
| `validation` | No | `validation`, `invalid` in message |
| `fatal` | No | Error has `category: 'fatal'` |
| `unknown` | Yes | Default for unrecognized errors |
You can set the category explicitly on errors:

```ts
const err = new Error('Bad input')
;(err as any).category = 'validation' // won't retry
throw err
```
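If you prefer to avoid the cast, a small error subclass works just as well (a hypothetical helper, not part of PsyQueue's API -- what matters is the `category` property on the thrown error):

```ts
// Hypothetical convenience class; PsyQueue reads `category` off the error
class CategorizedError extends Error {
  constructor(message: string, public category: 'validation' | 'fatal') {
    super(message)
  }
}

throw new CategorizedError('Bad input', 'validation') // won't retry
```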
When retries are exhausted (or a non-retryable error occurs), jobs move to the dead letter queue.

```ts
// List dead-lettered jobs
const dead = await q.deadLetter.list({ queue: 'payment.charge' })
// Replay a specific job
await q.deadLetter.replay(jobId)
// Replay all dead-lettered jobs
const count = await q.deadLetter.replayAll({ queue: 'payment.charge' })
// Purge old dead letters
await q.deadLetter.purge({ before: new Date('2025-01-01') })
```

Install the scheduler plugin:
```bash
npm install @psyqueue/plugin-scheduler
```

```ts
import { scheduler } from '@psyqueue/plugin-scheduler'
const q = new PsyQueue()
q.use(sqlite({ path: './jobs.db' }))
q.use(scheduler({ pollInterval: 1000 }))
await q.start()
// Schedule a job to run 30 minutes from now
await q.enqueue('report.generate', { type: 'daily' }, {
  runAt: new Date(Date.now() + 30 * 60 * 1000),
})
// Schedule a recurring cron job
await q.enqueue('cleanup.expired', {}, {
  cron: '0 2 * * *', // every day at 2:00 AM
})
```

The scheduler plugin polls for due jobs and moves them from scheduled to pending. Cron jobs are automatically re-scheduled after each completion.
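Conceptually, each poll cycle does something like the following (an illustrative sketch -- `findDue` and `promote` are hypothetical store methods, not PsyQueue's API):

```ts
// One poll cycle: find jobs whose runAt has passed and promote them
// from `scheduled` to `pending` so workers can dequeue them
async function pollOnce(store: {
  findDue: (now: Date) => Promise<Array<{ id: string }>>
  promote: (id: string) => Promise<void>
}): Promise<void> {
  const due = await store.findDue(new Date())
  for (const job of due) {
    await store.promote(job.id)
  }
  // Cron jobs are re-scheduled separately, after each run completes
}
```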
Jobs have a default priority of 0. Higher values are dequeued first.

```ts
// High-priority job
await q.enqueue('notification.push', { userId: '123' }, {
  priority: 10,
})
// Low-priority background job
await q.enqueue('analytics.sync', { batch: 42 }, {
  priority: -5,
})
```

For deadline-aware priority boosting, see the deadline-priority plugin.
For inserting many jobs at once, use `enqueueBulk`, which uses the backend's atomic batch insert:

```ts
const ids = await q.enqueueBulk([
  { name: 'email.send', payload: { to: 'alice@example.com' } },
  { name: 'email.send', payload: { to: 'bob@example.com' } },
  { name: 'email.send', payload: { to: 'carol@example.com' } },
])
```

Install the dashboard plugin:
```bash
npm install @psyqueue/dashboard
```

```ts
import { dashboard } from '@psyqueue/dashboard'
q.use(dashboard({ port: 3001 }))
await q.start()
// Dashboard available at http://localhost:3001
```

Optionally add authentication:

```ts
q.use(dashboard({
  port: 3001,
  auth: { type: 'bearer', credentials: 'my-secret-token' },
}))
```

PsyQueue emits events for every lifecycle transition:

```ts
q.events.on('job:completed', (event) => {
  const { jobId, name, result } = event.data as any
  console.log(`Job ${jobId} (${name}) completed:`, result)
})
q.events.on('job:failed', (event) => {
  const { jobId, error } = event.data as any
  console.error(`Job ${jobId} failed:`, error)
})
// Wildcard: listen to all job events
q.events.on('job:*', (event) => {
  console.log(`[${event.type}]`, event.data)
})
```

Add your own middleware to any lifecycle event:

```ts
// Log every job before processing
q.pipeline('process', async (ctx, next) => {
  console.log(`Processing: ${ctx.job.name} [${ctx.job.id}]`)
  const start = Date.now()
  await next()
  console.log(`Done in ${Date.now() - start}ms`)
}, { phase: 'observe' })
// Validate payloads before enqueue
q.pipeline('enqueue', async (ctx, next) => {
  if (!ctx.job.payload) {
    ctx.deadLetter('Empty payload')
    return
  }
  await next()
}, { phase: 'validate' })
```

When you outgrow SQLite, switch to Redis without changing any application code:
```bash
npm install @psyqueue/backend-redis
```

```ts
import { redis } from '@psyqueue/backend-redis'
const q = new PsyQueue()
q.use(redis({
  host: 'localhost',
  port: 6379,
  password: 'secret',
  db: 0,
}))
// Everything else stays the same
q.handle('email.send', async (ctx) => { /* ... */ })
await q.start()
```

Or use Postgres for ACID compliance:
```bash
npm install @psyqueue/backend-postgres
```

```ts
import { postgres } from '@psyqueue/backend-postgres'
q.use(postgres({
  connectionString: 'postgresql://user:pass@localhost:5432/psyqueue',
}))
```

- Read the Architecture Guide to understand the kernel, middleware pipeline, and plugin system.
- Browse Plugin Guides for detailed configuration of every plugin.
- Check the API Reference for the full PsyQueue API.
- See the Comparison Guide for how PsyQueue stacks up against BullMQ, Celery, and Temporal.