Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,60 @@ class EchoCallback(Command):

and off you go!

## Async Support (FastAPI, aiohttp, etc.)

If you're integrating the bot into an existing async application (like FastAPI), use the `run_async()` method instead of `run()`:

### FastAPI Example

```python
import os
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from webex_bot.webex_bot import WebexBot


@asynccontextmanager
async def lifespan(app: FastAPI):
# Create and start the bot on startup
bot = WebexBot(
teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN"),
bot_name="My FastAPI Bot"
)
# Start the bot in a background task
bot_task = asyncio.create_task(bot.run_async())

yield # Application runs here

# Stop the bot on shutdown
bot.stop()
bot_task.cancel()
try:
await bot_task
except asyncio.CancelledError:
pass


app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health_check():
return {"status": "healthy"}
```

### Key differences

| Use Case | Method | Description |
|----------|--------|-------------|
| Standalone script | `bot.run()` | Blocks and manages its own event loop |
| Async framework | `await bot.run_async()` | Integrates with existing event loop |
| Stop the bot | `bot.stop()` | Works from any thread/context |

**Note:** Calling `bot.run()` when an event loop is already running will raise a `RuntimeError` with instructions to use `run_async()` instead.

# Help

* If you are a Cisco employee, and find this useful, consider sending me a [Connected Recognition][8] (cec: `fibrady`) 🙂
Expand Down Expand Up @@ -436,6 +490,15 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN")

* Fix handling websocket 404 to refresh device registrations.

### 1.3.0 (2026-Feb-03)

