From 285e38957c8abb8b6a91dbca95d8a94172453b20 Mon Sep 17 00:00:00 2001 From: "opencode-agent[bot]" Date: Mon, 11 May 2026 04:56:19 +0000 Subject: [PATCH] Fix Python SDK timer: use async loop Co-authored-by: Divkix --- sdks/python/src/logwell/queue.py | 97 +++++++++++++++++++++------- sdks/python/tests/unit/test_queue.py | 20 +++--- 2 files changed, 85 insertions(+), 32 deletions(-) diff --git a/sdks/python/src/logwell/queue.py b/sdks/python/src/logwell/queue.py index 4c11171..cb42b91 100644 --- a/sdks/python/src/logwell/queue.py +++ b/sdks/python/src/logwell/queue.py @@ -15,6 +15,9 @@ from logwell.types import IngestResponse, LogEntry if TYPE_CHECKING: + from concurrent.futures import Future as ConcurrentFuture + from typing import Any + from logwell.types import LogwellConfig # Type alias for the send batch callback @@ -88,9 +91,11 @@ def __init__( self._send_batch = send_batch self._queue: list[LogEntry] = [] self._lock = threading.Lock() - self._timer: threading.Timer | None = None + self._timer_future: ConcurrentFuture[Any] | None = None self._flushing = False self._stopped = False + self._queue_loop: asyncio.AbstractEventLoop | None = None + self._queue_thread: threading.Thread | None = None @property def size(self) -> int: @@ -131,7 +136,8 @@ def add(self, entry: LogEntry) -> None: self._queue.append(entry) # Start timer on first entry - if self._timer is None and not self._stopped: + timer_future = getattr(self, '_timer_future', None) + if (timer_future is None or timer_future.done()) and not self._stopped: self._start_timer() # Flush immediately if batch size reached @@ -146,12 +152,8 @@ def _trigger_flush(self) -> None: This method schedules the flush to run in the background without blocking the caller. """ - try: - loop = asyncio.get_running_loop() - loop.create_task(self.flush()) - except RuntimeError: - # No running event loop, run in new loop - asyncio.run(self.flush()) + loop = self._ensure_loop() + asyncio.run_coroutine_threadsafe(self._do_flush(), loop) async def flush(self) -> IngestResponse | None: """Flush all queued logs immediately. @@ -159,6 +161,19 @@ async def flush(self) -> IngestResponse | None: Returns: Response from the server, or None if queue was empty or flush in progress """ + if threading.current_thread() is self._queue_thread: + return await self._do_flush() + + loop = self._ensure_loop() + future = asyncio.run_coroutine_threadsafe(self._do_flush(), loop) + try: + asyncio.get_running_loop() + return await asyncio.wrap_future(future) + except RuntimeError: + return future.result() + + async def _do_flush(self) -> IngestResponse | None: + """Internal flush implementation.""" with self._lock: # Prevent concurrent flushes if self._flushing or len(self._queue) == 0: @@ -217,33 +232,67 @@ async def shutdown(self) -> None: if self.size > 0: await self.flush() + self._stop_loop() + + def _ensure_loop(self) -> asyncio.AbstractEventLoop: + """Ensure a background event loop is running.""" + if self._queue_loop is None or self._queue_loop.is_closed(): + self._queue_loop = asyncio.new_event_loop() + self._queue_thread = threading.Thread(target=self._run_loop, daemon=True) + self._queue_thread.start() + return self._queue_loop + + def _run_loop(self) -> None: + """Run the background event loop.""" + assert self._queue_loop is not None + asyncio.set_event_loop(self._queue_loop) + self._queue_loop.run_forever() + + def _stop_loop(self) -> None: + """Stop the background event loop and thread.""" + if self._queue_loop is None or self._queue_thread is None: + return + + if self._queue_loop.is_running() and not self._queue_loop.is_closed(): + self._queue_loop.call_soon_threadsafe(self._queue_loop.stop) + + if threading.current_thread() is not self._queue_thread: + self._queue_thread.join(timeout=5.0) + + self._queue_loop = None + self._queue_thread = None + def _start_timer(self) -> None: """Start the flush timer. Note: Must be called while holding the lock. """ self._stop_timer() - self._timer = threading.Timer( - self._config.flush_interval, - self._on_timer_expired, - ) - self._timer.daemon = True - self._timer.start() + loop = self._ensure_loop() + self._timer_future = asyncio.run_coroutine_threadsafe(self._timer_coro(), loop) def _stop_timer(self) -> None: """Stop the flush timer. Note: Must be called while holding the lock. """ - if self._timer is not None: - self._timer.cancel() - self._timer = None + future = getattr(self, '_timer_future', None) + if future is not None: + if not future.done(): + future.cancel() + self._timer_future = None - def _on_timer_expired(self) -> None: + async def _timer_coro(self) -> None: """Handle timer expiration by triggering a flush.""" - with self._lock: - self._timer = None - if self._stopped: - return - - self._trigger_flush() + my_future = getattr(self, '_timer_future', None) + try: + await asyncio.sleep(self._config.flush_interval) + with self._lock: + if self._stopped: + return + self._trigger_flush() + except asyncio.CancelledError: + pass + finally: + if getattr(self, '_timer_future', None) is my_future: + self._timer_future = None diff --git a/sdks/python/tests/unit/test_queue.py b/sdks/python/tests/unit/test_queue.py index 92fd37c..73716e7 100644 --- a/sdks/python/tests/unit/test_queue.py +++ b/sdks/python/tests/unit/test_queue.py @@ -358,15 +358,16 @@ async def test_flush_calls_on_error_callback_on_failure(self) -> None: async def test_concurrent_flush_prevented(self) -> None: """Concurrent flush() calls are prevented.""" call_count = 0 - flush_started = asyncio.Event() - flush_continue = asyncio.Event() + flush_started = threading.Event() + flush_continue = threading.Event() async def slow_send(batch: list[LogEntry]) -> IngestResponse: nonlocal call_count call_count += 1 flush_started.set() # Wait until test signals to continue - await flush_continue.wait() + while not flush_continue.is_set(): + await asyncio.sleep(0.01) return {"accepted": len(batch)} mock = MagicMock(side_effect=slow_send) @@ -379,7 +380,8 @@ async def slow_send(batch: list[LogEntry]) -> IngestResponse: task1 = asyncio.create_task(queue.flush()) # Wait for first flush to start - await flush_started.wait() + while not flush_started.is_set(): + await asyncio.sleep(0.01) # Try second flush while first is in progress result2 = await queue.flush() @@ -827,14 +829,15 @@ async def failing_then_success(batch: list[LogEntry]) -> IngestResponse: @pytest.mark.asyncio async def test_entries_added_during_flush_are_preserved(self) -> None: """Entries added during flush are not lost.""" - flush_started = asyncio.Event() - flush_continue = asyncio.Event() + flush_started = threading.Event() + flush_continue = threading.Event() captured_batches: list[list[LogEntry]] = [] async def slow_send(batch: list[LogEntry]) -> IngestResponse: captured_batches.append(batch) flush_started.set() - await flush_continue.wait() + while not flush_continue.is_set(): + await asyncio.sleep(0.01) return {"accepted": len(batch)} mock = MagicMock(side_effect=slow_send) @@ -846,7 +849,8 @@ async def slow_send(batch: list[LogEntry]) -> IngestResponse: flush_task = asyncio.create_task(queue.flush()) # Wait for flush to start - await flush_started.wait() + while not flush_started.is_set(): + await asyncio.sleep(0.01) # Add during flush queue.add(make_log_entry("during"))