diff --git a/aw_client/client.py b/aw_client/client.py index 4e454be..00a915f 100644 --- a/aw_client/client.py +++ b/aw_client/client.py @@ -395,6 +395,10 @@ def wait_for_start(self, timeout: int = 10) -> None: else: raise Exception(f"Server at {self.server_address} did not start in time") + def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool: + """Wait for all queued requests to be sent. See RequestQueue.wait_for_queue_empty.""" + return self.request_queue.wait_for_queue_empty(timeout=timeout) + QueuedRequest = namedtuple("QueuedRequest", ["endpoint", "data"]) Bucket = namedtuple("Bucket", ["id", "type"]) @@ -481,6 +485,26 @@ def wait(self, seconds) -> bool: def should_stop(self) -> bool: return self._stop_event.is_set() + def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool: + """ + Wait until the queue is empty, or until timeout (in seconds) is reached. + Returns instantly (True) if the queue thread isn't running. + + :param timeout: max time to wait, in seconds. Waits indefinitely if None. + :return: True if the queue became empty, False if the timeout was reached. + """ + if not self.is_alive(): + return True + + start_time = datetime.now() + while self._persistqueue.qsize() > 0 or self._current is not None: + if timeout is not None and (datetime.now() - start_time).total_seconds() >= timeout: + return False + if self.wait(0.1): + # stop() was called while waiting + return False + return True + def _dispatch_request(self) -> None: request = self._get_next() if not request: @@ -551,7 +575,10 @@ def add_request(self, endpoint: str, data: dict) -> None: """ assert "/heartbeat" in endpoint assert isinstance(data, dict) - self._persistqueue.put(QueuedRequest(endpoint, data)) + try: + self._persistqueue.put(QueuedRequest(endpoint, data)) + except OSError as e: + logger.warning(f"Failed to queue request, possibly due to insufficient disk space: {e}") def register_bucket(self, bucket_id: str, event_type: str) -> None: - self._registered_buckets.append(Bucket(bucket_id, event_type)) + self._registered_buckets.append(Bucket(bucket_id, event_type)) \ No newline at end of file diff --git a/tests/test_requestqueue.py b/tests/test_requestqueue.py index 0e28fff..1d5720f 100644 --- a/tests/test_requestqueue.py +++ b/tests/test_requestqueue.py @@ -60,3 +60,60 @@ def test_complex(): sleep(1) rq.stop() rq.join() + +def test_add_request_disk_full(): + """Ensures that add_request doesn't crash if the queue can't be written to disk""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + + def raise_oserror(*args, **kwargs): + raise OSError("No space left on device") + + rq._persistqueue.put = raise_oserror # type: ignore + + # Should not raise, the OSError should be caught internally and logged instead + rq.add_request("/api/0/buckets/test/heartbeat", {}) + +def test_wait_for_queue_empty_basic(): + """Queue empties normally while connected and running.""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + rq.start() + + rq.add_request("/api/0/buckets/test/heartbeat", {}) + result = rq.wait_for_queue_empty(timeout=5) + + rq.stop() + rq.join() + assert result is True + + +def test_wait_for_queue_empty_not_running(): + """Returns True immediately if the queue thread isn't running.""" + client = MockClient() + rq = RequestQueue(client) # type: ignore + # Thread never started, should return True instantly + result = rq.wait_for_queue_empty(timeout=5) + assert result is True + + +def test_wait_for_queue_empty_timeout(): + """Returns False if the queue doesn't empty before the timeout.""" + import unittest.mock as mock + + client = MockClient() + rq = RequestQueue(client) # type: ignore + + # Make _post block long enough that the queue won't empty before timeout + def slow_post(endpoint, data): + from time import sleep + sleep(10) + + with mock.patch.object(client, "_post", slow_post): + rq.start() + rq.add_request("/api/0/buckets/test/heartbeat", {}) + result = rq.wait_for_queue_empty(timeout=0.5) + rq.stop() + rq.join() + + assert result is False \ No newline at end of file