* Add async support for integration with existing async applications (FastAPI, aiohttp, etc.) ([#68][i68])
* New `run_async()` method for use in async contexts
* New `stop_async()` method for graceful async shutdown
* Improved `stop()` method that works reliably from any thread
* Better error message when `run()` is called with an existing event loop



[1]: https://github.com/aaugustin/websockets

Expand Down Expand Up @@ -484,3 +547,5 @@ bot = WebexBot(teams_bot_token=os.getenv("WEBEX_ACCESS_TOKEN")
[i20]: https://github.com/fbradyirl/webex_bot/issues/20

[i48]: https://github.com/fbradyirl/webex_bot/issues/48

[i68]: https://github.com/fbradyirl/webex_bot/issues/68
76 changes: 76 additions & 0 deletions tests/test_webex_websocket_client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import asyncio
import pytest

from webex_bot.websockets.webex_websocket_client import (
WebexWebsocketClient,
BACKOFF_EXCEPTIONS,
InvalidStatus,
_get_running_loop_or_none,
)


def _make_client():
client = WebexWebsocketClient.__new__(WebexWebsocketClient)
client._get_headers = lambda: {"Authorization": "Bearer test"}
client._loop = None
client._stop_event = None
client.websocket = None
return client


Expand Down Expand Up @@ -53,3 +60,72 @@ def test_invalid_status_not_in_backoff_exceptions():
"InvalidStatus should not be in BACKOFF_EXCEPTIONS. "
"404 errors need immediate device refresh, not backoff retries."
)


def test_get_running_loop_or_none_returns_none_when_no_loop():
"""
_get_running_loop_or_none should return None when there's no running event loop.
"""
result = _get_running_loop_or_none()
assert result is None


def test_get_running_loop_or_none_returns_loop_when_running():
"""
_get_running_loop_or_none should return the running loop when inside an async context.
"""
async def check_loop():
result = _get_running_loop_or_none()
assert result is not None
assert result == asyncio.get_running_loop()

asyncio.run(check_loop())


def test_run_raises_error_when_loop_already_running():
"""
run() should raise RuntimeError with helpful message when an event loop is already running.
"""
client = _make_client()

async def test_in_async_context():
with pytest.raises(RuntimeError) as exc_info:
client.run()
assert "event loop is already running" in str(exc_info.value)
assert "run_async()" in str(exc_info.value)

asyncio.run(test_in_async_context())


def test_stop_sets_stop_event():
"""
stop() should set the stop event when it exists.
"""
client = _make_client()
client._stop_event = asyncio.Event()
assert not client._stop_event.is_set()
client.stop()
assert client._stop_event.is_set()


def test_stop_async_sets_stop_event():
"""
stop_async() should set the stop event.
"""
async def check_stop_async():
client = _make_client()
client._stop_event = asyncio.Event()
assert not client._stop_event.is_set()
await client.stop_async()
assert client._stop_event.is_set()

asyncio.run(check_stop_async())


def test_client_initializes_loop_and_stop_event_as_none():
"""
Client should initialize _loop and _stop_event as None.
"""
client = _make_client()
assert client._loop is None
assert client._stop_event is None
124 changes: 109 additions & 15 deletions webex_bot/websockets/webex_websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@
)


def _get_running_loop_or_none():
"""Get the currently running event loop, or None if there isn't one."""
try:
return asyncio.get_running_loop()
except RuntimeError:
return None


class WebexWebsocketClient(object):
def __init__(self,
access_token,
Expand All @@ -79,6 +87,9 @@ def __init__(self,
self.proxies = proxies
self.websocket = None
self.share_id = None
# Event loop reference for cross-thread communication
self._loop = None
self._stop_event = None
if self.proxies:
self.session.proxies = proxies
if self.proxies:
Expand Down Expand Up @@ -197,7 +208,20 @@ def _ack_message(self, message_id):
logger.debug(f"WebSocket ack message with id={message_id}")
ack_message = {'type': 'ack',
'messageId': message_id}
asyncio.run(self.websocket.send(json.dumps(ack_message)))
# Use run_coroutine_threadsafe since this may be called from an executor thread
if self._loop is not None and self._loop.is_running():
future = asyncio.run_coroutine_threadsafe(
self.websocket.send(json.dumps(ack_message)),
self._loop
)
# Wait for the send to complete (with timeout)
try:
future.result(timeout=10)
except Exception as e:
logger.warning(f"Failed to ack message {message_id}: {e}")
else:
# Fallback for edge cases where loop isn't set
asyncio.run(self.websocket.send(json.dumps(ack_message)))
logger.info(f"WebSocket ack message with id={message_id}. Complete.")

def _get_device_url(self):
Expand Down Expand Up @@ -244,12 +268,41 @@ def _get_device_info(self, check_existing=True):
return self.device_info

def stop(self):
def terminate():
raise SystemExit()
"""
Stop the websocket client gracefully.

asyncio.get_event_loop().create_task(terminate())
Can be called from any thread. Sets a stop event that the run loop monitors.
"""
if self._stop_event is not None:
self._stop_event.set()
elif self._loop is not None and self._loop.is_running():
# Schedule the stop event to be set from the event loop
self._loop.call_soon_threadsafe(self._stop_event.set if self._stop_event else lambda: None)

async def stop_async(self):
"""
Stop the websocket client gracefully (async version).
"""
if self._stop_event is not None:
self._stop_event.set()

async def run_async(self):
"""
Async entry point for running the websocket client.

Use this method when integrating with an existing async application
(e.g., FastAPI, aiohttp, or any asyncio-based framework).

Example usage with FastAPI:
@app.on_event("startup")
async def startup_event():
bot = WebexBot(teams_bot_token="YOUR_TOKEN")
asyncio.create_task(bot.run_async())
"""
# Store the event loop reference for cross-thread communication
self._loop = asyncio.get_running_loop()
self._stop_event = asyncio.Event()

def run(self):
if self.device_info is None:
if self._get_device_info() is None:
logger.error('could not get/create device info')
Expand All @@ -263,8 +316,7 @@ async def _websocket_recv():
logger.debug("WebSocket Received Message(raw): %s\n" % message)
try:
msg = json.loads(message)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, self._process_incoming_websocket_message, msg)
self._loop.run_in_executor(None, self._process_incoming_websocket_message, msg)
except Exception as messageProcessingException:
logger.warning(
f"An exception occurred while processing message. Ignoring. {messageProcessingException}")
Expand Down Expand Up @@ -312,18 +364,25 @@ async def _connect_and_listen():
'data': {'token': 'Bearer ' + self.access_token}}
await self.websocket.send(json.dumps(msg))

while True:
await _websocket_recv()
while not self._stop_event.is_set():
try:
# Use wait_for with timeout to allow checking stop_event periodically
await asyncio.wait_for(_websocket_recv(), timeout=1.0)
except asyncio.TimeoutError:
# Timeout is expected, just continue to check stop_event
continue

# Track the number of consecutive 404 errors to prevent infinite loops
max_404_retries = 3
current_404_retries = 0

while True:
while not self._stop_event.is_set():
try:
asyncio.get_event_loop().run_until_complete(_connect_and_listen())
# If we get here, the connection was successful, so break out of the loop
break
await _connect_and_listen()
# If stop was requested, break out of the loop
if self._stop_event.is_set():
logger.info("Stop requested, exiting run loop.")
break
except InvalidStatus as e:
status_code = getattr(e.response, "status_code", None)
logger.error(f"WebSocket handshake to {ws_url} failed with status {status_code}")
Expand All @@ -342,13 +401,18 @@ async def _connect_and_listen():

# Add a delay before retrying to avoid hammering the server
logger.info(f"Waiting 5 seconds before retry attempt {current_404_retries}...")
asyncio.get_event_loop().run_until_complete(asyncio.sleep(5))
await asyncio.sleep(5)
else:
# For non-404 errors, just raise the exception
raise
except Exception as runException:
logger.error(f"runException: {runException}")

# Check if stop was requested
if self._stop_event.is_set():
logger.info("Stop requested during exception handling, exiting.")
break

# Check if we can get device info
if self._get_device_info(check_existing=False) is None:
logger.error('could not create device info')
Expand All @@ -359,4 +423,34 @@ async def _connect_and_listen():

# Wait a bit before reconnecting
logger.info("Waiting 5 seconds before attempting to reconnect...")
asyncio.get_event_loop().run_until_complete(asyncio.sleep(5))
await asyncio.sleep(5)

logger.info("WebSocket client stopped.")

def run(self):
"""
Synchronous entry point for running the websocket client.

Use this method for standalone scripts that don't have an existing event loop.
For integration with async frameworks (FastAPI, aiohttp, etc.), use run_async() instead.

Example usage:
bot = WebexBot(teams_bot_token="YOUR_TOKEN")
bot.run() # Blocks until stopped
"""
# Check if there's already a running event loop
running_loop = _get_running_loop_or_none()
if running_loop is not None:
raise RuntimeError(
"An event loop is already running. "
"Use 'await bot.run_async()' or 'asyncio.create_task(bot.run_async())' instead of 'bot.run()' "
"when integrating with async frameworks like FastAPI."
)

# No running loop, safe to use asyncio.run()
try:
asyncio.run(self.run_async())
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, shutting down.")
except SystemExit:
logger.info("System exit requested, shutting down.")