Getting Started with PsyQueue

This guide walks you through installing PsyQueue, creating your first job, adding retries, scheduling delayed work, and upgrading backends.

Installation

PsyQueue requires Node.js 20+ and uses ESM modules.

# Core package + SQLite backend (zero-infra)
npm install @psyqueue/core @psyqueue/backend-sqlite

If you use pnpm or yarn:

pnpm add @psyqueue/core @psyqueue/backend-sqlite
# or
yarn add @psyqueue/core @psyqueue/backend-sqlite

Your First Job

Create a file worker.ts:

import { PsyQueue } from '@psyqueue/core'
import { sqlite } from '@psyqueue/backend-sqlite'

// 1. Create the queue with a SQLite backend
const q = new PsyQueue()
q.use(sqlite({ path: './jobs.db' }))

// 2. Register a job handler
q.handle('email.send', async (ctx) => {
  const { to, subject } = ctx.job.payload as { to: string; subject: string }
  ctx.log.info(`Sending email to ${to}: ${subject}`)

  // Your email sending logic goes here; sendEmail is a placeholder for your own function
  await sendEmail(to, subject)

  return { sent: true, to }
})

// 3. Start the queue
await q.start()

// 4. Enqueue a job
const jobId = await q.enqueue('email.send', {
  to: 'alice@example.com',
  subject: 'Welcome to our platform',
})

console.log(`Enqueued job: ${jobId}`)

// 5. Process it
const processed = await q.processNext('email.send')
console.log(`Job processed: ${processed}`) // true

// 6. Graceful shutdown
await q.stop()

Key points:

  • A job's queue defaults to the job name when none is specified, which is why processNext('email.send') finds the job enqueued above.
  • processNext dequeues one job, runs the handler, and acks/nacks automatically.
  • The ctx.log logger prefixes messages with the job ID.

Production Worker Mode

For production use, startWorker() launches a continuous worker pool that automatically dequeues and processes jobs. It uses blocking reads (BRPOPLPUSH) for Redis backends and polling for SQLite/Postgres.

import { PsyQueue } from '@psyqueue/core'
import { redis } from '@psyqueue/backend-redis'

const q = new PsyQueue()
q.use(redis({ host: 'localhost', port: 6379 }))

q.handle('email.send', async (ctx) => {
  const { to, subject } = ctx.job.payload as { to: string; subject: string }
  await sendEmail(to, subject)
  return { sent: true }
})

await q.start()

// Start a worker pool with 10 concurrent handlers
q.startWorker('email.send', {
  concurrency: 10,      // Max parallel handlers (default: 1)
  pollInterval: 50,     // Poll interval for non-blocking backends in ms (default: 50)
  blockTimeout: 5000,   // Blocking dequeue timeout in ms (default: 5000)
  batchSize: 20,        // Jobs to pre-fetch per cycle (default: 2x concurrency)
})

// Enqueue jobs -- the worker picks them up automatically
await q.enqueue('email.send', { to: 'alice@example.com', subject: 'Hello' })

// Graceful shutdown (stops all workers, then the queue)
await q.stop()

When to use which:

| Method | Use case |
| --- | --- |
| processNext(queue) | Tests, scripts, one-off processing, simple loops |
| startWorker(queue, opts) | Production services, high-throughput processing |

startWorker() manages the dequeue loop for you. It runs a single loop with semaphore-controlled concurrency and pre-fetches jobs into a local buffer to reduce Redis round-trips. When the backend supports it, it also uses ackAndFetch fusion to ack the current job and dequeue the next one in a single Redis call.
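To make "a single loop with semaphore-controlled concurrency" concrete, here is a minimal, self-contained sketch. It is not PsyQueue's implementation. The names Semaphore, runPool, and PoolStats are hypothetical, and a plain array stands in for the pre-fetched local buffer.

```typescript
// Minimal counting semaphore (hypothetical, not exported by PsyQueue).
class Semaphore {
  private permits: number
  private waiters: Array<() => void> = []

  constructor(permits: number) {
    this.permits = permits
  }

  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--
      return Promise.resolve()
    }
    // No permit free: park until release() hands one over.
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve)
    })
  }

  release(): void {
    const waiter = this.waiters.shift()
    if (waiter) waiter() // pass the permit straight to the next waiter
    else this.permits++
  }
}

interface PoolStats {
  processed: number
  failed: number
  peak: number // highest number of handlers observed running at once
}

// One loop hands out jobs; the semaphore caps how many handlers run at once.
async function runPool<T>(
  buffer: T[], // stands in for the pre-fetched local buffer
  concurrency: number,
  handler: (job: T) => Promise<void>,
): Promise<PoolStats> {
  const sem = new Semaphore(concurrency)
  const stats: PoolStats = { processed: 0, failed: 0, peak: 0 }
  let active = 0
  const inFlight: Promise<void>[] = []

  for (const job of buffer) {
    await sem.acquire() // waits while `concurrency` handlers are busy
    active++
    stats.peak = Math.max(stats.peak, active)
    inFlight.push(
      (async () => {
        try {
          await handler(job)
          stats.processed++ // a real worker would ack here
        } catch {
          stats.failed++ // ...and nack here
        } finally {
          active--
          sem.release()
        }
      })(),
    )
  }

  await Promise.all(inFlight)
  return stats
}
```

Because the loop blocks on acquire() instead of spawning one loop per slot, only one place decides what to dequeue next. That is also the natural place to refill a buffer in batches.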

Adding Retry Logic

PsyQueue retries failed jobs automatically. The default is 3 retries with exponential backoff.

// Override retry settings per job
const jobId = await q.enqueue('payment.charge', {
  customerId: 'cus_123',
  amount: 4999,
}, {
  maxRetries: 5,                // retry up to 5 times
  backoff: 'exponential',       // exponential, linear, or fixed
  backoffBase: 2000,            // start at 2 seconds
  backoffCap: 60_000,           // max delay: 60 seconds
  backoffJitter: true,          // +/- 25% jitter (default: true)
  timeout: 10_000,              // fail if handler takes > 10s
})
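This guide does not spell out how the backoff options combine, so the sketch below shows one plausible reading rather than PsyQueue's actual formula. It assumes the delay doubles per attempt for exponential, grows by one base per attempt for linear, applies jitter before the cap, and counts attempts from 1. The names retryDelay and BackoffOptions are hypothetical.

```typescript
type BackoffStrategy = 'exponential' | 'linear' | 'fixed'

interface BackoffOptions {
  strategy: BackoffStrategy
  base: number // ms, mirrors backoffBase
  cap: number // ms, mirrors backoffCap
  jitter: boolean // mirrors backoffJitter
}

// Hypothetical helper (not a PsyQueue export): delay in ms before retry
// number `attempt` (1-based). `random` is injectable so jitter can be
// made deterministic in tests.
function retryDelay(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  let delay: number
  switch (opts.strategy) {
    case 'exponential':
      delay = opts.base * 2 ** (attempt - 1) // base, 2x base, 4x base, ...
      break
    case 'linear':
      delay = opts.base * attempt // base, 2x base, 3x base, ...
      break
    default:
      delay = opts.base // fixed: always base
  }
  if (opts.jitter) {
    // Scale by a factor in [0.75, 1.25) to spread out simultaneous retries.
    delay *= 0.75 + random() * 0.5
  }
  // Assumption: the cap is applied last, so jitter never exceeds it.
  return Math.round(Math.min(delay, opts.cap))
}

const backoffOpts: BackoffOptions = {
  strategy: 'exponential',
  base: 2000,
  cap: 60_000,
  jitter: false,
}
const retrySchedule = [1, 2, 3, 4, 5].map((n) => retryDelay(n, backoffOpts))
// retrySchedule: [2000, 4000, 8000, 16000, 32000]
```

Under these assumptions a sixth retry would be clamped to the 60-second cap instead of waiting 64 seconds.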

Error Classification

PsyQueue automatically classifies errors to decide retry behavior:

| Category | Retryable | Detection |
| --- | --- | --- |
| transient | Yes | timeout, ECONNREFUSED, or ECONNRESET in message |
| rate-limit | Yes | rate limit in message |
| validation | No | validation or invalid in message |
| fatal | No | Error has category: 'fatal' |
| unknown | Yes | Default for unrecognized errors |

You can set the category explicitly on errors:

const err = new Error('Bad input')
;(err as any).category = 'validation' // won't retry
throw err
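The classification rules above can be read as a small decision function. The sketch below illustrates those rules only and is not PsyQueue's classifier. It assumes that an explicit category always wins, that matching is case-insensitive, and that rules are checked in the table's order. The names classifyError, shouldRetry, and CategorizedError are hypothetical.

```typescript
type ErrorCategory = 'transient' | 'rate-limit' | 'validation' | 'fatal' | 'unknown'

interface CategorizedError extends Error {
  category?: ErrorCategory
}

// Retry decision per category, taken from the table in this section.
const RETRYABLE: Record<ErrorCategory, boolean> = {
  transient: true,
  'rate-limit': true,
  validation: false,
  fatal: false,
  unknown: true,
}

// Hypothetical classifier: explicit category first, then message sniffing.
function classifyError(err: CategorizedError): ErrorCategory {
  if (err.category) return err.category
  const msg = err.message.toLowerCase()
  if (/timeout|econnrefused|econnreset/.test(msg)) return 'transient'
  if (/rate limit/.test(msg)) return 'rate-limit'
  if (/validation|invalid/.test(msg)) return 'validation'
  return 'unknown' // unrecognized errors fall through and stay retryable
}

function shouldRetry(err: CategorizedError): boolean {
  return RETRYABLE[classifyError(err)]
}
```

Message sniffing is inherently fuzzy. A message such as "validation timeout" matches the transient rule first in this sketch, so setting the category explicitly is the safer choice for errors you control.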

Dead Letter Queue

When retries are exhausted (or a non-retryable error occurs), jobs move to the dead letter queue.

// List dead-lettered jobs
const dead = await q.deadLetter.list({ queue: 'payment.charge' })

// Replay a specific job
await q.deadLetter.replay(jobId)

// Replay all dead-lettered jobs
const count = await q.deadLetter.replayAll({ queue: 'payment.charge' })

// Purge old dead letters
await q.deadLetter.purge({ before: new Date('2025-01-01') })

Scheduling Delayed Jobs

Install the scheduler plugin:

npm install @psyqueue/plugin-scheduler

import { PsyQueue } from '@psyqueue/core'
import { sqlite } from '@psyqueue/backend-sqlite'
import { scheduler } from '@psyqueue/plugin-scheduler'

const q = new PsyQueue()
q.use(sqlite({ path: './jobs.db' }))
q.use(scheduler({ pollInterval: 1000 }))

await q.start()

// Schedule a job to run 30 minutes from now
await q.enqueue('report.generate', { type: 'daily' }, {
  runAt: new Date(Date.now() + 30 * 60 * 1000),
})

// Schedule a recurring cron job
await q.enqueue('cleanup.expired', {}, {
  cron: '0 2 * * *', // every day at 2:00 AM
})

The scheduler plugin polls for due jobs and moves them from scheduled to pending. Cron jobs are automatically re-scheduled after each completion.
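The polling step can be pictured as a function that runs once per pollInterval and promotes every job whose runAt has passed. The sketch below works on an in-memory array and is not the plugin's code. The ScheduledJob shape and the promoteDueJobs name are assumptions, and cron re-scheduling is left out.

```typescript
interface ScheduledJob {
  id: string
  runAt: number // epoch milliseconds
  state: 'scheduled' | 'pending'
}

// Hypothetical tick function: move every due job from scheduled to pending
// and return the ids promoted on this tick.
function promoteDueJobs(jobs: ScheduledJob[], now: number): string[] {
  const promoted: string[] = []
  for (const job of jobs) {
    if (job.state === 'scheduled' && job.runAt <= now) {
      job.state = 'pending'
      promoted.push(job.id)
    }
  }
  return promoted
}

const tickStart = 1_000_000
const jobTable: ScheduledJob[] = [
  { id: 'already-due', runAt: tickStart - 1, state: 'scheduled' },
  { id: 'in-30-min', runAt: tickStart + 30 * 60 * 1000, state: 'scheduled' },
]
const firstTick = promoteDueJobs(jobTable, tickStart)
// firstTick: ['already-due']; 'in-30-min' stays scheduled until a later tick
```

With this model, a job can start up to one pollInterval after its runAt, which is the trade-off to weigh when tuning pollInterval.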

Adding Priority

Jobs have a default priority of 0. Higher values are dequeued first.

// High-priority job
await q.enqueue('notification.push', { userId: '123' }, {
  priority: 10,
})

// Low-priority background job
await q.enqueue('analytics.sync', { batch: 42 }, {
  priority: -5,
})

For deadline-aware priority boosting, see the deadline-priority plugin.
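As a quick illustration of the rule "higher values are dequeued first", the sketch below orders a handful of jobs by priority. The tie-breaker (insertion order for equal priorities) is an assumption, since this guide does not specify it. The QueuedJob shape and the dequeueOrder name are hypothetical.

```typescript
interface QueuedJob {
  id: string
  priority: number // default 0; higher values run first
  seq: number // insertion order, used as the tie-breaker (an assumption)
}

// Hypothetical helper: the order in which jobs would be dequeued.
function dequeueOrder(jobs: QueuedJob[]): string[] {
  return jobs
    .slice() // do not mutate the caller's array
    .sort((a, b) => b.priority - a.priority || a.seq - b.seq)
    .map((j) => j.id)
}

const queuedJobs: QueuedJob[] = [
  { id: 'analytics.sync', priority: -5, seq: 1 },
  { id: 'notification.push', priority: 10, seq: 2 },
  { id: 'email.send#1', priority: 0, seq: 3 },
  { id: 'email.send#2', priority: 0, seq: 4 },
]
const dequeued = dequeueOrder(queuedJobs)
// dequeued: ['notification.push', 'email.send#1', 'email.send#2', 'analytics.sync']
```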

Bulk Enqueue

To insert many jobs at once, use enqueueBulk, which relies on the backend's atomic batch insert:

const ids = await q.enqueueBulk([
  { name: 'email.send', payload: { to: 'alice@example.com' } },
  { name: 'email.send', payload: { to: 'bob@example.com' } },
  { name: 'email.send', payload: { to: 'carol@example.com' } },
])

Monitoring with the Dashboard

Install the dashboard plugin:

npm install @psyqueue/dashboard

import { dashboard } from '@psyqueue/dashboard'

q.use(dashboard({ port: 3001 }))

await q.start()
// Dashboard available at http://localhost:3001

Optionally add authentication:

q.use(dashboard({
  port: 3001,
  auth: { type: 'bearer', credentials: 'my-secret-token' },
}))

Listening to Events

PsyQueue emits events for every lifecycle transition:

q.events.on('job:completed', (event) => {
  const { jobId, name, result } = event.data as any
  console.log(`Job ${jobId} (${name}) completed:`, result)
})

q.events.on('job:failed', (event) => {
  const { jobId, error } = event.data as any
  console.error(`Job ${jobId} failed:`, error)
})

// Wildcard: listen to all job events
q.events.on('job:*', (event) => {
  console.log(`[${event.type}]`, event.data)
})
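The wildcard subscription can be thought of as segment-wise pattern matching on the event type. The sketch below shows one way such matching might work. It assumes that '*' stands for exactly one colon-separated segment, which this guide does not confirm, and matchesEvent is a hypothetical helper, not a PsyQueue API.

```typescript
// Hypothetical matcher: '*' matches exactly one colon-separated segment.
function matchesEvent(pattern: string, eventType: string): boolean {
  const patternParts = pattern.split(':')
  const typeParts = eventType.split(':')
  if (patternParts.length !== typeParts.length) return false
  return patternParts.every(
    (segment, i) => segment === '*' || segment === typeParts[i],
  )
}

// 'job:*' covers both events used in this section.
const wildcardHits = ['job:completed', 'job:failed'].filter((t) =>
  matchesEvent('job:*', t),
)
// wildcardHits: ['job:completed', 'job:failed']
```

Keep in mind that a wildcard handler runs in addition to any handler registered for the specific event, so the examples above would log a completed job twice.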

Custom Middleware

Add your own middleware to any lifecycle event:

// Log every job before processing
q.pipeline('process', async (ctx, next) => {
  console.log(`Processing: ${ctx.job.name} [${ctx.job.id}]`)
  const start = Date.now()
  await next()
  console.log(`Done in ${Date.now() - start}ms`)
}, { phase: 'observe' })

// Validate payloads before enqueue
q.pipeline('enqueue', async (ctx, next) => {
  if (!ctx.job.payload) {
    ctx.deadLetter('Empty payload')
    return
  }
  await next()
}, { phase: 'validate' })
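The (ctx, next) signature suggests an onion-style pipeline, where code before await next() runs on the way in and code after it runs on the way out. The sketch below shows that pattern in isolation, in the style of Koa's composer. It is not PsyQueue's pipeline, it ignores phases entirely, and compose, Middleware, and the trace array are hypothetical names.

```typescript
type Next = () => Promise<void>
type Middleware<C> = (ctx: C, next: Next) => Promise<void>

// Hypothetical composer: chains middleware so each one wraps the rest.
function compose<C>(
  stack: Middleware<C>[],
  handler: (ctx: C) => Promise<void>,
): (ctx: C) => Promise<void> {
  return (ctx) => {
    const dispatch = (i: number): Promise<void> =>
      i === stack.length
        ? handler(ctx) // innermost layer: the job handler
        : stack[i](ctx, () => dispatch(i + 1))
    return dispatch(0)
  }
}

const trace: string[] = []
const runPipeline = compose<{ jobName: string }>(
  [
    async (_ctx, next) => {
      trace.push('outer:before')
      await next()
      trace.push('outer:after')
    },
    async (_ctx, next) => {
      trace.push('inner:before')
      await next()
      trace.push('inner:after')
    },
  ],
  async (ctx) => {
    trace.push(`handler:${ctx.jobName}`)
  },
)

const pipelineDone = runPipeline({ jobName: 'email.send' })
// After pipelineDone resolves, trace is:
// ['outer:before', 'inner:before', 'handler:email.send', 'inner:after', 'outer:after']
```

In this model, a middleware that returns without calling next(), like the payload check above, stops everything further in from running.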

Upgrading to Redis

When you outgrow SQLite, switch to Redis without changing any application code:

npm install @psyqueue/backend-redis

import { PsyQueue } from '@psyqueue/core'
import { redis } from '@psyqueue/backend-redis'

const q = new PsyQueue()
q.use(redis({
  host: 'localhost',
  port: 6379,
  password: 'secret',
  db: 0,
}))

// Everything else stays the same
q.handle('email.send', async (ctx) => { /* ... */ })
await q.start()

Or use Postgres for ACID compliance:

npm install @psyqueue/backend-postgres

import { postgres } from '@psyqueue/backend-postgres'

q.use(postgres({
  connectionString: 'postgresql://user:pass@localhost:5432/psyqueue',
}))

Next Steps

  • Read the Architecture Guide to understand the kernel, middleware pipeline, and plugin system.
  • Browse Plugin Guides for detailed configuration of every plugin.
  • Check the API Reference for the full PsyQueue API.
  • See the Comparison Guide for how PsyQueue stacks up against BullMQ, Celery, and Temporal.