Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/contracts/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ export interface Adapter {
maxStalledCount: number
): Promise<number>

/**
* Renew the acquired timestamp of in-flight jobs (heartbeat).
*
* A worker calls this periodically for the jobs it is actively processing
* so that long-running handlers are not mistaken for stalled jobs and
* re-delivered while they are still running. Only jobs that are still active
* AND still owned by the calling worker (the one set via setWorkerId) are
* renewed; jobs that have already been recovered/completed, or have since
* been re-acquired by another worker, are skipped. This prevents a slow
* worker from resurrecting a job or sabotaging the recovery of the worker
* that legitimately owns it now with a late heartbeat.
*
* @param queue - The queue the jobs belong to
* @param jobIds - The ids of the jobs currently being processed
* @returns Number of jobs whose timestamp was renewed
*/
renewJobs(queue: string, jobIds: string[]): Promise<number>

/**
* Mark a job as completed and remove it from the queue.
*
Expand Down
23 changes: 21 additions & 2 deletions src/drivers/fake_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ interface ActiveJob {
job: JobData
acquiredAt: number
queue: string
workerId: string
}

interface DelayedJob {
Expand Down Expand Up @@ -81,6 +82,7 @@ export class FakeAdapter implements Adapter {
#pushedJobs: FakeJobRecord[] = []
#dedupIndex = new Map<string, Map<string, DedupEntry>>()
#onDispose?: () => void
#workerId: string = ''

/**
* Set the function to call when the fake is disposed
Expand All @@ -94,7 +96,9 @@ export class FakeAdapter implements Adapter {
this.#onDispose?.()
}

setWorkerId(_workerId: string): void {}
setWorkerId(workerId: string): void {
this.#workerId = workerId
}

getPushedJobs(): FakeJobRecord[] {
return [...this.#pushedJobs]
Expand Down Expand Up @@ -240,7 +244,7 @@ export class FakeAdapter implements Adapter {
}

const acquiredAt = Date.now()
this.#activeJobs.set(job.id, { job, acquiredAt, queue })
this.#activeJobs.set(job.id, { job, acquiredAt, queue, workerId: this.#workerId })

return { ...job, acquiredAt }
}
Expand Down Expand Up @@ -345,6 +349,21 @@ export class FakeAdapter implements Adapter {
return recovered
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
const now = Date.now()
let renewed = 0

for (const jobId of jobIds) {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue && active.workerId === this.#workerId) {
active.acquiredAt = now
renewed++
}
}

return renewed
}

async getJob(jobId: string, queue: string): Promise<JobRecord | null> {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue) {
Expand Down
20 changes: 20 additions & 0 deletions src/drivers/knex_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,26 @@ export class KnexAdapter implements Adapter {
})
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
if (jobIds.length === 0) {
return 0
}

const now = Date.now()

// Only renew jobs that are still active AND still owned by this worker; a
// job that was already recovered, finalized, or re-acquired by another
// worker will not match and is therefore never resurrected.
const renewed = await this.#connection(this.#jobsTable)
.where('queue', queue)
.where('status', 'active')
.where('worker_id', this.#workerId)
.whereIn('id', jobIds)
.update({ acquired_at: now })

return renewed
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()

Expand Down
21 changes: 21 additions & 0 deletions src/drivers/redis_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
PUSH_JOB_SCRIPT,
RECOVER_STALLED_JOBS_SCRIPT,
REMOVE_JOB_SCRIPT,
RENEW_JOBS_SCRIPT,
RETRY_JOB_SCRIPT,
} from './redis_scripts.js'

Expand Down Expand Up @@ -404,6 +405,26 @@ export class RedisAdapter implements Adapter {
return recovered as number
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
if (jobIds.length === 0) {
return 0
}

const keys = this.#getKeys(queue)
const now = Date.now()

const renewed = await this.#connection.eval(
RENEW_JOBS_SCRIPT,
1,
keys.active,
now.toString(),
this.#workerId,
...jobIds
)

return renewed as number
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()
const now = Date.now()
Expand Down
31 changes: 31 additions & 0 deletions src/drivers/redis_scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,37 @@ ${REDIS_JOB_STORAGE_LUA}
return recovered
`

/**
* Lua script for renewing the acquired timestamp of in-flight jobs (heartbeat).
* Only entries still present in the active hash AND still owned by the calling
* worker are renewed, so a job that was already recovered, finalized, or
* re-acquired by another worker is never resurrected by a late heartbeat.
* Preserves the existing worker info, updating only acquiredAt.
* Returns the number of jobs renewed.
*/
export const RENEW_JOBS_SCRIPT = `
local active_key = KEYS[1]
local now = tonumber(ARGV[1])
local worker_id = ARGV[2]

local renewed = 0
for i = 3, #ARGV do
local job_id = ARGV[i]
local active_data = redis.call('HGET', active_key, job_id)
if active_data then
local active = cjson.decode(active_data)
-- Only the worker that currently owns the lease may renew it.
if active.workerId == worker_id then
active.acquiredAt = now
redis.call('HSET', active_key, job_id, cjson.encode(active))
renewed = renewed + 1
end
end
end

return renewed
`

/**
* Lua script for getting a job record with its status.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/drivers/sync_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ export class SyncAdapter implements Adapter {
return Promise.resolve(0)
}

renewJobs(_queue: string, _jobIds: string[]): Promise<number> {
// SyncAdapter executes jobs immediately - there is nothing to renew
return Promise.resolve(0)
}

getJob(_jobId: string, _queue: string): Promise<null> {
return Promise.resolve(null)
}
Expand Down
24 changes: 24 additions & 0 deletions src/job_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ export class JobPool {
this.#activeJobs.set(job.id, { promise, job, queue })
}

/**
* Get the ids of all currently running jobs, grouped by the queue they
* came from.
*
* Used by the worker heartbeat to renew the acquired timestamp of in-flight
* jobs so long-running handlers are not mistaken for stalled jobs.
*
* @returns A map of queue name to the job ids running for that queue
*/
activeJobIdsByQueue(): Map<string, string[]> {
const byQueue = new Map<string, string[]>()

for (const { job, queue } of this.#activeJobs.values()) {
const ids = byQueue.get(queue)
if (ids) {
ids.push(job.id)
} else {
byQueue.set(queue, [job.id])
}
}

return byQueue
}

/**
* Wait for the next job to complete and return it.
*
Expand Down
Loading
Loading