Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
freezefrog==0.4.1
psutil==5.9.8
pytest==8.1.1
pytest==8.1.1
pytz
2 changes: 2 additions & 0 deletions tasktiger/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
ACTIVE = "active"
SCHEDULED = "scheduled"
ERROR = "error"
WAITING = "waiting"
COMPLETED = "completed"

# This lock is acquired in the main process when forking, and must be acquired
# in any thread of the main process when performing an operation that triggers a
Expand Down
6 changes: 5 additions & 1 deletion tasktiger/lua/move_task.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ local key_active_queue = KEYS[6]
local key_queued_queue = KEYS[7]
local key_error_queue = KEYS[8]
local key_scheduled_queue = KEYS[9]
local key_activity = KEYS[10]
local key_waiting_queue = KEYS[10]
local key_completed_queue = KEYS[11]
local key_activity = KEYS[12]

local id = ARGV[1]
local queue = ARGV[2]
Expand All @@ -36,6 +38,8 @@ local state_queues_keys_by_state = {
queued = key_queued_queue,
error = key_error_queue,
scheduled = key_scheduled_queue,
waiting = key_waiting_queue,
completed = key_completed_queue,
}
local key_from_state_queue = state_queues_keys_by_state[from_state]
local key_to_state_queue = state_queues_keys_by_state[to_state]
Expand Down
6 changes: 5 additions & 1 deletion tasktiger/redis_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from redis import Redis

from ._internal import ACTIVE, ERROR, QUEUED, SCHEDULED
from ._internal import ACTIVE, ERROR, QUEUED, SCHEDULED, WAITING, COMPLETED

try:
from redis.commands.core import Script
Expand Down Expand Up @@ -634,6 +634,8 @@ def _none_to_empty_str(v: Optional[str]) -> str:
key_queued_queue = key_func(QUEUED, queue)
key_error_queue = key_func(ERROR, queue)
key_scheduled_queue = key_func(SCHEDULED, queue)
key_waiting_queue = key_func(WAITING, queue)
key_completed_queue = key_func(COMPLETED, queue)
key_activity = key_func("activity")

