From 38dbc932377799d9b3adeebe2b09f1405c2d0513 Mon Sep 17 00:00:00 2001 From: Finbarr Brady Date: Tue, 3 Feb 2026 11:23:53 +0000 Subject: [PATCH] Add async support for WebexWebsocketClient with new run_async() and stop_async() methods. Update README.md with usage examples and key differences for async integration. Enhance error handling for running event loops and improve stop functionality. Add tests for async methods and event loop handling. --- README.md | 65 +++++++++ tests/test_webex_websocket_client.py | 76 +++++++++++ .../websockets/webex_websocket_client.py | 124 +++++++++++++++--- 3 files changed, 250 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 0454bef..0ed7aa5 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,60 @@ class EchoCallback(Command): and off you go! +## Async Support (FastAPI, aiohttp, etc.) + +If you're integrating the bot into an existing async application (like FastAPI), use the `run_async()` method instead of `run()`: + +### FastAPI Example + +```python +import os +import asyncio +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from webex_bot.webex_bot import WebexBot + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Create and start the bot on startup + bot = WebexBot( + teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN"), + bot_name="My FastAPI Bot" + ) + # Start the bot in a background task + bot_task = asyncio.create_task(bot.run_async()) + + yield # Application runs here + + # Stop the bot on shutdown + bot.stop() + bot_task.cancel() + try: + await bot_task + except asyncio.CancelledError: + pass + + +app = FastAPI(lifespan=lifespan) + + +@app.get("/health") +async def health_check(): + return {"status": "healthy"} +``` + +### Key differences + +| Use Case | Method | Description | +|----------|--------|-------------| +| Standalone script | `bot.run()` | Blocks and manages its own event loop | +| Async framework | `await bot.run_async()` | Integrates with existing event loop | +| Stop the bot | `bot.stop()` | Works from any thread/context | + +**Note:** Calling `bot.run()` when an event loop is already running will raise a `RuntimeError` with instructions to use `run_async()` instead. + # Help * If you are a Cisco employee, and find this useful, consider sending me a [Connected Recognition][8] (cec: `fibrady`) 🙂 @@ -436,6 +490,15 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN") * Fix handling websocket 404 to refresh device registrations. +### 1.3.0 (2026-Feb-03) + +* Add async support for integration with existing async applications (FastAPI, aiohttp, etc.) ([#68][i68]) +* New `run_async()` method for use in async contexts +* New `stop_async()` method for graceful async shutdown +* Improved `stop()` method that works reliably from any thread +* Better error message when `run()` is called with an existing event loop + + [1]: https://github.com/aaugustin/websockets @@ -484,3 +547,5 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN") [i20]: https://github.com/fbradyirl/webex_bot/issues/20 [i48]: https://github.com/fbradyirl/webex_bot/issues/48 + +[i68]: https://github.com/fbradyirl/webex_bot/issues/68 diff --git a/tests/test_webex_websocket_client.py b/tests/test_webex_websocket_client.py index f73d970..c8c3d05 100644 --- a/tests/test_webex_websocket_client.py +++ b/tests/test_webex_websocket_client.py @@ -1,13 +1,20 @@ +import asyncio +import pytest + from webex_bot.websockets.webex_websocket_client import ( WebexWebsocketClient, BACKOFF_EXCEPTIONS, InvalidStatus, + _get_running_loop_or_none, ) def _make_client(): client = WebexWebsocketClient.__new__(WebexWebsocketClient) client._get_headers = lambda: {"Authorization": "Bearer test"} + client._loop = None + client._stop_event = None + client.websocket = None return client @@ -53,3 +60,72 @@ def test_invalid_status_not_in_backoff_exceptions(): "InvalidStatus should not be in BACKOFF_EXCEPTIONS. " "404 errors need immediate device refresh, not backoff retries." ) + + +def test_get_running_loop_or_none_returns_none_when_no_loop(): + """ + _get_running_loop_or_none should return None when there's no running event loop. + """ + result = _get_running_loop_or_none() + assert result is None + + +def test_get_running_loop_or_none_returns_loop_when_running(): + """ + _get_running_loop_or_none should return the running loop when inside an async context. + """ + async def check_loop(): + result = _get_running_loop_or_none() + assert result is not None + assert result == asyncio.get_running_loop() + + asyncio.run(check_loop()) + + +def test_run_raises_error_when_loop_already_running(): + """ + run() should raise RuntimeError with helpful message when an event loop is already running. + """ + client = _make_client() + + async def test_in_async_context(): + with pytest.raises(RuntimeError) as exc_info: + client.run() + assert "event loop is already running" in str(exc_info.value) + assert "run_async()" in str(exc_info.value) + + asyncio.run(test_in_async_context()) + + +def test_stop_sets_stop_event(): + """ + stop() should set the stop event when it exists. + """ + client = _make_client() + client._stop_event = asyncio.Event() + assert not client._stop_event.is_set() + client.stop() + assert client._stop_event.is_set() + + +def test_stop_async_sets_stop_event(): + """ + stop_async() should set the stop event. + """ + async def check_stop_async(): + client = _make_client() + client._stop_event = asyncio.Event() + assert not client._stop_event.is_set() + await client.stop_async() + assert client._stop_event.is_set() + + asyncio.run(check_stop_async()) + + +def test_client_initializes_loop_and_stop_event_as_none(): + """ + Client should initialize _loop and _stop_event as None. + """ + client = _make_client() + assert client._loop is None + assert client._stop_event is None diff --git a/webex_bot/websockets/webex_websocket_client.py b/webex_bot/websockets/webex_websocket_client.py index 4b84f8f..db50b95 100644 --- a/webex_bot/websockets/webex_websocket_client.py +++ b/webex_bot/websockets/webex_websocket_client.py @@ -55,6 +55,14 @@ ) +def _get_running_loop_or_none(): + """Get the currently running event loop, or None if there isn't one.""" + try: + return asyncio.get_running_loop() + except RuntimeError: + return None + + class WebexWebsocketClient(object): def __init__(self, access_token, @@ -79,6 +87,9 @@ def __init__(self, self.proxies = proxies self.websocket = None self.share_id = None + # Event loop reference for cross-thread communication + self._loop = None + self._stop_event = None if self.proxies: self.session.proxies = proxies if self.proxies: @@ -197,7 +208,20 @@ def _ack_message(self, message_id): logger.debug(f"WebSocket ack message with id={message_id}") ack_message = {'type': 'ack', 'messageId': message_id} - asyncio.run(self.websocket.send(json.dumps(ack_message))) + # Use run_coroutine_threadsafe since this may be called from an executor thread + if self._loop is not None and self._loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + self.websocket.send(json.dumps(ack_message)), + self._loop + ) + # Wait for the send to complete (with timeout) + try: + future.result(timeout=10) + except Exception as e: + logger.warning(f"Failed to ack message {message_id}: {e}") + else: + # Fallback for edge cases where loop isn't set + asyncio.run(self.websocket.send(json.dumps(ack_message))) logger.info(f"WebSocket ack message with id={message_id}. Complete.") def _get_device_url(self): @@ -244,12 +268,41 @@ def _get_device_info(self, check_existing=True): return self.device_info def stop(self): - def terminate(): - raise SystemExit() + """ + Stop the websocket client gracefully. - asyncio.get_event_loop().create_task(terminate()) + Can be called from any thread. Sets a stop event that the run loop monitors. + """ + if self._stop_event is not None: + self._stop_event.set() + elif self._loop is not None and self._loop.is_running(): + # Schedule the stop event to be set from the event loop + self._loop.call_soon_threadsafe(self._stop_event.set if self._stop_event else lambda: None) + + async def stop_async(self): + """ + Stop the websocket client gracefully (async version). + """ + if self._stop_event is not None: + self._stop_event.set() + + async def run_async(self): + """ + Async entry point for running the websocket client. + + Use this method when integrating with an existing async application + (e.g., FastAPI, aiohttp, or any asyncio-based framework). + + Example usage with FastAPI: + @app.on_event("startup") + async def startup_event(): + bot = WebexBot(teams_bot_token="YOUR_TOKEN") + asyncio.create_task(bot.run_async()) + """ + # Store the event loop reference for cross-thread communication + self._loop = asyncio.get_running_loop() + self._stop_event = asyncio.Event() - def run(self): if self.device_info is None: if self._get_device_info() is None: logger.error('could not get/create device info') @@ -263,8 +316,7 @@ async def _websocket_recv(): logger.debug("WebSocket Received Message(raw): %s\n" % message) try: msg = json.loads(message) - loop = asyncio.get_event_loop() - loop.run_in_executor(None, self._process_incoming_websocket_message, msg) + self._loop.run_in_executor(None, self._process_incoming_websocket_message, msg) except Exception as messageProcessingException: logger.warning( f"An exception occurred while processing message. Ignoring. {messageProcessingException}") @@ -312,18 +364,25 @@ async def _connect_and_listen(): 'data': {'token': 'Bearer ' + self.access_token}} await self.websocket.send(json.dumps(msg)) - while True: - await _websocket_recv() + while not self._stop_event.is_set(): + try: + # Use wait_for with timeout to allow checking stop_event periodically + await asyncio.wait_for(_websocket_recv(), timeout=1.0) + except asyncio.TimeoutError: + # Timeout is expected, just continue to check stop_event + continue # Track the number of consecutive 404 errors to prevent infinite loops max_404_retries = 3 current_404_retries = 0 - while True: + while not self._stop_event.is_set(): try: - asyncio.get_event_loop().run_until_complete(_connect_and_listen()) - # If we get here, the connection was successful, so break out of the loop - break + await _connect_and_listen() + # If stop was requested, break out of the loop + if self._stop_event.is_set(): + logger.info("Stop requested, exiting run loop.") + break except InvalidStatus as e: status_code = getattr(e.response, "status_code", None) logger.error(f"WebSocket handshake to {ws_url} failed with status {status_code}") @@ -342,13 +401,18 @@ async def _connect_and_listen(): # Add a delay before retrying to avoid hammering the server logger.info(f"Waiting 5 seconds before retry attempt {current_404_retries}...") - asyncio.get_event_loop().run_until_complete(asyncio.sleep(5)) + await asyncio.sleep(5) else: # For non-404 errors, just raise the exception raise except Exception as runException: logger.error(f"runException: {runException}") + # Check if stop was requested + if self._stop_event.is_set(): + logger.info("Stop requested during exception handling, exiting.") + break + # Check if we can get device info if self._get_device_info(check_existing=False) is None: logger.error('could not create device info') @@ -359,4 +423,34 @@ async def _connect_and_listen(): # Wait a bit before reconnecting logger.info("Waiting 5 seconds before attempting to reconnect...") - asyncio.get_event_loop().run_until_complete(asyncio.sleep(5)) + await asyncio.sleep(5) + + logger.info("WebSocket client stopped.") + + def run(self): + """ + Synchronous entry point for running the websocket client. + + Use this method for standalone scripts that don't have an existing event loop. + For integration with async frameworks (FastAPI, aiohttp, etc.), use run_async() instead. + + Example usage: + bot = WebexBot(teams_bot_token="YOUR_TOKEN") + bot.run() # Blocks until stopped + """ + # Check if there's already a running event loop + running_loop = _get_running_loop_or_none() + if running_loop is not None: + raise RuntimeError( + "An event loop is already running. " + "Use 'await bot.run_async()' or 'asyncio.create_task(bot.run_async())' instead of 'bot.run()' " + "when integrating with async frameworks like FastAPI." + ) + + # No running loop, safe to use asyncio.run() + try: + asyncio.run(self.run_async()) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down.") + except SystemExit: + logger.info("System exit requested, shutting down.")