From 8946fb398190bd6de6e610c4df4955c37c9bb752 Mon Sep 17 00:00:00 2001 From: TimeToBuildBob Date: Sun, 7 Jun 2026 17:40:28 +0000 Subject: [PATCH 1/3] fix(queue): warn before connect and create buckets immediately --- aw_client/client.py | 27 ++++++++++++++++++++++++++- tests/test_client.py | 23 +++++++++++++++++++++++ tests/test_requestqueue.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/aw_client/client.py b/aw_client/client.py index 4e454be..7c442b3 100644 --- a/aw_client/client.py +++ b/aw_client/client.py @@ -4,6 +4,7 @@ import os import socket import threading +import warnings from collections import namedtuple from datetime import datetime from time import sleep @@ -100,6 +101,7 @@ def __init__( self.request_queue = RequestQueue(self) # Dict of each last heartbeat in each bucket self.last_heartbeat = {} # type: Dict[str, Event] + self._warned_queue_before_connect = False # # Get/Post base requests @@ -243,6 +245,7 @@ def heartbeat( _commit_interval = commit_interval or self.commit_interval if queued: + self._warn_queue_before_connect() # Pre-merge heartbeats if bucket_id not in self.last_heartbeat: self.last_heartbeat[bucket_id] = event @@ -278,6 +281,7 @@ def get_buckets(self) -> dict: def create_bucket(self, bucket_id: str, event_type: str, queued=False): if queued: + self._warn_queue_before_connect() self.request_queue.register_bucket(bucket_id, event_type) else: endpoint = f"buckets/{bucket_id}" @@ -395,6 +399,18 @@ def wait_for_start(self, timeout: int = 10) -> None: else: raise Exception(f"Server at {self.server_address} did not start in time") + def _warn_queue_before_connect(self) -> None: + if self._warned_queue_before_connect or self.request_queue.is_alive(): + return + + warnings.warn( + "Queued requests require calling connect() or using `with client:` " + "before buckets can be created and queued events can flush.", + UserWarning, + stacklevel=3, + ) + self._warned_queue_before_connect = True + QueuedRequest = namedtuple("QueuedRequest", ["endpoint", "data"]) Bucket = namedtuple("Bucket", ["id", "type"]) @@ -554,4 +570,13 @@ def add_request(self, endpoint: str, data: dict) -> None: self._persistqueue.put(QueuedRequest(endpoint, data)) def register_bucket(self, bucket_id: str, event_type: str) -> None: - self._registered_buckets.append(Bucket(bucket_id, event_type)) + bucket = Bucket(bucket_id, event_type) + self._registered_buckets.append(bucket) + + if not self.connected: + return + + try: + self.client.create_bucket(bucket_id, event_type) + except req.RequestException: + self.connected = False diff --git a/tests/test_client.py b/tests/test_client.py index cdc6e6a..026fb26 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import time +import warnings from random import random from datetime import datetime, timedelta, timezone from requests.exceptions import HTTPError @@ -106,3 +107,25 @@ def test_full(): # Delete bucket client.delete_bucket(bucket_name) + + +def test_queued_usage_warns_once_before_connect(): + client = ActivityWatchClient(f"aw-test-client-{random()}", testing=True) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + client.create_bucket("test-bucket", "test", queued=True) + client.heartbeat( + "test-bucket", + create_unique_event(), + pulsetime=1, + queued=True, + ) + + queue_warnings = [ + warning + for warning in caught + if "connect()" in str(warning.message) + or "with client:" in str(warning.message) + ] + assert len(queue_warnings) == 1 diff --git a/tests/test_requestqueue.py b/tests/test_requestqueue.py index 0e28fff..836b69c 100644 --- a/tests/test_requestqueue.py +++ b/tests/test_requestqueue.py @@ -21,12 +21,14 @@ class MockClient: def __init__(self): self.testing = True + self.create_bucket_calls = [] def get_buckets(self, *args, **kwargs): print("Called get_buckets") return [{"id": "test", "name": "Test"}] def create_bucket(self, *args, **kwargs): + self.create_bucket_calls.append((args, kwargs)) print("Called create_bucket") def _post(self, *args, **kwargs): @@ -60,3 +62,29 @@ def test_complex(): sleep(1) rq.stop() rq.join() + + +def test_register_bucket_creates_immediately_when_connected(): + client = MockClient() + rq = RequestQueue(client) # type: ignore + rq.connected = True + + rq.register_bucket("test-bucket", "test-type") + + assert client.create_bucket_calls == [(("test-bucket", "test-type"), {})] + + +def test_register_bucket_marks_queue_disconnected_on_create_failure(): + class FailingClient(MockClient): + def create_bucket(self, *args, **kwargs): + super().create_bucket(*args, **kwargs) + raise requests.exceptions.ConnectionError() + + client = FailingClient() + rq = RequestQueue(client) # type: ignore + rq.connected = True + + rq.register_bucket("test-bucket", "test-type") + + assert rq.connected is False + assert client.create_bucket_calls == [(("test-bucket", "test-type"), {})] From 91836346616048a5c380ee4afff758008b118fd0 Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 7 Jun 2026 17:50:30 +0000 Subject: [PATCH 2/3] fix(tests): satisfy black on queued warning test --- tests/test_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 026fb26..8136a37 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -125,7 +125,6 @@ def test_queued_usage_warns_once_before_connect(): queue_warnings = [ warning for warning in caught - if "connect()" in str(warning.message) - or "with client:" in str(warning.message) + if "connect()" in str(warning.message) or "with client:" in str(warning.message) ] assert len(queue_warnings) == 1 From 3cc9e12fb18059731c5bf2d891e3e0ca2591dc75 Mon Sep 17 00:00:00 2001 From: TimeToBuildBob Date: Mon, 29 Jun 2026 19:45:06 +0000 Subject: [PATCH 3/3] fix(queue): reset warn flag in disconnect() so warning fires after reconnect --- aw_client/client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aw_client/client.py b/aw_client/client.py index 7c442b3..de68a74 100644 --- a/aw_client/client.py +++ b/aw_client/client.py @@ -384,6 +384,8 @@ def disconnect(self): # Throw away old thread object, create new one since same thread cannot be started twice self.request_queue = RequestQueue(self) + # Reset so warn-before-connect fires again if user calls queued ops before reconnecting + self._warned_queue_before_connect = False def wait_for_start(self, timeout: int = 10) -> None: """Wait for the server to start by trying to get the server info."""