Skip to content

Commit 8a2ff23

Browse files
authored
Merge pull request #8 from RHDZMOTA/DEV
2022-09-06 RELEASE-PRD (1)
2 parents 8ee0da6 + 58b2dc6 commit 8a2ff23

10 files changed

Lines changed: 129 additions & 6 deletions

File tree

requirements-extras.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ipython==8.4.0
2+
celery[redis]==5.1.2

src/rhdzmota/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.2.0
1+
2.0.0

src/rhdzmota/celery.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from celery import Celery
2+
3+
from .settings import (
4+
RHDZMOTA_CELERY_BROKER_HOST
5+
)
6+
7+
8+
# Celery application instance
9+
app = Celery(
10+
"tasks",
11+
broker=f"redis://{RHDZMOTA_CELERY_BROKER_HOST}",
12+
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
2+
from .celery import app
3+
from .heartbeat.daemon import Daemon
4+
5+
6+
@app.task
7+
def daemon_publisher(**kwargs):
8+
print(kwargs)
9+
10+
11+
# Daemon instance
12+
daemon = Daemon(name="hello-world", publisher=daemon_publisher)
13+
daemon.broadcast()
14+
15+
16+
@app.task
17+
def worker(name: str) -> str:
18+
return f"Hello, {name or 'world'}!"

src/rhdzmota/cli.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from typing import Optional
23

34
from .settings import logger_manager
@@ -10,11 +11,20 @@
1011

1112
class CLI(CLIBase):
1213

13-
@CLIBase.Formatter()
14-
def hello(self, world: Optional[str] = None):
15-
logger.debug("CLI Hello command execution.")
14+
@CLIBase.Formatter(default=str)
15+
def hello(self, world: Optional[str] = None, sleep: int = 1, delegate: bool = False):
1616
world = world or "world"
17-
return f"Hello, {world}!"
17+
time.sleep(sleep)
18+
try:
19+
from .celery_worker_hello import worker
20+
21+
if delegate:
22+
return worker.delay(name=world)
23+
24+
logger.debug("CLI Hello command execution.")
25+
return worker(name=world)
26+
except ImportError:
27+
return f"Hello, {world}!"
1828

1929
@CLIBase.Formatter()
2030
def gist(

src/rhdzmota/heartbeat/__init__.py

Whitespace-only changes.

src/rhdzmota/heartbeat/daemon.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from threading import Thread
2+
from typing import Any, Callable, Optional, Union, TYPE_CHECKING
3+
4+
from .daemon_heart import DaemonHeart
5+
6+
7+
if TYPE_CHECKING:
8+
# TODO: Quickfix guided by https://stackoverflow.com/questions/63714223/correct-type-annotation-for-a-celery-task
9+
from celery.task import Task
10+
from celery.local import PromiseProxy
11+
12+
CELERY_TASK_TYPE = Union[Task, PromiseProxy]
13+
else:
14+
CELERY_TASK_TYPE = Any
15+
16+
17+
class Daemon:
18+
19+
@staticmethod
20+
def summon(name: str, publisher: CELERY_TASK_TYPE, **heart_configs):
21+
def decorator(worker: CELERY_TASK_TYPE):
22+
def wrapper(*args, **kwargs):
23+
# Initialize the daemon and broadcast heartbeat
24+
daemon = Daemon(name=name, publisher=publisher, **heart_configs)
25+
daemon.broadcast()
26+
# Execute the worker function
27+
output = worker.delay(*args, **kwargs)
28+
# Kill the daemon
29+
daemon.kill()
30+
return output
31+
return wrapper
32+
return decorator
33+
34+
def __init__(
35+
self,
36+
name: str,
37+
publisher: CELERY_TASK_TYPE,
38+
**heart_configs
39+
):
40+
self.name = name
41+
self.publisher = publisher
42+
self.broadcast_thread: Optional[Thread] = None
43+
self.heart = DaemonHeart.beat(
44+
app_name=name,
45+
**{ # type: ignore
46+
"mode": DaemonHeart.Mode.MONITOR,
47+
"enable_beat_logs": True,
48+
"enable_pulse_monitor": True,
49+
"pulse_monitor_frequency": None,
50+
"pulse_monitor_sensibility_factor": 1.5,
51+
**heart_configs
52+
}
53+
)
54+
55+
def kill(self):
56+
# Stop heart
57+
self.heart.stroke()
58+
self.heart.join()
59+
self.heart.close()
60+
# Stop broadcast
61+
self.broadcast_thread.join() if self.broadcast_thread is not None else None
62+
63+
def broadcast(self):
64+
self.broadcast_thread: Thread = Thread(
65+
target=lambda: self._broadcast(),
66+
daemon=True
67+
)
68+
self.broadcast_thread.start()
69+
70+
def _broadcast(self):
71+
while not self.heart.no_pulse.is_set():
72+
payload = self.heart.queue_beat_logs.get(block=True)
73+
self.publisher.delay(**payload)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from multiprocessing import Event, Queue, Process, Value
77
from threading import Thread
88

9-
from .runtime import system_describe
9+
from ..utils.runtime import system_describe
1010

1111

1212
class DaemonHeart(Process):

src/rhdzmota/settings.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ def singleton(cls, overwrite_config_dictionary: Optional[Dict] = None) -> 'Logge
123123
logger_manager = LoggerManager.singleton()
124124
logger = logger_manager.get_logger(name=__name__)
125125

126+
# Celery
127+
128+
RHDZMOTA_CELERY_BROKER_HOST = get_environ_variable(
129+
name="RHDZMOTA_CELERY_BROKER_HOST",
130+
default="127.0.0.1"
131+
)
132+
126133
# JWT
127134

128135
JWT_ENCRYPTION_ALGORITHM: str = get_environ_variable(

worker-hello.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
celery -A rhdzmota.celery_worker_hello worker --loglevel=INFO --concurrency=2 -O fair -P prefork

0 commit comments

Comments
 (0)