Skip to content

Commit 22e4040

Browse files
committed
Adding a killing feature for jobs (mrq/{job.py,worker.py)
1 parent f806295 commit 22e4040

2 files changed

Lines changed: 30 additions & 7 deletions

File tree

mrq/job.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,14 @@ def abort(self):
249249
self._attach_original_exception(exc)
250250
raise exc
251251

252+
def kill(self):
253+
""" Kill the current job """
254+
context.connections.redis.rpush("{}:wcmd:{}".format(context.get_current_config()["redis_prefix"],
255+
context.get_current_worker()),
256+
"kill {}".format(self.id))
257+
self._save_status("killed")
258+
pass
259+
252260
def cancel(self):
253261
""" Markes the current job as cancelled. Doesn't interrupt it. """
254262
self._save_status("cancel")
@@ -321,15 +329,15 @@ def perform(self):
321329
# pylint: disable=protected-access
322330

323331
gevent.sleep(0)
324-
current_greenlet = gevent.getcurrent()
332+
self.current_greenlet = gevent.getcurrent()
325333
t = (datetime.datetime.utcnow() - self.datestarted).total_seconds()
326334

327335
context.log.debug(
328336
"Job %s success: %0.6fs total, %0.6fs in greenlet, %s switches" %
329337
(self.id,
330338
t,
331-
current_greenlet._trace_time,
332-
current_greenlet._trace_switches - 1)
339+
self.current_greenlet._trace_time,
340+
self.current_greenlet._trace_switches - 1)
333341
)
334342

335343
else:
@@ -454,13 +462,13 @@ def _save_status(self, status, updates=None, exception=False, w=None, j=None):
454462
db_updates["totaltime"] = (now - self.datestarted).total_seconds()
455463

456464
if context.get_current_config().get("trace_greenlets"):
457-
current_greenlet = gevent.getcurrent()
465+
self.current_greenlet = gevent.getcurrent()
458466

459467
# TODO are we sure the current job is doing the save_status() on itself?
460-
if hasattr(current_greenlet, "_trace_time"):
468+
if hasattr(self.current_greenlet, "_trace_time"):
461469
# pylint: disable=protected-access
462-
db_updates["time"] = current_greenlet._trace_time
463-
db_updates["switches"] = current_greenlet._trace_switches
470+
db_updates["time"] = self.current_greenlet._trace_time
471+
db_updates["switches"] = self.current_greenlet._trace_switches
464472

465473
if exception:
466474
trace = traceback.format_exc()

mrq/worker.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,18 @@ def report_worker(self, w=0):
328328
except Exception as e: # pylint: disable=broad-except
329329
self.log.debug("Worker report failed: %s" % e)
330330

331+
def greenlet_command_handler(self):
332+
"""
333+
This greenlet is used to execute commands directly in the worker
334+
"""
335+
while True:
336+
command = self.redis.blpop("{}:wcmd:{}".format(get_current_config()["redis_prefix"],
337+
self.id)
338+
).split(" ")
339+
if command[0] == "kill":
340+
Job(command[1]).current_greenlet.kill(block=True)
341+
pass
342+
331343
def greenlet_admin(self):
332344
""" This greenlet is used to get status information about the worker
333345
when --admin_port was given
@@ -436,6 +448,9 @@ def work_init(self):
436448
if self.config["admin_port"]:
437449
self.greenlets["admin"] = gevent.spawn(self.greenlet_admin)
438450

451+
# ehould add a condition using the config
452+
self.greenlets["command_handler"] = gevent.spawn(self.greenlet_command_handler)
453+
439454
self.install_signal_handlers()
440455

441456
def work_loop(self, max_jobs=None, max_time=None):

0 commit comments

Comments
 (0)