@fapoli/pg-scheduler

A PostgreSQL-backed distributed task scheduler for Node.js, inspired by the Java library db-scheduler.

Why does this exist?

In distributed systems, running background jobs reliably is non-trivial.

When several instances of an application run at the same time, a naive in-process scheduler executes the same task in every instance, causing duplicate work, race conditions, and inconsistent state.

This project uses PostgreSQL as the coordination layer so that each due task is executed by only one worker at a time. Workers claim due rows with row-level locks (FOR UPDATE SKIP LOCKED), skipping rows that another worker has already locked instead of waiting for them. This gives safe, distributed job execution without any infrastructure beyond the database you already run.
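As a rough illustration of the claim step, here is a query a worker could run to pick up a batch of due tasks. It is a sketch written against the schema in Database setup, not the library's actual SQL; `buildClaimQuery` and `ClaimQuery` are hypothetical names.

```typescript
// Hypothetical helper: builds a parameterized "claim due tasks" query.
// A sketch against the scheduled_tasks schema, not pg-scheduler's real SQL.
interface ClaimQuery {
  text: string;
  values: [string, number];
}

// Only plain lowercase identifiers, so the table name can be interpolated safely.
const IDENTIFIER = /^[a-z_][a-z0-9_]*$/;

function buildClaimQuery(tableName: string, workerId: string, batchSize: number): ClaimQuery {
  if (!IDENTIFIER.test(tableName)) {
    throw new Error(`Unsafe table name: ${tableName}`);
  }
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error(`batchSize must be a positive integer, got ${batchSize}`);
  }
  // The CTE locks up to batchSize due rows, skipping rows other workers hold;
  // the UPDATE then marks exactly those rows as picked by this worker.
  const text = `
    WITH due AS (
      SELECT task_name, task_instance
        FROM ${tableName}
       WHERE picked = FALSE
         AND execution_time <= NOW()
       ORDER BY priority DESC NULLS LAST, execution_time ASC
       LIMIT $2
         FOR UPDATE SKIP LOCKED
    )
    UPDATE ${tableName} AS t
       SET picked = TRUE,
           picked_by = $1,
           last_heartbeat = NOW(),
           version = t.version + 1
      FROM due
     WHERE t.task_name = due.task_name
       AND t.task_instance = due.task_instance
    RETURNING t.task_name, t.task_instance, t.task_data, t.version`;
  return { text, values: [workerId, batchSize] };
}
```

With `pg` this would run as `pool.query(q.text, q.values)`. The ORDER BY and the `picked = FALSE` filter mirror the partial index `idx_scheduled_tasks_picker`, so the planner can walk that index instead of scanning the table.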

Features

  • Distributed-safe via FOR UPDATE SKIP LOCKED
  • One-time and recurring tasks
  • Heartbeat-based dead execution recovery
  • Configurable failure handlers with exponential backoff, fixed delay, or max retries
  • Graceful shutdown
  • No external dependencies beyond pg

Installation

npm install @fapoli/pg-scheduler

pg is a peer dependency — install it if you haven't already:

npm install pg

Database setup

Run this migration against your PostgreSQL database:

CREATE TABLE IF NOT EXISTS scheduled_tasks (
  task_name            TEXT        NOT NULL,
  task_instance        TEXT        NOT NULL,
  task_data            JSONB,
  execution_time       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  picked               BOOLEAN     NOT NULL DEFAULT FALSE,
  picked_by            TEXT,
  last_success         TIMESTAMPTZ,
  last_failure         TIMESTAMPTZ,
  consecutive_failures INT         NOT NULL DEFAULT 0,
  last_heartbeat       TIMESTAMPTZ,
  version              BIGINT      NOT NULL DEFAULT 1,
  priority             INT,
  PRIMARY KEY (task_name, task_instance)
);

CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_last_heartbeat
    ON scheduled_tasks (last_heartbeat);

CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_picker
    ON scheduled_tasks (priority DESC NULLS LAST, execution_time ASC)
    WHERE picked = FALSE;
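If you would rather apply the schema from application code than from psql or a migration tool, the sketch below is one way to do it. `applySchema`, `Queryable`, and the lock key are hypothetical; a `pg` `PoolClient` (from `await pool.connect()`) fits `Queryable` structurally. It takes a single client rather than a `Pool` because BEGIN, the advisory lock, and COMMIT must share one connection. The lock matters because `CREATE TABLE IF NOT EXISTS` can still fail when two instances race to create the same table at the same moment.

```typescript
// Hypothetical helper: apply the scheduled_tasks schema at boot.
// Pass ONE connection (e.g. a pg PoolClient), not a Pool.
interface Queryable {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Arbitrary application-chosen advisory lock key (an assumption).
const SCHEMA_LOCK_KEY = 727001;

const SCHEMA_STATEMENTS = [
  `CREATE TABLE IF NOT EXISTS scheduled_tasks (
     task_name            TEXT        NOT NULL,
     task_instance        TEXT        NOT NULL,
     task_data            JSONB,
     execution_time       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
     picked               BOOLEAN     NOT NULL DEFAULT FALSE,
     picked_by            TEXT,
     last_success         TIMESTAMPTZ,
     last_failure         TIMESTAMPTZ,
     consecutive_failures INT         NOT NULL DEFAULT 0,
     last_heartbeat       TIMESTAMPTZ,
     version              BIGINT      NOT NULL DEFAULT 1,
     priority             INT,
     PRIMARY KEY (task_name, task_instance)
   )`,
  `CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_last_heartbeat
     ON scheduled_tasks (last_heartbeat)`,
  `CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_picker
     ON scheduled_tasks (priority DESC NULLS LAST, execution_time ASC)
     WHERE picked = FALSE`,
];

async function applySchema(client: Queryable): Promise<void> {
  await client.query('BEGIN');
  try {
    // Serialize concurrent boots; the lock is released at COMMIT/ROLLBACK.
    await client.query('SELECT pg_advisory_xact_lock($1)', [SCHEMA_LOCK_KEY]);
    for (const statement of SCHEMA_STATEMENTS) {
      await client.query(statement);
    }
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  }
}
```

Remember to call `client.release()` on the pooled client once `applySchema` settles.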

Usage

Register tasks and start the worker

import { Pool } from 'pg';
import { oneTime, recurring, scheduleTask, startWorker } from '@fapoli/pg-scheduler';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Register a one-time task
oneTime({
  name: 'send-welcome-email',
  run: async (data) => {
    await sendEmail(data.userId); // sendEmail is your own application code
  },
});

// Register a recurring task
recurring({
  name: 'cleanup-expired-sessions',
  intervalMs: 60 * 60 * 1000, // every hour
  run: async () => {
    await deleteExpiredSessions(); // your own application code
  },
});

// Schedule initial executions. scheduleTask uses ON CONFLICT DO NOTHING, so it is safe to call on every boot.
// (Top-level await like this requires an ES module.)
await scheduleTask({
  pool,
  taskName: 'send-welcome-email',
  taskInstance: 'user-123',
  taskData: { userId: 'user-123' },
});
await scheduleTask({ pool, taskName: 'cleanup-expired-sessions', taskInstance: 'default', taskData: null });

// Start the worker
const worker = startWorker({ pool });

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.shutdown();
  process.exit(0);
});
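The boot-time safety comes from the primary key (task_name, task_instance): a second insert with the same pair is a no-op. That also means a repeated scheduleTask call does not change task_data or execution_time of the existing row; rescheduleTask is the call for that. The in-memory model below is a hypothetical illustration of the rule, not library code.

```typescript
// Hypothetical in-memory model of PRIMARY KEY (task_name, task_instance)
// combined with INSERT ... ON CONFLICT DO NOTHING. Not part of pg-scheduler.
interface Row {
  taskName: string;
  taskInstance: string;
  taskData: unknown;
  executionTime: Date;
}

class TaskTableModel {
  private rows = new Map<string, Row>();

  // JSON-encoding the pair avoids collisions such as ("a:b", "c") vs ("a", "b:c").
  private static key(taskName: string, taskInstance: string): string {
    return JSON.stringify([taskName, taskInstance]);
  }

  // true if a row was inserted, false if the key already existed (DO NOTHING case).
  insertIfAbsent(row: Row): boolean {
    const k = TaskTableModel.key(row.taskName, row.taskInstance);
    if (this.rows.has(k)) return false;
    this.rows.set(k, row);
    return true;
  }

  get(taskName: string, taskInstance: string): Row | undefined {
    return this.rows.get(TaskTableModel.key(taskName, taskInstance));
  }

  get size(): number {
    return this.rows.size;
  }
}
```

Deriving taskInstance from an entity id, as the `user-123` example does, makes one-time scheduling idempotent per entity in the same way.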

One-time tasks

One-time tasks are deleted after successful execution.

oneTime({
  name: 'my-task',
  run: async (data) => { /* ... */ },
});
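One way to make "delete on success" safe is to guard the DELETE with the version the worker saw when it claimed the row. If the execution was declared dead and another worker re-claimed it in the meantime, the version has moved on and the stale worker deletes nothing. This is a hypothetical sketch that assumes every claim increments `version`; the library's actual completion query is not documented here.

```typescript
// Hypothetical completion query for a one-time task.
// Assumes each claim bumps `version`, so a stale worker matches zero rows.
interface CompletionQuery {
  text: string;
  values: [string, string, number];
}

function buildOneTimeCompletion(
  taskName: string,
  taskInstance: string,
  claimedVersion: number,
): CompletionQuery {
  return {
    text: `DELETE FROM scheduled_tasks
            WHERE task_name = $1
              AND task_instance = $2
              AND version = $3`,
    values: [taskName, taskInstance, claimedVersion],
  };
}

// pg reports affected rows as rowCount; exactly one means this worker still owned the row.
function ownedAtCompletion(rowCount: number | null): boolean {
  return rowCount === 1;
}
```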

Recurring tasks

Recurring tasks reschedule themselves after each successful execution based on intervalMs.

recurring({
  name: 'my-recurring-task',
  intervalMs: 5 * 60 * 1000, // every 5 minutes
  run: async (data) => { /* ... */ },
});
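A sketch of the success path for a recurring task follows. It assumes the interval is measured from the moment the previous run finished (a fixed-delay schedule), so runs drift later by each run's duration; if the library measures from the previous scheduled time instead, they would not. `nextExecutionTime`, `buildRecurringCompletion`, and the exact columns reset are hypothetical.

```typescript
// Hypothetical: next run time, assuming intervalMs counts from completion.
function nextExecutionTime(completedAt: Date, intervalMs: number): Date {
  if (!Number.isFinite(intervalMs) || intervalMs <= 0) {
    throw new Error(`intervalMs must be positive, got ${intervalMs}`);
  }
  return new Date(completedAt.getTime() + intervalMs);
}

// Hypothetical success update: release the row, record the success, and push
// execution_time forward, guarded by the version this worker claimed.
function buildRecurringCompletion(
  taskName: string,
  taskInstance: string,
  claimedVersion: number,
  next: Date,
): { text: string; values: [Date, string, string, number] } {
  return {
    text: `UPDATE scheduled_tasks
              SET picked = FALSE,
                  picked_by = NULL,
                  last_heartbeat = NULL,
                  last_success = NOW(),
                  consecutive_failures = 0,
                  execution_time = $1
            WHERE task_name = $2
              AND task_instance = $3
              AND version = $4`,
    values: [next, taskName, taskInstance, claimedVersion],
  };
}
```

For the 5-minute example above, a run finishing at 00:00:00 UTC would next be due at 00:05:00 UTC under this assumption.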

Scheduling tasks

import { scheduleTask, rescheduleTask } from '@fapoli/pg-scheduler';

// Schedule immediately (or supply an executionTime)
await scheduleTask({
  pool,
  taskName: 'my-task',
  taskInstance: 'unique-instance-id',
  taskData: { foo: 'bar' }, // pass null if no data
  executionTime: new Date(Date.now() + 5000), // optional, defaults to now
  tableName: 'scheduled_tasks',              // optional, defaults to 'scheduled_tasks'
});

// Reschedule an existing task (throws if not found or currently running)
await rescheduleTask({
  pool,
  taskName: 'my-task',
  taskInstance: 'unique-instance-id',
  taskData: { foo: 'baz' },
  executionTime: new Date(Date.now() + 10000),
  tableName: 'scheduled_tasks',              // optional, defaults to 'scheduled_tasks'
});
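The "throws if not found or currently running" rule can be expressed as one guarded UPDATE: a running task has `picked = TRUE`, so only an unpicked, existing row matches, and zero affected rows means the caller gets an error. This is a minimal sketch; `rescheduleSketch`, `Queryable`, and the error message are hypothetical, and the real function's error types are not specified here. A single UPDATE cannot distinguish "missing" from "running"; a follow-up SELECT would be needed for a more specific message.

```typescript
// Hypothetical sketch of reschedule semantics, not pg-scheduler's implementation.
interface QueryResultLike {
  rowCount: number | null;
}
interface Queryable {
  query(text: string, values?: unknown[]): Promise<QueryResultLike>;
}

interface RescheduleArgs {
  taskName: string;
  taskInstance: string;
  taskData: unknown;
  executionTime: Date;
}

async function rescheduleSketch(db: Queryable, args: RescheduleArgs): Promise<void> {
  // Serialize explicitly so arrays are stored as JSON; keep null as SQL NULL.
  const data = args.taskData === null ? null : JSON.stringify(args.taskData);
  const result = await db.query(
    `UPDATE scheduled_tasks
        SET task_data = $3,
            execution_time = $4,
            version = version + 1
      WHERE task_name = $1
        AND task_instance = $2
        AND picked = FALSE`, // a picked row is currently running: leave it alone
    [args.taskName, args.taskInstance, data, args.executionTime],
  );
  if (result.rowCount !== 1) {
    throw new Error(`${args.taskName}/${args.taskInstance} not found or currently running`);
  }
}
```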

Failure handlers

import { exponentialBackoff, fixedDelay, maxRetries } from '@fapoli/pg-scheduler';

recurring({
  name: 'my-task',
  intervalMs: 60000,
  run: async () => { /* ... */ },
  failureHandler: exponentialBackoff(1000, 5), // 1s initial delay, max 5 retries
});

// Other built-ins
fixedDelay(5000, 3)  // retry every 5s, max 3 times
maxRetries(3)        // retry immediately, max 3 times
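To compare the three built-ins, here is a hypothetical model of the decision each one makes after a failure, given the task's consecutive failure count. The function names, the `RetryDecision` type, and doubling as the backoff factor are assumptions for illustration; the README documents only the arguments. `null` stands for "stop retrying"; what the library then does with the execution is not specified here.

```typescript
// Hypothetical model of failure-handler decisions; not pg-scheduler's code.
// `failures` = consecutive failures including the one just recorded.
// Returns the delay in ms before the next attempt, or null to stop retrying.
type RetryDecision = number | null;

function exponentialBackoffDelay(initialMs: number, maxRetries: number, failures: number): RetryDecision {
  if (failures > maxRetries) return null;
  // Assumed factor 2: 1st retry waits initialMs, 2nd waits 2x, 3rd 4x, ...
  return initialMs * 2 ** (failures - 1);
}

function fixedDelayDelay(delayMs: number, maxRetries: number, failures: number): RetryDecision {
  return failures > maxRetries ? null : delayMs;
}

function maxRetriesDelay(maxRetries: number, failures: number): RetryDecision {
  return failures > maxRetries ? null : 0; // retry immediately
}
```

Under these assumptions, `exponentialBackoff(1000, 5)` would wait 1s, 2s, 4s, 8s, and 16s before giving up.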

Worker options

startWorker({
  pool,
  tableName: 'scheduled_tasks',     // default
  workerId: 'my-worker-1',          // default: hostname-pid
  pollingIntervalMs: 10000,         // default: 10s
  batchSize: 10,                    // default: 10
  heartbeatTimeoutMs: 300000,       // default: 5 minutes
  heartbeatIntervalMs: 100000,      // default: heartbeatTimeoutMs / 3
  logger: console,                  // default: console — inject your own
});
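The two heartbeat settings work together: a running worker refreshes last_heartbeat every heartbeatIntervalMs, and an execution whose heartbeat is older than heartbeatTimeoutMs is treated as dead and recovered. With the defaults (300000 / 3 = 100000 ms), a healthy worker gets about three refreshes per timeout window, so a single delayed beat does not trigger recovery. The predicate and query below are a hypothetical sketch of that check, not the library's code.

```typescript
// Hypothetical dead-execution check; names are illustrative only.
function isDeadExecution(lastHeartbeat: Date | null, now: Date, heartbeatTimeoutMs: number): boolean {
  // A picked row with no heartbeat at all is treated as dead here (an assumption).
  if (lastHeartbeat === null) return true;
  return now.getTime() - lastHeartbeat.getTime() > heartbeatTimeoutMs;
}

// Default interval, per the option list above: heartbeatTimeoutMs / 3.
function defaultHeartbeatIntervalMs(heartbeatTimeoutMs: number): number {
  return heartbeatTimeoutMs / 3;
}

// The same check in SQL; idx_scheduled_tasks_last_heartbeat can serve the range scan.
// $1 is heartbeatTimeoutMs.
const FIND_DEAD_EXECUTIONS = `
  SELECT task_name, task_instance, picked_by
    FROM scheduled_tasks
   WHERE picked = TRUE
     AND last_heartbeat < NOW() - ($1::double precision * INTERVAL '1 millisecond')`;
```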

Custom logger

startWorker({
  pool,
  logger: {
    info: (msg) => myLogger.info(msg),
    warn: (msg) => myLogger.warn(msg),
    error: (msg, err) => myLogger.error(msg, err),
  },
});
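Any object with info, warn, and error methods can be passed as logger. For instance, a logger that records entries in memory makes it easy to assert on scheduler logs in tests. The `Logger` interface is inferred from the shape shown above and is an assumption, as is the `createRecordingLogger` helper.

```typescript
// Logger shape inferred from the README example (an assumption).
interface Logger {
  info(msg: string): void;
  warn(msg: string): void;
  error(msg: string, err?: unknown): void;
}

interface LogEntry {
  level: 'info' | 'warn' | 'error';
  msg: string;
  err?: unknown;
}

// Hypothetical test helper: records every call instead of printing it.
function createRecordingLogger(): { logger: Logger; entries: LogEntry[] } {
  const entries: LogEntry[] = [];
  const logger: Logger = {
    info: (msg) => { entries.push({ level: 'info', msg }); },
    warn: (msg) => { entries.push({ level: 'warn', msg }); },
    error: (msg, err) => { entries.push({ level: 'error', msg, err }); },
  };
  return { logger, entries };
}
```

In a test you would pass `logger` to `startWorker({ pool, logger })` and inspect `entries` afterwards.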

License

MIT