Skip to content
44 changes: 44 additions & 0 deletions matrix/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

from nio import AsyncClient, Event, MatrixRoom

from matrix.message import Message
from matrix.types import File

from .room import Room, make_room
from .space import Space
from .group import Group
Expand Down Expand Up @@ -423,3 +426,44 @@ async def _build_context(self, matrix_room: Room, event: Event) -> Context:
ctx.command = cmd

return ctx

# ROOMS

async def broadcast(
self,
rooms: list[Room],
content: str | None = None,
*,
raw: bool = False,
notice: bool = False,
file: File | None = None,
) -> list[Message]:
"""Broadcasts a message to the specified rooms.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird that the space version checks if the bot isn't in the room (and skip them). It's even more important to validate this here.

Although, with the changes I propose we might be able to let it fail but it's probably better to avoid it still.


Supports text messages (with optional markdown formatting)
and file uploads (including images, videos, and audio).
If a space is provided, it is silently skipped.

## Example

```python
# Broadcast a markdown-formatted text message
await bot.broadcast([room1, room2, ...], "Hello **world**!")

# Broadcast a notice message
await bot.broadcast([room1, room2, ...], "Event started", notice=True)

# Broadcast a file
file = File(path="mxc://...", filename="document.pdf", mimetype="application/pdf")
await bot.broadcast([room1, room2, ...], file=file)

# Broadcast an image
image = Image(path="mxc://...", filename="photo.jpg", mimetype="image/jpeg", width=800, height=600)
await bot.broadcast([room1, room2, ...], file=image)
```
"""
rooms = list(filter(lambda child: not isinstance(child, Space), rooms))
async_send = [
room.send(content, raw=raw, notice=notice, file=file) for room in rooms
]
return await asyncio.gather(*async_send)

@PenguinBoi12 PenguinBoi12 Jul 3, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a good chance we get rate limited doing this, so we need to think about how to avoid it. Here's what I have in mind. To avoid rate limiting, we first need to implement this (the BroadcastResult return type), so a rate-limited room doesn't blow up the whole broadcast and we have a place to report per-room failures.

Then, broadcast would look something like:

async def broadcast(self, rooms, content=None, *, max_concurrent=8, **kwargs) -> list[BroadcastResult]:
    semaphore = asyncio.Semaphore(max_concurrent)
    rooms = filter(lambda r: not isinstance(r, Space), rooms)

    async def _send(room):
        async with semaphore:
            try:
                msg = await with_retry(lambda: room.send(content, **kwargs))
                return BroadcastResult(room=room, message=msg, error=None)
            except Exception as e:
                return BroadcastResult(room=room, message=None, error=e)

    return await asyncio.gather(*[_send(room) for room in rooms])

It's also a good opportunity to add a new helper in api.py called with_retry, which would let us retry any coroutine on failure:

async def with_retry(coro, *, retries=3):
    ...

I am also debating if it would be worth it to extract the semaphore logic into another bounded_gather helper or something like that:

async def bounded_gather(coros, *, max_concurrent=8):
    semaphore = asyncio.Semaphore(max_concurrent)
    async def _run(coro):
        async with semaphore:
            return await coro
    return await asyncio.gather(*[_run(c) for c in coros])

This is mainly for consistency with the with_retry and to avoid too much repetitions but it's not blocking either if it's directly in the broadcast so maybe we should

Of course, all this would require some tests as well.


As a FYI, the with_retry and the semaphore solve 2 different problems. The semaphore keeps us from causing rate limit and with_retr handles it if # it happens anyway (or other error happen)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using asyncio.gather(*async_send) without return_exceptions=True means one failed room.send() (e.g. bot not joined, rate limit, permission error) aborts the whole broadcast and the caller gets an exception with no indication of which rooms succeeded.

So, I think in the case of broadcasting, it would be more relevant to set return_exceptions=Trueand return something that captures both successes and failures per-room rather than letting one failed send blow up everything:

@dataclass
class BroadcastResult:
    room: Room
    message: Message | None
    error: Exception | None

    @property
    def ok(self) -> bool:
        return self.error is None

broadcast() would return a list[BroadcastResult] instead.

52 changes: 52 additions & 0 deletions matrix/space.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import asyncio

from matrix.message import Message
from typing import Self
from matrix.room import Room, make_room

from matrix.types import File


class Space(Room, room_type="m.space"):
def get_children(self, depth: int = 1) -> list[Room | Self]:
Expand Down Expand Up @@ -43,3 +48,50 @@ def get_children(self, depth: int = 1) -> list[Room | Self]:
children.extend(child.get_children(depth - 1))

return children

