Complete API documentation for the psyqueue core package.
The main entry point. Creates a micro-kernel instance that coordinates plugins, middleware, and the backend.
```ts
const q = new PsyQueue()
```
No arguments. Configure by registering plugins with q.use().
Create a PsyQueue instance from a named preset configuration.
```ts
static from(
  preset: string | PresetConfig,
  overrides?: Partial<PresetConfig>,
): { queue: PsyQueue; config: PresetConfig }
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `preset` | `string \| PresetConfig` | Preset name ('lite', 'saas', 'enterprise') or a custom config object |
| `overrides` | `Partial<PresetConfig>` | Additional plugins to merge into the preset |
Returns: { queue: PsyQueue; config: PresetConfig } -- The queue instance and the resolved config listing required plugin names. You still need to install and .use() the actual plugins.
```ts
const { queue, config } = PsyQueue.from('lite')
// config.plugins = ['backend-sqlite', 'scheduler', 'crash-recovery']
```
Available presets:
| Preset | Plugins |
|---|---|
| `lite` | backend-sqlite, scheduler, crash-recovery |
| `saas` | backend-redis, tenancy, workflows, saga, circuit-breaker, backpressure, dashboard, metrics |
| `enterprise` | backend-postgres, tenancy, workflows, saga, exactly-once, audit-log, otel-tracing, schema-versioning, circuit-breaker, backpressure, dashboard, metrics |
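As a sketch of how from() might resolve a preset and merge overrides — assuming PresetConfig is simply `{ plugins: string[] }` and with the helper name `resolvePreset` invented for illustration:

```ts
// Illustrative preset resolution; `PresetConfig` shape and `resolvePreset`
// are assumptions, not the library's actual internals.
interface PresetConfig {
  plugins: string[]
}

const presetTable: Record<string, PresetConfig> = {
  lite: { plugins: ['backend-sqlite', 'scheduler', 'crash-recovery'] },
}

function resolvePreset(
  preset: string | PresetConfig,
  overrides?: Partial<PresetConfig>,
): PresetConfig {
  const base = typeof preset === 'string' ? presetTable[preset] : preset
  if (!base) throw new Error(`Unknown preset: ${preset}`)
  // Append override plugins that are not already in the base list,
  // preserving the base ordering.
  const extra = (overrides?.plugins ?? []).filter((p) => !base.plugins.includes(p))
  return { plugins: [...base.plugins, ...extra] }
}
```

Remember that the resolved config only lists plugin names; the actual plugin packages still have to be installed and registered with .use().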
Returns true after q.start() completes and before q.stop() is called.
The public event bus. Use to subscribe to lifecycle events.
```ts
q.events.on('job:completed', (event) => { /* ... */ })
q.events.off('job:completed', handler)
```
Dead letter queue management API.
```ts
q.deadLetter: {
  list(filter?: JobFilter): Promise<PaginatedResult<Job>>
  replay(jobId: string): Promise<void>
  replayAll(filter?: JobFilter): Promise<number>
  purge(opts?: { queue?: string; before?: Date }): Promise<number>
}
```
Methods:
| Method | Description |
|---|---|
| `list(filter?)` | List dead-lettered jobs. Supports filtering by queue, name, date range, pagination. |
| `replay(jobId)` | Requeue a specific dead-lettered job. Throws if the job is not found or not in dead status. |
| `replayAll(filter?)` | Requeue all matching dead-lettered jobs. Returns the count of replayed jobs. |
| `purge(opts?)` | Permanently remove dead-lettered jobs. Returns the count of purged jobs. |
Register a plugin. Returns this for chaining.
```ts
q.use(sqlite({ path: ':memory:' }))
  .use(scheduler())
  .use(crashRecovery())
```
The plugin's init() method is called immediately during use(). Throws DUPLICATE_PLUGIN if a plugin with the same name is already registered.
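A minimal sketch of the registration and dependency-ordered startup behavior described here — the `Registry` class and its internals are assumptions, not the actual kernel code:

```ts
// Illustrative plugin registry: duplicate detection on use(), and a
// topological sort producing the dependency-ordered start sequence.
interface Plugin {
  name: string
  depends?: string[]
}

class Registry {
  private plugins = new Map<string, Plugin>()

  use(p: Plugin): this {
    if (this.plugins.has(p.name)) throw new Error('DUPLICATE_PLUGIN')
    this.plugins.set(p.name, p)
    return this
  }

  // Each plugin starts only after everything it depends on.
  startOrder(): string[] {
    const order: string[] = []
    const visiting = new Set<string>()
    const visit = (name: string): void => {
      if (order.includes(name)) return
      if (visiting.has(name)) throw new Error('CIRCULAR_DEPENDENCY')
      visiting.add(name)
      const deps = this.plugins.get(name)?.depends ?? []
      deps.forEach(visit)
      visiting.delete(name)
      order.push(name)
    }
    this.plugins.forEach((_plugin, name) => visit(name))
    return order
  }
}
```

Stopping in reverse of this order (as q.stop() does) guarantees dependents shut down before their dependencies.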
Start the queue: connect the backend and start all plugins in dependency order.
Throws NO_BACKEND if no backend plugin has been registered.
```ts
await q.start()
```
Stop the queue: stop all plugins in reverse order, then disconnect the backend.
```ts
await q.stop()
```
Enqueue a job. Runs through the enqueue middleware pipeline, then persists via the backend.
```ts
const jobId = await q.enqueue('email.send', {
  to: 'user@example.com',
  subject: 'Hello',
}, {
  priority: 5,
  maxRetries: 5,
  timeout: 15_000,
})
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `name` | `string` | Job name. Also used as the default queue name. |
| `payload` | `unknown` | Arbitrary job data. |
| `opts` | `EnqueueOpts` | Optional settings (see below). |
Returns: Promise<string> -- The job ID (ULID).
Bulk enqueue jobs atomically via the backend. Events are emitted for each job.
```ts
const ids = await q.enqueueBulk([
  { name: 'email.send', payload: { to: 'a@b.com' } },
  { name: 'email.send', payload: { to: 'c@d.com' }, opts: { priority: 10 } },
])
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `items` | `Array<{ name: string; payload: unknown; opts?: EnqueueOpts }>` | Jobs to enqueue. |
Returns: Promise<string[]> -- Array of job IDs.
Note: enqueueBulk bypasses the enqueue middleware pipeline and goes directly to the backend. Use enqueue if you need middleware processing (rate limiting, deduplication, etc.).
Dequeue and process the next job from a queue.
```ts
const hadWork = await q.processNext('email.send')
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `queue` | `string` | Queue name to dequeue from. |
Returns: Promise<boolean> -- true if a job was processed (success or failure), false if the queue was empty.
Processing flow:
- Dequeues one job from the backend.
- Looks up the registered handler by job name.
- Runs the `process` middleware pipeline.
- Runs the handler.
- On success: acks the job, emits `job:completed`.
- On failure: classifies the error, retries or dead-letters, emits `job:failed`.
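The flow can be sketched synchronously against an in-memory queue. The names here (`InMemoryQueue`, `MiniJob`) are illustrative, the middleware step is omitted, and the real API is async:

```ts
// Simplified, synchronous model of processNext(): dequeue, look up the
// handler, run it, then ack / retry / dead-letter based on maxRetries.
interface MiniJob {
  name: string
  payload: unknown
  attempt: number     // starts at 1
  maxRetries: number
}

type Handler = (payload: unknown) => void

class InMemoryQueue {
  pending: MiniJob[] = []
  completed: MiniJob[] = []
  dead: MiniJob[] = []
  handlers = new Map<string, Handler>()

  processNext(): boolean {
    const job = this.pending.shift()
    if (!job) return false // queue was empty
    const handler = this.handlers.get(job.name)
    if (!handler) throw new Error(`No handler for ${job.name}`)
    try {
      handler(job.payload)       // run the handler
      this.completed.push(job)   // ack; would emit job:completed
    } catch {
      if (job.attempt < job.maxRetries) {
        job.attempt += 1
        this.pending.push(job)   // retry (real queue applies backoff delay)
      } else {
        this.dead.push(job)      // dead-letter; would emit job:failed
      }
    }
    return true // a job was processed, whether it succeeded or failed
  }
}
```

Note that the boolean return reports whether work was found, not whether the handler succeeded, matching the documented contract.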
Start a continuous worker pool for a queue. Jobs are automatically dequeued and processed using registered handlers. Uses blocking reads (BRPOPLPUSH) for Redis backends, polling for others.
```ts
q.startWorker('email.send', {
  concurrency: 10,
  pollInterval: 50,
  blockTimeout: 5000,
  batchSize: 20,
})
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `queue` | `string` | Queue name to consume from. |
| `opts` | `WorkerOpts` | Optional worker settings (see WorkerOpts). |
Throws WORKER_EXISTS if a worker is already running for the given queue. Workers are automatically stopped when q.stop() is called.
Stop all running worker pools. Waits for in-flight handlers to complete before resolving.
```ts
await q.stopWorkers()
```
This is called automatically by q.stop(). You can also call it directly to stop workers without stopping the queue.
Register a job handler.
```ts
q.handle('email.send', async (ctx) => {
  const { to, subject } = ctx.job.payload as any
  await sendEmail(to, subject)
  return { sent: true }
}, {
  concurrency: 5,
  timeout: 30_000,
})
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `name` | `string` | Job name to handle. |
| `handler` | `JobHandler` | Async function receiving JobContext. Return value becomes job.result. |
| `opts` | `HandlerOpts` | Optional handler settings. |
Register user middleware for a lifecycle event.
```ts
q.pipeline('process', async (ctx, next) => {
  console.log(`Processing ${ctx.job.name}`)
  await next()
}, { phase: 'observe' })
```
Parameters:
| Name | Type | Description |
|---|---|---|
| `event` | `LifecycleEvent` | One of: 'enqueue', 'dequeue', 'process', 'complete', 'fail', 'retry', 'schedule' |
| `fn` | `Middleware` | `(ctx: JobContext, next: () => Promise<void>) => Promise<void>` |
| `opts.phase` | `MiddlewarePhase` | One of: 'guard', 'validate', 'transform', 'observe', 'execute', 'finalize'. Default: 'execute'. |
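One plausible shape for a phased pipeline — the `Pipeline` class below is an assumption and is synchronous for brevity (real middleware is async), but the phase ordering matches the documented list:

```ts
// Illustrative phased middleware pipeline: middleware is stable-sorted by
// phase, then chained so each one's next() invokes the following one.
type Ctx = { log: string[] }
type Mw = (ctx: Ctx, next: () => void) => void

const PHASES = ['guard', 'validate', 'transform', 'observe', 'execute', 'finalize'] as const
type Phase = (typeof PHASES)[number]

class Pipeline {
  private mws: Array<{ phase: Phase; fn: Mw }> = []

  use(fn: Mw, phase: Phase = 'execute'): void {
    this.mws.push({ phase, fn })
  }

  run(ctx: Ctx): void {
    const sorted = [...this.mws].sort(
      (a, b) => PHASES.indexOf(a.phase) - PHASES.indexOf(b.phase),
    )
    // Koa-style dispatch: each middleware decides whether to call next().
    const dispatch = (i: number): void => {
      if (i < sorted.length) sorted[i].fn(ctx, () => dispatch(i + 1))
    }
    dispatch(0)
  }
}
```

Phases let independently registered middleware compose deterministically: a 'guard' always runs before any 'execute', regardless of registration order.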
Get an API exposed by a plugin.
```ts
const workflowsApi = q.getExposed('workflows')
```
The complete job object stored in the backend.
```ts
interface Job {
  id: string              // ULID, auto-generated
  queue: string           // Queue name (defaults to job name)
  name: string            // Job name (matches handler registration)
  payload: unknown        // Arbitrary data
  schemaVersion?: number  // For schema-versioning plugin
  priority: number        // Higher = dequeued first. Default: 0
  deadline?: Date         // Optional deadline for priority boosting
  runAt?: Date            // Future execution time (creates scheduled job)
  cron?: string           // Cron expression for recurring jobs
  tenantId?: string       // Tenant identifier for multi-tenancy
  idempotencyKey?: string // For exactly-once deduplication
  maxRetries: number      // Max retry attempts. Default: 3
  attempt: number         // Current attempt number (starts at 1)
  backoff: BackoffStrategy | BackoffFunction // Default: 'exponential'
  backoffBase?: number    // Base delay in ms. Default: 1000
  backoffCap?: number     // Max delay in ms. Default: 300000
  backoffJitter?: boolean // +/- 25% randomization. Default: true
  timeout: number         // Handler timeout in ms. Default: 30000
  workflowId?: string     // Set by workflows plugin
  stepId?: string         // Set by workflows plugin
  parentJobId?: string    // Set by workflows plugin
  traceId?: string        // Set by OTel tracing plugin
  spanId?: string         // Set by OTel tracing plugin
  status: JobStatus       // Current status
  result?: unknown        // Handler return value (on completion)
  error?: JobError        // Error details (on failure)
  createdAt: Date
  startedAt?: Date
  completedAt?: Date
  meta: Record<string, unknown> // Arbitrary metadata
}
```

```ts
type JobStatus = 'pending' | 'scheduled' | 'active' | 'completed' | 'failed' | 'dead'
```
Options for q.enqueue().
```ts
interface EnqueueOpts {
  queue?: string          // Override queue name (default: job name)
  tenantId?: string       // Tenant ID for multi-tenancy
  priority?: number       // Default: 0
  deadline?: Date         // Deadline for priority boosting
  runAt?: Date            // Schedule for future execution
  cron?: string           // Cron expression for recurring jobs
  idempotencyKey?: string // Deduplication key
  maxRetries?: number     // Default: 3
  timeout?: number        // Default: 30000 (30s)
  backoff?: BackoffStrategy | BackoffFunction // Default: 'exponential'
  backoffBase?: number    // Default: 1000 (1s)
  backoffCap?: number     // Default: 300000 (5min)
  backoffJitter?: boolean // Default: true
  meta?: Record<string, unknown>
}
```
Options for q.handle().
```ts
interface HandlerOpts {
  concurrency?: number // Max concurrent executions
  timeout?: number     // Handler timeout in ms
  queue?: string       // Override queue name
}
```
Options for q.startWorker().
```ts
interface WorkerOpts {
  /** Number of parallel handlers (default: 1) */
  concurrency?: number
  /** Blocking dequeue timeout in ms (default: 5000) */
  blockTimeout?: number
  /** Max jobs to grab per cycle (default: 2x concurrency) */
  batchSize?: number
  /** Poll interval for non-blocking backends in ms (default: 50) */
  pollInterval?: number
}
```
| Option | Default | Description |
|---|---|---|
| `concurrency` | 1 | Number of handlers that can run in parallel. The worker uses a semaphore to enforce this limit. |
| `blockTimeout` | 5000 | For blocking backends (Redis): how long to wait in BRPOPLPUSH before retrying, in ms. |
| `batchSize` | 2 * concurrency | How many jobs to pre-fetch per Redis call. Higher values reduce round-trips but use more memory. |
| `pollInterval` | 50 | For non-blocking backends (SQLite, Postgres): sleep duration in ms when the queue is empty. |
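The defaults in the table can be expressed as a small resolver; `resolveWorkerOpts` is an illustrative name, not a library export:

```ts
// Apply the documented WorkerOpts defaults; note batchSize scales
// with the resolved concurrency when not set explicitly.
interface WorkerOpts {
  concurrency?: number
  blockTimeout?: number
  batchSize?: number
  pollInterval?: number
}

function resolveWorkerOpts(opts: WorkerOpts = {}): Required<WorkerOpts> {
  const concurrency = opts.concurrency ?? 1
  return {
    concurrency,
    blockTimeout: opts.blockTimeout ?? 5000,
    batchSize: opts.batchSize ?? 2 * concurrency, // default: 2x concurrency
    pollInterval: opts.pollInterval ?? 50,
  }
}
```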
```ts
type BackoffStrategy = 'fixed' | 'exponential' | 'linear'
type BackoffFunction = (attempt: number, error?: Error) => number
```
| Strategy | Formula | Example (base=1000) |
|---|---|---|
| `fixed` | base | 1s, 1s, 1s, 1s |
| `linear` | base * attempt | 1s, 2s, 3s, 4s |
| `exponential` | base * 2^(attempt-1) | 1s, 2s, 4s, 8s |
All strategies respect backoffCap and apply +/- 25% jitter when backoffJitter is true.
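The table and the cap/jitter rules imply a delay computation along these lines; the function name and the option object are assumptions, not the library's internals:

```ts
// Illustrative retry-delay computation: strategy formula, then cap,
// then optional +/- 25% jitter.
type BackoffStrategy = 'fixed' | 'exponential' | 'linear'

interface BackoffOpts {
  strategy: BackoffStrategy
  base: number    // backoffBase, in ms
  cap: number     // backoffCap, in ms
  jitter: boolean // backoffJitter
}

function backoffDelay(opts: BackoffOpts, attempt: number): number {
  let delay: number
  if (opts.strategy === 'linear') delay = opts.base * attempt
  else if (opts.strategy === 'exponential') delay = opts.base * 2 ** (attempt - 1)
  else delay = opts.base // 'fixed'
  delay = Math.min(delay, opts.cap) // respect backoffCap
  if (opts.jitter) {
    delay = delay * (0.75 + Math.random() * 0.5) // +/- 25% randomization
  }
  return Math.round(delay)
}
```

Jitter spreads retries from jobs that failed at the same instant, avoiding a thundering herd against the downstream service.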
Passed to handlers and middleware.
```ts
interface JobContext {
  job: Job
  event: LifecycleEvent
  tenant?: { id: string; tier: string; [key: string]: unknown }
  trace?: { traceId: string; spanId: string }
  workflow?: { workflowId: string; stepId: string; results: Record<string, unknown> }
  results?: Record<string, unknown>
  state: Record<string, unknown>
  log: Logger
  requeue(opts?: { delay?: number; priority?: number }): void
  deadLetter(reason: string): void
  breaker(name: string, fn: () => Promise<unknown>): Promise<unknown>
  updateJob(updates: Partial<Job>): Promise<void>
  enqueue(name: string, payload: unknown, opts?: EnqueueOpts): Promise<string>
}
```

```ts
interface PsyPlugin {
  name: string
  version: string
  provides?: string | string[]
  depends?: string[]
  init(kernel: Kernel): void
  start?(): Promise<void>
  stop?(): Promise<void>
  destroy?(): Promise<void>
}
```
The kernel interface provided to plugins during init().
```ts
interface Kernel {
  events: EventBusInterface
  pipeline(event: LifecycleEvent, fn: Middleware, opts?: { phase?: MiddlewarePhase }): void
  getBackend(): BackendAdapter
  expose(namespace: string, api: Record<string, unknown>): void
  getExposed(namespace: string): Record<string, unknown> | undefined
}
```

```ts
interface BackendAdapter {
  name: string
  type: string
  connect(): Promise<void>
  disconnect(): Promise<void>
  healthCheck(): Promise<boolean>
  enqueue(job: Job): Promise<string>
  enqueueBulk(jobs: Job[]): Promise<string[]>
  dequeue(queue: string, count: number): Promise<DequeuedJob[]>
  ack(jobId: string, completionToken?: string): Promise<AckResult>
  nack(jobId: string, opts?: NackOpts): Promise<void>
  getJob(jobId: string): Promise<Job | null>
  listJobs(filter: JobFilter): Promise<PaginatedResult<Job>>
  scheduleAt(job: Job, runAt: Date): Promise<string>
  pollScheduled(now: Date, limit: number): Promise<Job[]>
  acquireLock(key: string, ttlMs: number): Promise<boolean>
  releaseLock(key: string): Promise<void>
  atomic(ops: AtomicOp[]): Promise<void>

  // --- Optional methods for high-performance backends ---

  /** Whether this backend supports blocking dequeue (e.g., Redis BRPOPLPUSH) */
  supportsBlocking?: boolean
  /** Block until a job is available or timeout expires. Returns empty array on timeout. */
  blockingDequeue?(queue: string, timeoutMs: number): Promise<DequeuedJob[]>
  /** Non-blocking batch pop -- grab up to `count` jobs immediately. */
  batchDequeue?(queue: string, count: number): Promise<DequeuedJob[]>
  /** Acknowledge multiple jobs in a single round-trip (pipeline). */
  ackBatch?(items: Array<{ jobId: string; completionToken?: string }>): Promise<AckResult[]>
  /** Ack current job AND dequeue next job in one atomic Lua call. */
  ackAndFetch?(
    jobId: string,
    completionToken: string | undefined,
    queue: string,
  ): Promise<{ ackResult: AckResult; nextJob: DequeuedJob | null }>
}
```
The optional methods (blockingDequeue, batchDequeue, ackBatch, ackAndFetch) are used by startWorker() when available. The Redis backend implements all of them. SQLite and Postgres backends use standard dequeue() with polling.
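A worker can pick its consume strategy by feature-detecting these optional members. The `chooseDequeue` helper and its strategy labels are illustrative, not library code:

```ts
// Illustrative capability detection: prefer blocking reads, then
// non-blocking batch pops, then plain polling with dequeue().
interface BackendCaps {
  supportsBlocking?: boolean
  blockingDequeue?: unknown
  batchDequeue?: unknown
}

type DequeueStrategy = 'blocking' | 'batch-poll' | 'poll'

function chooseDequeue(backend: BackendCaps): DequeueStrategy {
  if (backend.supportsBlocking && backend.blockingDequeue) return 'blocking'
  if (backend.batchDequeue) return 'batch-poll' // immediate batch pop, then sleep if empty
  return 'poll' // standard dequeue() every pollInterval ms
}
```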
```ts
interface EventBusInterface {
  on(event: string, handler: EventHandler): void
  off(event: string, handler: EventHandler): void
  emit(event: string, data: unknown, source?: string): void
}

type EventHandler<T = unknown> = (event: PsyEvent<T>) => void | Promise<void>
```

```ts
interface PsyEvent<T = unknown> {
  type: string    // Event name (e.g., 'job:completed')
  timestamp: Date
  source: string  // 'kernel', plugin name, etc.
  data: T
  traceId?: string
}
```

```ts
interface JobFilter {
  queue?: string
  status?: JobStatus | JobStatus[]
  tenantId?: string
  name?: string
  from?: Date
  to?: Date
  limit?: number  // Default: backend-specific
  offset?: number // Default: 0
  sortBy?: 'createdAt' | 'priority' | 'runAt'
  sortOrder?: 'asc' | 'desc'
}
```

```ts
interface PaginatedResult<T> {
  data: T[]
  total: number
  limit: number
  offset: number
  hasMore: boolean
}
```

```ts
interface NackOpts {
  requeue?: boolean    // Requeue the job for retry
  delay?: number       // Delay before requeue (ms)
  deadLetter?: boolean // Move to dead letter queue
  reason?: string      // Reason for nack
}
```

```ts
interface JobError {
  message: string
  code?: string
  stack?: string
  category?: 'transient' | 'validation' | 'rate-limit' | 'fatal' | 'unknown'
  retryable: boolean
}
```

```ts
type MiddlewarePhase = 'guard' | 'validate' | 'transform' | 'observe' | 'execute' | 'finalize'
type LifecycleEvent = 'enqueue' | 'dequeue' | 'process' | 'complete' | 'fail' | 'retry' | 'schedule'
```

```ts
interface Logger {
  debug(msg: string, data?: Record<string, unknown>): void
  info(msg: string, data?: Record<string, unknown>): void
  warn(msg: string, data?: Record<string, unknown>): void
  error(msg: string, data?: Record<string, unknown>): void
}
```
All errors extend PsyQueueError which has a code property.
| Error | Code | When |
|---|---|---|
| `PsyQueueError` | varies | Base error class |
| `PluginError` | `PLUGIN_ERROR` | Plugin-specific error |
| `DependencyError` | `MISSING_DEPENDENCY` | Plugin requires a missing dependency |
| `CircularDependencyError` | `CIRCULAR_DEPENDENCY` | Plugin dependency cycle detected |
| `RateLimitError` | `RATE_LIMIT_EXCEEDED` | Tenant exceeded rate limit |
| `DuplicateJobError` | `DUPLICATE_JOB` | Job with same idempotency key exists |
| `SchemaError` | `SCHEMA_MISMATCH` | Payload does not match expected schema |
```ts
import { PsyQueueError, RateLimitError } from '@psyqueue/core'

try {
  await q.enqueue('job', payload)
} catch (err) {
  if (err instanceof RateLimitError) {
    console.log(`Rate limited. Retry after ${err.retryAfter}ms`)
  }
}
```
Create a Job object with defaults applied. Used internally by q.enqueue().
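As a sketch of what "defaults applied" could mean given the Job interface above — the factory name `createJob` is an assumption, and the id here is a throwaway placeholder rather than a real ULID:

```ts
// Illustrative job factory applying the documented defaults:
// queue = job name, priority 0, maxRetries 3, attempt 1,
// exponential backoff, 30s timeout, status 'pending'.
interface JobDraft {
  id: string
  queue: string
  name: string
  payload: unknown
  priority: number
  maxRetries: number
  attempt: number
  backoff: string
  timeout: number
  status: string
  meta: Record<string, unknown>
}

function createJob(name: string, payload: unknown, opts: Partial<JobDraft> = {}): JobDraft {
  return {
    id: opts.id ?? 'job-' + Math.random().toString(36).slice(2), // placeholder, NOT a ULID
    queue: opts.queue ?? name, // queue defaults to the job name
    name,
    payload,
    priority: opts.priority ?? 0,
    maxRetries: opts.maxRetries ?? 3,
    attempt: 1, // attempts start at 1
    backoff: opts.backoff ?? 'exponential',
    timeout: opts.timeout ?? 30000,
    status: 'pending',
    meta: opts.meta ?? {},
  }
}
```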
Generate a ULID string. Used internally for job IDs.
Create a JobContext for middleware execution. Used internally.
The event bus class. Instantiated internally by the kernel.
The plugin registry class. Instantiated internally by the kernel.
The middleware pipeline class. Instantiated internally by the kernel.
The presets configuration object:
```ts
import { presets } from '@psyqueue/core'

console.log(presets.lite.plugins)
// ['backend-sqlite', 'scheduler', 'crash-recovery']