diff --git a/README.md b/README.md index 0454bef..0ed7aa5 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,60 @@ class EchoCallback(Command): and off you go! +## Async Support (FastAPI, aiohttp, etc.) + +If you're integrating the bot into an existing async application (like FastAPI), use the `run_async()` method instead of `run()`: + +### FastAPI Example + +```python +import os +import asyncio +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from webex_bot.webex_bot import WebexBot + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Create and start the bot on startup + bot = WebexBot( + teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN"), + bot_name="My FastAPI Bot" + ) + # Start the bot in a background task + bot_task = asyncio.create_task(bot.run_async()) + + yield # Application runs here + + # Stop the bot on shutdown + bot.stop() + bot_task.cancel() + try: + await bot_task + except asyncio.CancelledError: + pass + + +app = FastAPI(lifespan=lifespan) + + +@app.get("/health") +async def health_check(): + return {"status": "healthy"} +``` + +### Key differences + +| Use Case | Method | Description | +|----------|--------|-------------| +| Standalone script | `bot.run()` | Blocks and manages its own event loop | +| Async framework | `await bot.run_async()` | Integrates with existing event loop | +| Stop the bot | `bot.stop()` | Works from any thread/context | + +**Note:** Calling `bot.run()` when an event loop is already running will raise a `RuntimeError` with instructions to use `run_async()` instead. + # Help * If you are a Cisco employee, and find this useful, consider sending me a [Connected Recognition][8] (cec: `fibrady`) 🙂 @@ -436,6 +490,15 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN") * Fix handling websocket 404 to refresh device registrations. +### 1.3.0 (2026-Feb-03) + +* Add async support for integration with existing async applications (FastAPI, aiohttp, etc.) ([#68][i68]) +* New `run_async()` method for use in async contexts +* New `stop_async()` method for graceful async shutdown +* Improved `stop()` method that works reliably from any thread +* Better error message when `run()` is called with an existing event loop + + [1]: https://github.com/aaugustin/websockets @@ -484,3 +547,5 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN") [i20]: https://github.com/fbradyirl/webex_bot/issues/20 [i48]: https://github.com/fbradyirl/webex_bot/issues/48 + +[i68]: https://github.com/fbradyirl/webex_bot/issues/68 diff --git a/tests/test_webex_websocket_client.py b/tests/test_webex_websocket_client.py index f73d970..c8c3d05 100644 --- a/tests/test_webex_websocket_client.py +++ b/tests/test_webex_websocket_client.py @@ -1,13 +1,20 @@ +import asyncio +import pytest + from webex_bot.websockets.webex_websocket_client import ( WebexWebsocketClient, BACKOFF_EXCEPTIONS, InvalidStatus, + _get_running_loop_or_none, ) def _make_client(): client = WebexWebsocketClient.__new__(WebexWebsocketClient) client._get_headers = lambda: {"Authorization": "Bearer test"} + client._loop = None + client._stop_event = None + client.websocket = None return client @@ -53,3 +60,72 @@ def test_invalid_status_not_in_backoff_exceptions(): "InvalidStatus should not be in BACKOFF_EXCEPTIONS. " "404 errors need immediate device refresh, not backoff retries." ) + + +def test_get_running_loop_or_none_returns_none_when_no_loop(): + """ + _get_running_loop_or_none should return None when there's no running event loop. + """ + result = _get_running_loop_or_none() + assert result is None + + +def test_get_running_loop_or_none_returns_loop_when_running(): + """ + _get_running_loop_or_none should return the running loop when inside an async context. + """ + async def check_loop(): + result = _get_running_loop_or_none() + assert result is not None + assert result == asyncio.get_running_loop() + + asyncio.run(check_loop()) + + +def test_run_raises_error_when_loop_already_running(): + """ + run() should raise RuntimeError with helpful message when an event loop is already running. + """ + client = _make_client() + + async def test_in_async_context(): + with pytest.raises(RuntimeError) as exc_info: + client.run() + assert "event loop is already running" in str(exc_info.value) + assert "run_async()" in str(exc_info.value) + + asyncio.run(test_in_async_context()) + + +def test_stop_sets_stop_event(): + """ + stop() should set the stop event when it exists. + """ + client = _make_client() + client._stop_event = asyncio.Event() + assert not client._stop_event.is_set() + client.stop() + assert client._stop_event.is_set() + + +def test_stop_async_sets_stop_event(): + """ + stop_async() should set the stop event. + """ + async def check_stop_async(): + client = _make_client() + client._stop_event = asyncio.Event() + assert not client._stop_event.is_set() + await client.stop_async() + assert client._stop_event.is_set() + + asyncio.run(check_stop_async()) + + +def test_client_initializes_loop_and_stop_event_as_none(): + """ + Client should initialize _loop and _stop_event as None. + """ + client = _make_client() + assert client._loop is None + assert client._stop_event is None diff --git a/webex_bot/websockets/webex_websocket_client.py b/webex_bot/websockets/webex_websocket_client.py index 4b84f8f..db50b95 100644 --- a/webex_bot/websockets/webex_websocket_client.py +++ b/webex_bot/websockets/webex_websocket_client.py @@ -55,6 +55,14 @@ ) +def _get_running_loop_or_none(): + """Get the currently running event loop, or None if there isn't one.""" + try: + return asyncio.get_running_loop() + except RuntimeError: + return None + + class WebexWebsocketClient(object): def __init__(self, access_token, @@ -79,6 +87,9 @@ def __init__(self, self.proxies = proxies self.websocket = None self.share_id = None + # Event loop reference for cross-thread communication + self._loop = None + self._stop_event = None if self.proxies: self.session.proxies = proxies if self.proxies: @@ -197,7 +208,20 @@ def _ack_message(self, message_id): logger.debug(f"WebSocket ack message with id={message_id}") ack_message = {'type': 'ack', 'messageId': message_id} - asyncio.run(self.websocket.send(json.dumps(ack_message))) + # Use run_coroutine_threadsafe since this may be called from an executor thread + if self._loop is not None and self._loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + self.websocket.send(json.dumps(ack_message)), + self._loop + ) + # Wait for the send to complete (with timeout) + try: + future.result(timeout=10) + except Exception as e: + logger.warning(f"Failed to ack message {message_id}: {e}") + else: + # Fallback for edge cases where loop isn't set + asyncio.run(self.websocket.send(json.dumps(ack_message))) logger.info(f"WebSocket ack message with id={message_id}. Complete.") def _get_device_url(self): @@ -244,12 +268,41 @@ def _get_device_info(self, check_existing=True): return self.device_info def stop(self): - def terminate(): - raise SystemExit() + """ + Stop the websocket client gracefully. - asyncio.get_event_loop().create_task(terminate()) + Can be called from any thread. Sets a stop event that the run loop monitors. + """ + if self._stop_event is not None: + self._stop_event.set() + elif self._loop is not None and self._loop.is_running(): + # Schedule the stop event to be set from the event loop + self._loop.call_soon_threadsafe(self._stop_event.set if self._stop_event else lambda: None) + + async def stop_async(self): + """ + Stop the websocket client gracefully (async version). + """ + if self._stop_event is not None: + self._stop_event.set() + + async def run_async(self): + """ + Async entry point for running the websocket client. + + Use this method when integrating with an existing async application + (e.g., FastAPI, aiohttp, or any asyncio-based framework). + + Example usage with FastAPI: + @app.on_event("startup") + async def startup_event(): + bot = WebexBot(teams_bot_token="YOUR_TOKEN") + asyncio.create_task(bot.run_async()) + """ + # Store the event loop reference for cross-thread communication + self._loop = asyncio.get_running_loop() + self._stop_event = asyncio.Event() - def run(self): if self.device_info is None: if self._get_device_info() is None: logger.error('could not get/create device info') @@ -263,8 +316,7 @@ async def _websocket_recv(): logger.debug("WebSocket Received Message(raw): %s\n" % message) try: msg = json.loads(message) - loop = asyncio.get_event_loop() - loop.run_in_executor(None, self._process_incoming_websocket_message, msg) + self._loop.run_in_executor(None, self._process_incoming_websocket_message, msg) except Exception as messageProcessingException: logger.warning( f"An exception occurred while processing message. Ignoring. {messageProcessingException}") @@ -312,18 +364,25 @@ async def _connect_and_listen(): 'data': {'token': 'Bearer ' + self.access_token}} await self.websocket.send(json.dumps(msg)) - while True: - await _websocket_recv() + while not self._stop_event.is_set(): + try: + # Use wait_for with timeout to allow checking stop_event periodically + await asyncio.wait_for(_websocket_recv(), timeout=1.0) + except asyncio.TimeoutError: + # Timeout is expected, just continue to check stop_event + continue # Track the number of consecutive 404 errors to prevent infinite loops max_404_retries = 3 current_404_retries = 0 - while True: + while not self._stop_event.is_set(): try: - asyncio.get_event_loop().run_until_complete(_connect_and_listen()) - # If we get here, the connection was successful, so break out of the loop - break + await _connect_and_listen() + # If stop was requested, break out of the loop + if self._stop_event.is_set(): + logger.info("Stop requested, exiting run loop.") + break except InvalidStatus as e: status_code = getattr(e.response, "status_code", None) logger.error(f"WebSocket handshake to {ws_url} failed with status {status_code}") @@ -342,13 +401,18 @@ async def _connect_and_listen(): # Add a delay before retrying to avoid hammering the server logger.info(f"Waiting 5 seconds before retry attempt {current_404_retries}...") - asyncio.get_event_loop().run_until_complete(asyncio.sleep(5)) + await asyncio.sleep(5) else: # For non-404 errors, just raise the exception raise except Exception as runException: logger.error(f"runException: {runException}") + # Check if stop was requested + if self._stop_event.is_set(): + logger.info("Stop requested during exception handling, exiting.") + break + # Check if we can get device info if self._get_device_info(check_existing=False) is None: logger.error('could not create device info') @@ -359,4 +423,34 @@ async def _connect_and_listen(): # Wait a bit before reconnecting logger.info("Waiting 5 seconds before attempting to reconnect...") - asyncio.get_event_loop().run_until_complete(asyncio.sleep(5)) + await asyncio.sleep(5) + + logger.info("WebSocket client stopped.") + + def run(self): + """ + Synchronous entry point for running the websocket client. + + Use this method for standalone scripts that don't have an existing event loop. + For integration with async frameworks (FastAPI, aiohttp, etc.), use run_async() instead. + + Example usage: + bot = WebexBot(teams_bot_token="YOUR_TOKEN") + bot.run() # Blocks until stopped + """ + # Check if there's already a running event loop + running_loop = _get_running_loop_or_none() + if running_loop is not None: + raise RuntimeError( + "An event loop is already running. " + "Use 'await bot.run_async()' or 'asyncio.create_task(bot.run_async())' instead of 'bot.run()' " + "when integrating with async frameworks like FastAPI." + ) + + # No running loop, safe to use asyncio.run() + try: + asyncio.run(self.run_async()) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down.") + except SystemExit: + logger.info("System exit requested, shutting down.")