From 20b0e021a3a3b2ac5ed3879925d24bd349376a82 Mon Sep 17 00:00:00 2001 From: Sadat Tarique Date: Tue, 16 Jun 2026 16:38:03 -0400 Subject: [PATCH 1/2] Catch OSError when persisting heartbeat to queue Prevents a crash when the disk is full by catching OSError in add_request() and logging a warning instead of letting it propagate. Fixes #8 --- aw_client/client.py | 7 +++++-- tests/test_requestqueue.py | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/aw_client/client.py b/aw_client/client.py index 4e454be..13eabfd 100644 --- a/aw_client/client.py +++ b/aw_client/client.py @@ -551,7 +551,10 @@ def add_request(self, endpoint: str, data: dict) -> None: """ assert "/heartbeat" in endpoint assert isinstance(data, dict) - self._persistqueue.put(QueuedRequest(endpoint, data)) + try: + self._persistqueue.put(QueuedRequest(endpoint, data)) + except OSError as e: + logger.warning(f"Failed to queue request, possibly due to insufficient disk space: {e}") def register_bucket(self, bucket_id: str, event_type: str) -> None: - self._registered_buckets.append(Bucket(bucket_id, event_type)) + self._registered_buckets.append(Bucket(bucket_id, event_type)) \ No newline at end of file diff --git a/tests/test_requestqueue.py b/tests/test_requestqueue.py index 0e28fff..117e61a 100644 --- a/tests/test_requestqueue.py +++ b/tests/test_requestqueue.py @@ -60,3 +60,16 @@ def test_complex(): sleep(1) rq.stop() rq.join() + +def test_add_request_disk_full(): + """Ensures that add_request doesn't crash if the queue can't be written to disk""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + + def raise_oserror(*args, **kwargs): + raise OSError("No space left on device") + + rq._persistqueue.put = raise_oserror # type: ignore + + # Should not raise, the OSError should be caught internally and logged instead + rq.add_request("/api/0/buckets/test/heartbeat", {}) \ No newline at end of file From 5bc06921760016c33f7558e7c218c8c06f93f9b3 Mon Sep 17 00:00:00 2001 From: Sadat Tarique Date: Thu, 18 Jun 2026 21:44:04 -0400 Subject: [PATCH 2/2] Implement wait_for_queue_empty method Adds RequestQueue.wait_for_queue_empty() which blocks until all queued heartbeats have been dispatched, with an optional timeout. Also exposes it via ActivityWatchClient.wait_for_queue_empty() for convenience. Replaces the need for callers to use a hardcoded sleep() to wait for the queue to drain. Fixes #24 --- aw_client/client.py | 24 ++++++++++++++++++++ tests/test_requestqueue.py | 46 +++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/aw_client/client.py b/aw_client/client.py index 13eabfd..00a915f 100644 --- a/aw_client/client.py +++ b/aw_client/client.py @@ -395,6 +395,10 @@ def wait_for_start(self, timeout: int = 10) -> None: else: raise Exception(f"Server at {self.server_address} did not start in time") + def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool: + """Wait for all queued requests to be sent. See RequestQueue.wait_for_queue_empty.""" + return self.request_queue.wait_for_queue_empty(timeout=timeout) + QueuedRequest = namedtuple("QueuedRequest", ["endpoint", "data"]) Bucket = namedtuple("Bucket", ["id", "type"]) @@ -481,6 +485,26 @@ def wait(self, seconds) -> bool: def should_stop(self) -> bool: return self._stop_event.is_set() + def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool: + """ + Wait until the queue is empty, or until timeout (in seconds) is reached. + Returns instantly (True) if the queue thread isn't running. + + :param timeout: max time to wait, in seconds. Waits indefinitely if None. + :return: True if the queue became empty, False if the timeout was reached. + """ + if not self.is_alive(): + return True + + start_time = datetime.now() + while self._persistqueue.qsize() > 0 or self._current is not None: + if timeout is not None and (datetime.now() - start_time).total_seconds() >= timeout: + return False + if self.wait(0.1): + # stop() was called while waiting + return False + return True + def _dispatch_request(self) -> None: request = self._get_next() if not request: diff --git a/tests/test_requestqueue.py b/tests/test_requestqueue.py index 117e61a..1d5720f 100644 --- a/tests/test_requestqueue.py +++ b/tests/test_requestqueue.py @@ -72,4 +72,48 @@ def raise_oserror(*args, **kwargs): rq._persistqueue.put = raise_oserror # type: ignore # Should not raise, the OSError should be caught internally and logged instead - rq.add_request("/api/0/buckets/test/heartbeat", {}) \ No newline at end of file + rq.add_request("/api/0/buckets/test/heartbeat", {}) + +def test_wait_for_queue_empty_basic(): + """Queue empties normally while connected and running.""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + rq.start() + + rq.add_request("/api/0/buckets/test/heartbeat", {}) + result = rq.wait_for_queue_empty(timeout=5) + + rq.stop() + rq.join() + assert result is True + + +def test_wait_for_queue_empty_not_running(): + """Returns True immediately if the queue thread isn't running.""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + # Thread never started, should return True instantly + result = rq.wait_for_queue_empty(timeout=5) + assert result is True + + +def test_wait_for_queue_empty_timeout(): + """Returns False if the queue doesn't empty before the timeout.""" + import unittest.mock as mock + + client = MockClient() + rq = RequestQueue(client) # type: ignore + + # Make _post block long enough that the queue won't empty before timeout + def slow_post(endpoint, data): + from time import sleep + sleep(10) + + with mock.patch.object(client, "_post", slow_post): + rq.start() + rq.add_request("/api/0/buckets/test/heartbeat", {}) + result = rq.wait_for_queue_empty(timeout=0.5) + rq.stop() + rq.join() + + assert result is False \ No newline at end of file