diff --git a/pika_heartbeat/pika_heartbeat.py b/pika_heartbeat/pika_heartbeat.py index 7f9514b..99c7819 100644 --- a/pika_heartbeat/pika_heartbeat.py +++ b/pika_heartbeat/pika_heartbeat.py @@ -1,17 +1,20 @@ import argparse import logging import time +import threading import pika +from repeated_timer import RepeatedTimer + DEFAULT_HEARTBEAT_SECS = 60 def connect( host="localhost", virtual_host="/", - username="guest", - password="guest", + username='guest', + password='guest', heartbeat=DEFAULT_HEARTBEAT_SECS, ): param = pika.ConnectionParameters( @@ -22,6 +25,17 @@ def connect( ) return pika.BlockingConnection(parameters=param) +def rabbit_sleep(lock, connection): + """ + This is used to ensure that we are sending heartbeats to rabbitMQ + """ + if lock.acquire(timeout=0): + try: + connection.sleep(0) + except: + print("Error calling rabbit connection sleep") + finally: + lock.release() def main(): logging.basicConfig( @@ -30,15 +44,23 @@ def main(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter ) + lock = threading.Lock() parser.add_argument("--heartbeat", type=int, default=DEFAULT_HEARTBEAT_SECS) + parser.add_argument("--username", type=str, default='guest') + parser.add_argument("--password", type=str, default='guest') args = parser.parse_args() heartbeat_secs = args.heartbeat sleep_secs = heartbeat_secs * 5 # sleep enough so that broker closes connection - conn = connect(heartbeat=args.heartbeat) + conn = connect(username=args.username, password=args.password, heartbeat=args.heartbeat) + repeat_timer = RepeatedTimer(heartbeat_secs, rabbit_sleep, lock, conn) ch = conn.channel() + lock.acquire() ch.basic_publish("", "test", "") + lock.release() time.sleep(sleep_secs) + lock.acquire() ch.basic_publish("", "test", "") # this is expected to fail + lock.release() if __name__ == "__main__": diff --git a/pika_heartbeat/repeated_timer.py b/pika_heartbeat/repeated_timer.py new file mode 100644 index 0000000..5610d71 --- /dev/null +++ b/pika_heartbeat/repeated_timer.py @@ -0,0 +1,42 @@ +''' +Creates background thread which runs task regularly +''' +from threading import Timer + +class RepeatedTimer(object): + ''' + Class for repeat timer + ''' + def __init__(self, interval, task, *args, **kwargs): + self._timer = None + self.interval = interval + self.task = task + self.args = args + self.kwargs = kwargs + self.is_running = False + self.start() + + def _run(self): + ''' + start the timer again and call the function + ''' + self.is_running = False + self.start() + self.task(*self.args, **self.kwargs) + + def start(self): + ''' + start timer thread + ''' + if not self.is_running: + self._timer = Timer(self.interval, self._run) + self._timer.daemon = True + self._timer.start() + self.is_running = True + + def stop(self): + ''' + stop timer thread + ''' + self._timer.cancel() + self.is_running = False