Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions packages/durably/src/durably.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Dialect } from 'kysely'
import { Kysely } from 'kysely'
import { Kysely, sql } from 'kysely'
import { monotonicFactory } from 'ulidx'
import type { z } from 'zod'
import type { JobDefinition } from './define-job'
Expand Down Expand Up @@ -134,6 +134,7 @@ function parseDuration(value: string): number {
}

const PURGE_INTERVAL_MS = 60_000
const CHECKPOINT_INTERVAL_MS = 60_000

/** Real wall-clock delegating to globalThis timers. */
const realClock: RuntimeClock = {
Expand Down Expand Up @@ -462,6 +463,8 @@ interface DurablyState<
preserveSteps: boolean
migrating: Promise<void> | null
migrated: boolean
walCheckpointSupported: boolean
probeWalCheckpoint: (() => Promise<void>) | null
leaseMs: number
leaseRenewIntervalMs: number
pollingIntervalMs: number
Expand Down Expand Up @@ -885,8 +888,10 @@ function createDurablyInstance<
}

state.migrating = runMigrations(db)
.then(() => {
.then(async () => {
state.migrated = true
await state.probeWalCheckpoint?.()
state.probeWalCheckpoint = null
})
.finally(() => {
state.migrating = null
Expand Down Expand Up @@ -971,20 +976,44 @@ export function createDurably<
const eventEmitter = createEventEmitter()
const jobRegistry = createJobRegistry()
let lastPurgeAt = 0
let lastCheckpointAt = 0

const runIdleMaintenance = async (): Promise<void> => {
try {
const now = new Date().toISOString()
await storage.releaseExpiredLeases(now)
const nowMs = Date.now()
await storage.releaseExpiredLeases(new Date(nowMs).toISOString())

if (config.retainRunsMs !== null) {
const purgeNow = Date.now()
if (purgeNow - lastPurgeAt >= PURGE_INTERVAL_MS) {
lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - config.retainRunsMs).toISOString()
if (nowMs - lastPurgeAt >= PURGE_INTERVAL_MS) {
lastPurgeAt = nowMs
const cutoff = new Date(nowMs - config.retainRunsMs).toISOString()
await storage.purgeRuns({ olderThan: cutoff, limit: 100 })
}
}

if (state.walCheckpointSupported) {
if (nowMs - lastCheckpointAt >= CHECKPOINT_INTERVAL_MS) {
lastCheckpointAt = nowMs
try {
const result = await sql`PRAGMA wal_checkpoint(TRUNCATE)`.execute(
db,
)
const row = result.rows[0] as
| { busy: number; log: number; checkpointed: number }
| undefined
if (row?.busy !== 0) {
// Retry sooner on next idle cycle (but not immediately)
lastCheckpointAt = nowMs - CHECKPOINT_INTERVAL_MS / 2
Comment thread
coji marked this conversation as resolved.
}
} catch (checkpointError) {
eventEmitter.emit({
type: 'worker:error',
error: getErrorMessage(checkpointError),
context: 'wal-checkpoint',
})
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
} catch (error) {
eventEmitter.emit({
type: 'worker:error',
Expand Down Expand Up @@ -1021,6 +1050,8 @@ export function createDurably<
preserveSteps: config.preserveSteps,
migrating: null,
migrated: false,
walCheckpointSupported: false,
probeWalCheckpoint: null,
leaseMs: config.leaseMs,
leaseRenewIntervalMs: config.leaseRenewIntervalMs,
pollingIntervalMs: config.pollingIntervalMs,
Expand All @@ -1029,6 +1060,23 @@ export function createDurably<
runIdleMaintenance,
}

if (backend === 'generic' && !isBrowserLikeEnvironment()) {
state.probeWalCheckpoint = async () => {
try {
const result = await sql`PRAGMA wal_checkpoint(PASSIVE)`.execute(db)
const row = result.rows[0] as
| { busy: number; log: number; checkpointed: number }
| undefined
// log === -1 means WAL is not active (e.g. libSQL local uses DELETE mode)
if (row && row.log !== -1) {
state.walCheckpointSupported = true
}
} catch {
// Remote backends (e.g. Turso) reject checkpoint pragmas
}
}
}

const instance = createDurablyInstance<Record<string, never>, TLabels>(
state,
{},
Expand Down
Loading