🔥 Feature: Add broadcast to bot and space#86
Conversation
…ure/add-space-broadcast
PenguinBoi12
left a comment
There was a problem hiding this comment.
In general the PR looks pretty good.
There's one big thing we need to address regarding rate limite and retry on failing send. I've left a comment that talks about it but we can discuss on matrix if needed.
There's also a couple of missing tests, I haven't listed all of them so you should make sure every cases are well tested, similarily, some cases were tested in one broadcast and notntested in the other, it's important to test the same cases for both broadcast.
Make sure the docstring also reflect those tests or special edge case and to remove what's not part of the things you've done.
| """ | ||
| rooms = filter(lambda child: not isinstance(child, Space), rooms) | ||
| async_send = [room.send(content, raw=raw, notice=notice, file=file) for room in rooms] | ||
| return await asyncio.gather(*async_send) |
There was a problem hiding this comment.
There's a good chance we get rate limited doing this, so we need to think about how to avoid it. Here's what I have in mind. To avoid rate limiting, we first need to implement this (the BroadcastResult return type), so a rate-limited room doesn't blow up the whole broadcast and we have a place to report per-room failures.
Then, broadcast would look something like:
async def broadcast(self, rooms, content=None, *, max_concurrent=8, **kwargs) -> list[BroadcastResult]:
semaphore = asyncio.Semaphore(max_concurrent)
rooms = filter(lambda r: not isinstance(r, Space), rooms)
async def _send(room):
async with semaphore:
try:
msg = await with_retry(lambda: room.send(content, **kwargs))
return BroadcastResult(room=room, message=msg, error=None)
except Exception as e:
return BroadcastResult(room=room, message=None, error=e)
return await asyncio.gather(*[_send(room) for room in rooms])It's also a good opportunity to add a new helper in api.py called with_retry, which would let us retry any coroutine on failure:
async def with_retry(coro, *, retries=3):
...I am also debating if it would be worth it to extract the semaphore logic into another bounded_gather helper or something like that:
async def bounded_gather(coros, *, max_concurrent=8):
semaphore = asyncio.Semaphore(max_concurrent)
async def _run(coro):
async with semaphore:
return await coro
return await asyncio.gather(*[_run(c) for c in coros])This is mainly for consistency with the with_retry and to avoid too much repetitions but it's not blocking either if it's directly in the broadcast so maybe we should
Of course, all this would require some tests as well.
As a FYI, the with_retry and the semaphore solve 2 different problems. The semaphore keeps us from causing rate limit and with_retr handles it if # it happens anyway (or other error happen)
| """ | ||
| rooms = filter(lambda child: not isinstance(child, Space), rooms) | ||
| async_send = [room.send(content, raw=raw, notice=notice, file=file) for room in rooms] | ||
| return await asyncio.gather(*async_send) |
There was a problem hiding this comment.
Using asyncio.gather(*async_send) without return_exceptions=True means one failed room.send() (e.g. bot not joined, rate limit, permission error) aborts the whole broadcast and the caller gets an exception with no indication of which rooms succeeded.
So, I think in the case of broadcasting, it would be more relevant to set return_exceptions=Trueand return something that captures both successes and failures per-room rather than letting one failed send blow up everything:
@dataclass
class BroadcastResult:
room: Room
message: Message | None
error: Exception | None
@property
def ok(self) -> bool:
return self.error is Nonebroadcast() would return a list[BroadcastResult] instead.
| Supports text messages (with optional markdown formatting) | ||
| and file uploads (including images, videos, and audio). | ||
|
|
||
| Children the bot has not joined are silently omitted. Use `depth` to |
There was a problem hiding this comment.
The doc says that space are silently omitted and the code seems to do just that but there is not test for it.
| Supports text messages (with optional markdown formatting) | ||
| and file uploads (including images, videos, and audio). | ||
|
|
||
| Children the bot has not joined are silently omitted. Use `depth` to |
There was a problem hiding this comment.
I am not sure if this is possible; we should validate that children of space can return children that the bot isn't in. I also don't see where it does that in the code.
If we don't want that we can remove that part of the comment. If we do, we need a test to validate that and something that actually check it.
But as in the earlier comment, with the changes I propose we might be able to let if fail without too much cost.
We probably still want a test because it might be a recurring edge case.
| notice: bool = False, | ||
| file: File | None = None, | ||
| ) -> list[Message]: | ||
| """Broadcasts a message to the specified rooms. |
There was a problem hiding this comment.
It's weird that the space version checks if the bot isn't in the room (and skip them). It's even more important to validate this here.
Although, with the changes I propose we might be able to let it fail but it's probably better to avoid it still.
| await bot.broadcast([room1, room2, ...], file=image) | ||
| ``` | ||
| """ | ||
| rooms = filter(lambda child: not isinstance(child, Space), rooms) |
There was a problem hiding this comment.
In both broadcast we probably want to do filter duplicate room. I'm not sure if we can just use a set in that situation but it's worth looking into it and afterwards make a test for this.
Description
This PR adds a
broadcastmethod tomatrix/bot.pyandmatrix/space.py.matrix/bot.py:
Behavior: Calls
send()on the provided list of rooms (skips over spaces).matrix/space.py:
Behavior: Calls
send()on all child rooms of the given space up to the specifieddepth(default is1, which means only broadcast to immediate sub rooms).Fixes: #66