Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import random
from datetime import datetime, timedelta

import psutil

from odoo import _, api, exceptions, fields, models
from odoo.osv import expression
from odoo.tools import config, html_escape
Expand Down Expand Up @@ -417,6 +419,35 @@ def autovacuum(self):
break
return True

def _check_job_worker_pid(self):
"""
Checking that job's worker pids still exist
If not, it means that the worker has been killed
"""
jobs = self.env["queue.job"].search(
[
("state", "=", "started"),
("worker_pid", "!=", False),
]
)

for job in jobs:
if not psutil.pid_exists(job.worker_pid):
_logger.info(
"Worker %d executing job %s does not exist"
% (job.worker_pid, job.uuid)
)
_job = Job.load(job.env, job.uuid)
_job.set_failed(
exc_name=_("WorkerError"),
exc_info=_(
"The worker executing the job was killed."
"This is likely to be due to a timeout"
),
exc_message=_("Associated worker was killed"),
)
_job.store()

def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""Fix jobs that are in a bad states

Expand All @@ -431,6 +462,9 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""
if started_delta == -1:
started_delta = (config["limit_time_real"] // 60) + 1

self._check_job_worker_pid()

return self._get_stuck_jobs_to_requeue(
enqueued_delta=enqueued_delta, started_delta=started_delta
).requeue()
Expand Down