Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions aw_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ def wait_for_start(self, timeout: int = 10) -> None:
else:
raise Exception(f"Server at {self.server_address} did not start in time")

def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool:
"""Wait for all queued requests to be sent. See RequestQueue.wait_for_queue_empty."""
return self.request_queue.wait_for_queue_empty(timeout=timeout)


QueuedRequest = namedtuple("QueuedRequest", ["endpoint", "data"])
Bucket = namedtuple("Bucket", ["id", "type"])
Expand Down Expand Up @@ -481,6 +485,26 @@ def wait(self, seconds) -> bool:
def should_stop(self) -> bool:
return self._stop_event.is_set()

def wait_for_queue_empty(self, timeout: Optional[float] = None) -> bool:
"""
Wait until the queue is empty, or until timeout (in seconds) is reached.
Returns instantly (True) if the queue thread isn't running.

:param timeout: max time to wait, in seconds. Waits indefinitely if None.
:return: True if the queue became empty, False if the timeout was reached.
"""
if not self.is_alive():
return True

start_time = datetime.now()
while self._persistqueue.qsize() > 0 or self._current is not None:
if timeout is not None and (datetime.now() - start_time).total_seconds() >= timeout:
return False
if self.wait(0.1):
# stop() was called while waiting
return False
return True
Comment on lines +500 to +506

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Thread death leaves wait_for_queue_empty spinning indefinitely

is_alive() is checked once at entry, but not inside the polling loop. If the RequestQueue thread dies unexpectedly (e.g., an unhandled exception propagates out of run()'s inner loops without setting _stop_event), self.wait(0.1) will keep returning False on timeout every 100 ms, qsize() and _current will never change, and any call with timeout=None will spin forever. Adding if not self.is_alive(): return False as the first check inside the loop would break out cleanly in that scenario.


def _dispatch_request(self) -> None:
request = self._get_next()
if not request:
Expand Down Expand Up @@ -551,7 +575,10 @@ def add_request(self, endpoint: str, data: dict) -> None:
"""
assert "/heartbeat" in endpoint
assert isinstance(data, dict)
self._persistqueue.put(QueuedRequest(endpoint, data))
try:
self._persistqueue.put(QueuedRequest(endpoint, data))
except OSError as e:
logger.warning(f"Failed to queue request, possibly due to insufficient disk space: {e}")

def register_bucket(self, bucket_id: str, event_type: str) -> None:
self._registered_buckets.append(Bucket(bucket_id, event_type))
self._registered_buckets.append(Bucket(bucket_id, event_type))
57 changes: 57 additions & 0 deletions tests/test_requestqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,60 @@ def test_complex():
sleep(1)
rq.stop()
rq.join()

def test_add_request_disk_full():
"""Ensures that add_request doesn't crash if the queue can't be written to disk"""
client = MockClient()
rq = RequestQueue(client) # type: ignore

def raise_oserror(*args, **kwargs):
raise OSError("No space left on device")

rq._persistqueue.put = raise_oserror # type: ignore

# Should not raise, the OSError should be caught internally and logged instead
rq.add_request("/api/0/buckets/test/heartbeat", {})

def test_wait_for_queue_empty_basic():
"""Queue empties normally while connected and running."""
client = MockClient()
rq = RequestQueue(client) # type: ignore
rq.start()

rq.add_request("/api/0/buckets/test/heartbeat", {})
result = rq.wait_for_queue_empty(timeout=5)

rq.stop()
rq.join()
assert result is True


def test_wait_for_queue_empty_not_running():
"""Returns True immediately if the queue thread isn't running."""
client = MockClient()
rq = RequestQueue(client) # type: ignore
# Thread never started, should return True instantly
result = rq.wait_for_queue_empty(timeout=5)
assert result is True


def test_wait_for_queue_empty_timeout():
"""Returns False if the queue doesn't empty before the timeout."""
import unittest.mock as mock

client = MockClient()
rq = RequestQueue(client) # type: ignore

# Make _post block long enough that the queue won't empty before timeout
def slow_post(endpoint, data):
from time import sleep
sleep(10)

with mock.patch.object(client, "_post", slow_post):
rq.start()
rq.add_request("/api/0/buckets/test/heartbeat", {})
result = rq.wait_for_queue_empty(timeout=0.5)
rq.stop()
rq.join()
Comment on lines +112 to +117

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 rq.join() blocks for ~10 seconds in the timeout test

slow_post sleeps for 10 seconds unconditionally. After wait_for_queue_empty(timeout=0.5) returns, rq.stop() is called but the queue thread is already inside slow_post with no way to interrupt it. rq.join() then has to wait the remaining ~9.5 seconds before the thread exits. The test effectively adds ~10 s to the suite on every run. Using a much shorter sleep (e.g. 0.5 s) or a threading.Event that can be signaled by stop() would keep the test fast while still exercising the timeout path reliably.


assert result is False

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing newline at end of file. Both changed files (client.py and test_requestqueue.py) are missing a trailing newline, which can cause noisy diffs and violates POSIX text file conventions.

Suggested change
assert result is False
assert result is False

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!