async def broadcast(
self,
content: str | None = None,
*,
raw: bool = False,
notice: bool = False,
file: File | None = None,
depth: int = 1,
) -> list[Message]:
"""Broadcasts a message to all rooms in this space.

Supports text messages (with optional markdown formatting)
and file uploads (including images, videos, and audio).

Children the bot has not joined are silently omitted. Use `depth` to

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc says that space are silently omitted and the code seems to do just that but there is not test for it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this is possible; we should validate that children of space can return children that the bot isn't in. I also don't see where it does that in the code.

If we don't want that we can remove that part of the comment. If we do, we need a test to validate that and something that actually check it.

But as in the earlier comment, with the changes I propose we might be able to let if fail without too much cost.

We probably still want a test because it might be a recurring edge case.

recursively broadcast to children of sub-spaces. `depth=1` broadcasts
to direct children only (default).

## Example

```python
# Broadcast a markdown-formatted text message
await space.broadcast("Hello **world**!")

# Broadcast a notice message
await space.broadcast("Event started", notice=True)

# Broadcast a file
file = File(path="mxc://...", filename="document.pdf", mimetype="application/pdf")
await space.broadcast(file=file)

# Broadcast an image
image = Image(path="mxc://...", filename="photo.jpg", mimetype="image/jpeg", width=800, height=600)
await space.broadcast(file=image)

# Broadcast a notice message to space's rooms and the rooms of its subspaces
await space.broadcast("New Announcement", notice=True, depth=2)
```
"""
rooms = filter(
lambda room: not isinstance(room, Space), self.get_children(depth=depth)
)
async_send = [
room.send(content, raw=raw, notice=notice, file=file) for room in rooms
]
return await asyncio.gather(*async_send)
127 changes: 125 additions & 2 deletions tests/test_bot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest

from unittest.mock import AsyncMock, MagicMock, patch
from nio import MatrixRoom, RoomMessageText, LoginError
from unittest.mock import AsyncMock, MagicMock, Mock, patch
from nio import MatrixRoom, RoomMessageText, LoginError, Event

from matrix import Bot, Config, Extension, Room, Space
from matrix.message import Message
from matrix.types import File
from matrix.errors import (
CheckError,
CommandNotFoundError,
Expand Down Expand Up @@ -970,3 +972,124 @@ async def task():
job_names = [j.name for j in bot.scheduler.jobs]

assert "task" in job_names


@pytest.fixture
def mock_send_response(bot):
"""Set up client to return a mock event after room_send and fetch_event."""
send_response = Mock()
send_response.event_id = "$event123"
bot._client.room_send = AsyncMock(return_value=send_response)

mock_event = MagicMock(spec=Event)
mock_event.event_id = "$event123"

get_event_response = Mock()
get_event_response.event = mock_event
bot._client.room_get_event = AsyncMock(return_value=get_event_response)

return send_response


@pytest.fixture
def make_room(bot):
"""Factory that creates a Room instance for a given room ID."""

def _make(room_id):
matrix_room = MatrixRoom(room_id=room_id, own_user_id="grace")
matrix_room.name = room_id
bot._client.rooms = {**bot._client.rooms, room_id: matrix_room}
return Room(matrix_room, bot.client)

return _make


@pytest.mark.asyncio
async def test_broadcast__expect_message_sent_to_all_rooms(
bot, make_room, mock_send_response
):
room1 = make_room("!room1:example.com")
room2 = make_room("!room2:example.com")

results = await bot.broadcast([room1, room2], "Hello!")

assert len(results) == 2
assert all(isinstance(msg, Message) for msg in results)
assert bot._client.room_send.await_count == 2


@pytest.mark.asyncio
async def test_broadcast__with_empty_list__expect_no_messages(bot, mock_send_response):
results = await bot.broadcast([], "Hello!")

assert results == []
bot._client.room_send.assert_not_awaited()


@pytest.mark.asyncio
async def test_broadcast__with_space_in_list__expect_space_skipped(
bot, make_room, mock_send_response
):
room = make_room("!room:example.com")
space_matrix = MatrixRoom(room_id="!space:example.com", own_user_id="grace")
space_matrix.name = "Test Space"
space_matrix.room_type = "m.space"
bot._client.rooms = {**bot._client.rooms, "!space:example.com": space_matrix}
space = Space(space_matrix, bot.client)

results = await bot.broadcast([room, space], "Hello!")

assert len(results) == 1
assert bot._client.room_send.await_count == 1
sent_room_ids = {
call.kwargs["room_id"] for call in bot._client.room_send.await_args_list
}
assert sent_room_ids == {"!room:example.com"}


@pytest.mark.asyncio
async def test_broadcast_raw__expect_unformatted_messages(
bot, make_room, mock_send_response
):
room = make_room("!room:example.com")

await bot.broadcast([room], "Hello world!", raw=True)

call_args = bot._client.room_send.call_args
content = call_args.kwargs["content"]
assert content["msgtype"] == "m.text"
assert content["body"] == "Hello world!"
assert "formatted_body" not in content


@pytest.mark.asyncio
async def test_broadcast_notice__expect_notice_message_type(
bot, make_room, mock_send_response
):
room = make_room("!room:example.com")

await bot.broadcast([room], "Special Event started!", notice=True)

call_args = bot._client.room_send.call_args
content = call_args.kwargs["content"]
assert content["msgtype"] == "m.notice"
assert content["body"] == "Special Event started!"


@pytest.mark.asyncio
async def test_broadcast_file__expect_file_message(bot, make_room, mock_send_response):
room = make_room("!room:example.com")

file = File(
path="mxc://example.com/abc123",
filename="document.pdf",
mimetype="application/pdf",
)

await bot.broadcast([room], file=file)

call_args = bot._client.room_send.call_args
content = call_args.kwargs["content"]
assert content["msgtype"] == "m.file"
assert content["body"] == "document.pdf"
assert content["url"] == "mxc://example.com/abc123"
Loading