return self._move_task(
Expand All @@ -647,6 +649,8 @@ def _none_to_empty_str(v: Optional[str]) -> str:
key_queued_queue,
key_error_queue,
key_scheduled_queue,
key_waiting_queue,
key_completed_queue,
key_activity,
],
args=[
Expand Down
63 changes: 59 additions & 4 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
from structlog.stdlib import BoundLogger

from ._internal import (
ACTIVE,
ERROR,
QUEUED,
SCHEDULED,
WAITING,
COMPLETED,
g,
gen_id,
gen_unique_id,
Expand Down Expand Up @@ -53,6 +56,7 @@ def __init__(
unique_key: Optional[Collection[str]] = None,
lock: Optional[bool] = None,
lock_key: Optional[Collection[str]] = None,
depends: Optional[Union[str, Collection[str]]] = None,
retry: Optional[bool] = None,
retry_on: Optional[Collection[Type[BaseException]]] = None,
retry_method: Optional[
Expand Down Expand Up @@ -140,6 +144,8 @@ def __init__(
task["unique"] = True
if unique_key:
task["unique_key"] = unique_key
if depends:
task["depends"] = depends
if lock or lock_key:
task["lock"] = True
if lock_key:
Expand Down Expand Up @@ -212,6 +218,10 @@ def serialized_func(self) -> str:
def lock(self) -> bool:
return self._data.get("lock", False)

@property
def depends(self) -> Optional[Union[str, Collection[str]]]:
    """IDs of the tasks this task depends on, or None if it has none.

    The value is returned exactly as passed to the constructor, so it
    may be a single task ID string or a collection of ID strings (the
    previous ``List[str]`` annotation was incorrect on both counts).
    """
    return self._data.get("depends", None)

@property
def lock_key(self) -> Optional[str]:
return self._data.get("lock_key")
Expand Down Expand Up @@ -495,6 +505,48 @@ def from_id(
else:
raise TaskNotFound("Task {} not found.".format(task_id))

def get_dependencies(
    self, states: Optional[List[str]] = None
) -> List["Task"]:
    """
    Return the Task objects this task depends on.

    Each dependency ID is looked up on this task's queue in the given
    states (defaults to all states). A dependency that cannot be found
    in any state is returned as a placeholder Task with queue
    "Not Found" so the caller still sees its ID.

    Use only for reporting!
    """
    tasks: List[Task] = []
    depends = self.depends
    if not depends:
        return tasks
    # ``depends`` may be a single task ID string (see the constructor's
    # ``depends`` parameter); normalize so we don't iterate characters.
    if isinstance(depends, str):
        depends = [depends]
    if not states:
        states = [QUEUED, ACTIVE, SCHEDULED, ERROR, WAITING, COMPLETED]
    for dep_task_id in depends:
        dep_task = None
        for state in states:
            try:
                dep_task = self._get_dependency(
                    state, self.queue, dep_task_id
                )
                if dep_task:
                    break
            except Exception:
                # Best-effort lookup for reporting; a failure in one
                # state must not abort scanning the remaining states.
                pass
        if dep_task:
            tasks.append(dep_task)
        else:
            tasks.append(
                Task(
                    self.tiger,
                    queue="Not Found",
                    _data={"id": dep_task_id},
                )
            )
    return tasks

def _get_dependency(
    self, state: str, queue: str, task_id: str
) -> Union["Task", None]:
    """
    Return the dependency task from the given state/queue, or None.

    Checks membership with ZSCORE first so Task.from_id is only called
    for IDs that are actually present in the state's sorted set.
    """
    score = self.tiger.connection.zscore(
        self.tiger._key(state, queue), task_id
    )
    # ZSCORE returns None only when the member is absent; a score of 0
    # is valid but falsy, so compare against None explicitly.
    if score is None:
        return None
    return Task.from_id(
        tiger=self.tiger,
        queue=queue,
        state=state,
        task_id=task_id,
    )

@classmethod
def tasks_from_queue(
cls,
Expand Down Expand Up @@ -621,12 +673,15 @@ def cancel(self) -> None:

def delete(self) -> None:
    """
    Removes a task that's in the error or completed queue.

    Raises TaskNotFound if the task could not be found
    in the COMPLETED or ERROR queue.
    """
    # Completed tasks live in their own queue; everything else is
    # expected to be in the error queue.
    source_state = COMPLETED if self.state == COMPLETED else ERROR
    self._move(from_state=source_state)

def clone(self) -> "Task":
"""Returns a clone of the this task"""
Expand Down
83 changes: 80 additions & 3 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import datetime
import importlib
import logging
import sys
import os
import signal
import time
from collections import defaultdict
from typing import (
Any,
Expand All @@ -25,6 +29,8 @@
ERROR,
QUEUED,
SCHEDULED,
WAITING,
COMPLETED,
classproperty,
g,
queue_matches,
Expand Down Expand Up @@ -395,6 +401,7 @@ def run_worker(
module: Optional[str] = None,
exclude_queues: Optional[str] = None,
max_workers_per_queue: Optional[int] = None,
max_parallel_workers: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor_class: Optional[Type[Executor]] = None,
exit_after: Optional[datetime.timedelta] = None,
Expand All @@ -405,7 +412,67 @@ def run_worker(
The arguments are explained in the module-level run_worker() method's
click options.
"""

if max_parallel_workers is None or max_parallel_workers <= 0:
max_parallel_workers = 1

# when run parallel workers we ignore ctrl-c for the main process
# since the children workers will handle the signal and finish gracefully
if max_parallel_workers > 0:
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)

worker_pids: set[int] = set()
for _ in range(max_parallel_workers):
pid = os.getpid()
if max_parallel_workers > 1:
pid = os.fork()
if pid != 0:
worker_pids.add(pid)
continue

self._start_worker(
queues,
module,
exclude_queues,
max_workers_per_queue,
store_tracebacks,
executor_class,
exit_after,
)

if pid == 0:
# for children we clear the inherited values from the parent
worker_pids.clear()
max_parallel_workers = 1
break

# wait for children to finish
while len(worker_pids) > 0:
time.sleep(1)
try:
pid, _ = os.waitpid(-1, os.WNOHANG)
if pid != 0:
worker_pids.remove(pid)
except ChildProcessError as ex:
if ex.errno == 10: # no more children
worker_pids.clear()
except Exception as ex:
print(ex, type(ex), file=sys.stderr)

def _start_worker(
self,
queues: Optional[str] = None,
module: Optional[str] = None,
exclude_queues: Optional[str] = None,
max_workers_per_queue: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor_class: Optional[Type[Executor]] = None,
exit_after: Optional[datetime.timedelta] = None,
) -> None:
"""
Start worker
"""
try:
module_names = module or ""
for module_name in module_names.split(","):
Expand Down Expand Up @@ -439,6 +506,7 @@ def delay(
lock: Optional[bool] = None,
lock_key: Optional[Collection[str]] = None,
when: Optional[Union[datetime.datetime, datetime.timedelta]] = None,
depends: Optional[Union[str, Collection[str]]] = None,
retry: Optional[bool] = None,
retry_on: Optional[Collection[Type[BaseException]]] = None,
retry_method: Optional[
Expand All @@ -463,6 +531,7 @@ def delay(
unique_key=unique_key,
lock=lock,
lock_key=lock_key,
depends=depends,
retry=retry,
retry_on=retry_on,
retry_method=retry_method,
Expand All @@ -479,7 +548,7 @@ def get_queue_sizes(self, queue: str) -> Dict[str, int]:
Get the queue's number of tasks in each state.

Returns dict with queue size for the QUEUED, SCHEDULED, and ACTIVE
states. Does not include size of error queue.
states. Does not include size of error queue nor completed queue.
"""

states = [QUEUED, SCHEDULED, ACTIVE]
Expand Down Expand Up @@ -541,13 +610,13 @@ def get_queue_stats(self) -> Dict[str, Dict[str, str]]:
"""
Returns a dict with stats about all the queues. The keys are the queue
names, the values are dicts representing how many tasks are in a given
status ("queued", "active", "error" or "scheduled").
status ("queued", "active", "error", "waiting", "completed" or "scheduled").

Example return value:
{ "default": { "queued": 1, "error": 2 } }
"""

states = (QUEUED, ACTIVE, SCHEDULED, ERROR)
states = (QUEUED, ACTIVE, SCHEDULED, ERROR, WAITING, COMPLETED)

pipeline = self.connection.pipeline()
for state in states:
Expand Down Expand Up @@ -699,6 +768,12 @@ def would_process_configured_queue(self, queue_name: str) -> bool:
help="Maximum workers allowed to process a queue",
type=int,
)
@click.option(
"-P",
"--max-parallel-workers",
help="Maximum parallel workers",
type=int,
)
@click.option(
"--store-tracebacks/--no-store-tracebacks",
help="Store tracebacks with execution history",
Expand Down Expand Up @@ -728,6 +803,7 @@ def run_worker(
module: Optional[str] = None,
exclude_queues: Optional[str] = None,
max_workers_per_queue: Optional[int] = None,
max_parallel_workers: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor: Optional[str] = "fork",
exit_after: Optional[int] = None,
Expand Down Expand Up @@ -755,6 +831,7 @@ def run_worker(
module=module,
exclude_queues=exclude_queues,
max_workers_per_queue=max_workers_per_queue,
max_parallel_workers=max_parallel_workers,
store_tracebacks=store_tracebacks,
executor_class=executor_class,
exit_after=exit_after_td,
Expand Down
Loading