Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 73 additions & 24 deletions sdks/python/src/logwell/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from logwell.types import IngestResponse, LogEntry

if TYPE_CHECKING:
from concurrent.futures import Future as ConcurrentFuture
from typing import Any

from logwell.types import LogwellConfig

# Type alias for the send batch callback
Expand Down Expand Up @@ -88,9 +91,11 @@ def __init__(
self._send_batch = send_batch
self._queue: list[LogEntry] = []
self._lock = threading.Lock()
self._timer: threading.Timer | None = None
self._timer_future: ConcurrentFuture[Any] | None = None
self._flushing = False
self._stopped = False
self._queue_loop: asyncio.AbstractEventLoop | None = None
self._queue_thread: threading.Thread | None = None

@property
def size(self) -> int:
Expand Down Expand Up @@ -131,7 +136,8 @@ def add(self, entry: LogEntry) -> None:
self._queue.append(entry)

# Start timer on first entry
if self._timer is None and not self._stopped:
timer_future = getattr(self, '_timer_future', None)
if (timer_future is None or timer_future.done()) and not self._stopped:
self._start_timer()

# Flush immediately if batch size reached
Expand All @@ -146,19 +152,28 @@ def _trigger_flush(self) -> None:
This method schedules the flush to run in the background
without blocking the caller.
"""
try:
loop = asyncio.get_running_loop()
loop.create_task(self.flush())
except RuntimeError:
# No running event loop, run in new loop
asyncio.run(self.flush())
loop = self._ensure_loop()
asyncio.run_coroutine_threadsafe(self._do_flush(), loop)

async def flush(self) -> IngestResponse | None:
"""Flush all queued logs immediately.

Returns:
Response from the server, or None if queue was empty or flush in progress
"""
if threading.current_thread() is self._queue_thread:
return await self._do_flush()

loop = self._ensure_loop()
future = asyncio.run_coroutine_threadsafe(self._do_flush(), loop)
try:
asyncio.get_running_loop()
return await asyncio.wrap_future(future)
except RuntimeError:
return future.result()

async def _do_flush(self) -> IngestResponse | None:
"""Internal flush implementation."""
with self._lock:
# Prevent concurrent flushes
if self._flushing or len(self._queue) == 0:
Expand Down Expand Up @@ -217,33 +232,67 @@ async def shutdown(self) -> None:
if self.size > 0:
await self.flush()

self._stop_loop()

def _ensure_loop(self) -> asyncio.AbstractEventLoop:
"""Ensure a background event loop is running."""
if self._queue_loop is None or self._queue_loop.is_closed():
self._queue_loop = asyncio.new_event_loop()
self._queue_thread = threading.Thread(target=self._run_loop, daemon=True)
self._queue_thread.start()
return self._queue_loop

def _run_loop(self) -> None:
"""Run the background event loop."""
assert self._queue_loop is not None
asyncio.set_event_loop(self._queue_loop)
self._queue_loop.run_forever()

def _stop_loop(self) -> None:
"""Stop the background event loop and thread."""
if self._queue_loop is None or self._queue_thread is None:
return

if self._queue_loop.is_running() and not self._queue_loop.is_closed():
self._queue_loop.call_soon_threadsafe(self._queue_loop.stop)

if threading.current_thread() is not self._queue_thread:
self._queue_thread.join(timeout=5.0)

self._queue_loop = None
self._queue_thread = None

def _start_timer(self) -> None:
"""Start the flush timer.

Note: Must be called while holding the lock.
"""
self._stop_timer()
self._timer = threading.Timer(
self._config.flush_interval,
self._on_timer_expired,
)
self._timer.daemon = True
self._timer.start()
loop = self._ensure_loop()
self._timer_future = asyncio.run_coroutine_threadsafe(self._timer_coro(), loop)

def _stop_timer(self) -> None:
"""Stop the flush timer.

Note: Must be called while holding the lock.
"""
if self._timer is not None:
self._timer.cancel()
self._timer = None
future = getattr(self, '_timer_future', None)
if future is not None:
if not future.done():
future.cancel()
self._timer_future = None

def _on_timer_expired(self) -> None:
async def _timer_coro(self) -> None:
"""Handle timer expiration by triggering a flush."""
with self._lock:
self._timer = None
if self._stopped:
return

self._trigger_flush()
my_future = getattr(self, '_timer_future', None)
try:
await asyncio.sleep(self._config.flush_interval)
with self._lock:
if self._stopped:
return
self._trigger_flush()
except asyncio.CancelledError:
pass
finally:
if getattr(self, '_timer_future', None) is my_future:
self._timer_future = None
20 changes: 12 additions & 8 deletions sdks/python/tests/unit/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,16 @@ async def test_flush_calls_on_error_callback_on_failure(self) -> None:
async def test_concurrent_flush_prevented(self) -> None:
"""Concurrent flush() calls are prevented."""
call_count = 0
flush_started = asyncio.Event()
flush_continue = asyncio.Event()
flush_started = threading.Event()
flush_continue = threading.Event()

async def slow_send(batch: list[LogEntry]) -> IngestResponse:
nonlocal call_count
call_count += 1
flush_started.set()
# Wait until test signals to continue
await flush_continue.wait()
while not flush_continue.is_set():
await asyncio.sleep(0.01)
return {"accepted": len(batch)}

mock = MagicMock(side_effect=slow_send)
Expand All @@ -379,7 +380,8 @@ async def slow_send(batch: list[LogEntry]) -> IngestResponse:
task1 = asyncio.create_task(queue.flush())

# Wait for first flush to start
await flush_started.wait()
while not flush_started.is_set():
await asyncio.sleep(0.01)

# Try second flush while first is in progress
result2 = await queue.flush()
Expand Down Expand Up @@ -827,14 +829,15 @@ async def failing_then_success(batch: list[LogEntry]) -> IngestResponse:
@pytest.mark.asyncio
async def test_entries_added_during_flush_are_preserved(self) -> None:
"""Entries added during flush are not lost."""
flush_started = asyncio.Event()
flush_continue = asyncio.Event()
flush_started = threading.Event()
flush_continue = threading.Event()
captured_batches: list[list[LogEntry]] = []

async def slow_send(batch: list[LogEntry]) -> IngestResponse:
captured_batches.append(batch)
flush_started.set()
await flush_continue.wait()
while not flush_continue.is_set():
await asyncio.sleep(0.01)
return {"accepted": len(batch)}

mock = MagicMock(side_effect=slow_send)
Expand All @@ -846,7 +849,8 @@ async def slow_send(batch: list[LogEntry]) -> IngestResponse:
flush_task = asyncio.create_task(queue.flush())

# Wait for flush to start
await flush_started.wait()
while not flush_started.is_set():
await asyncio.sleep(0.01)

# Add during flush
queue.add(make_log_entry("during"))
Expand Down
Loading