Closed
1 change: 1 addition & 0 deletions changelog.d/19867.misc
@@ -0,0 +1 @@
Remove custom `till_deferred_has_result(...)` in favor of `HomeserverTestCase.wait_on_thread(...)` to drive async Rust (Tokio runtime/thread pool).
85 changes: 31 additions & 54 deletions tests/handlers/test_oauth_delegation.py
Expand Up @@ -21,11 +21,10 @@

import json
import threading
import time
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO
from typing import Any, ClassVar, Coroutine, Generator, TypeVar, Union
from typing import Any, ClassVar, TypeVar
from unittest.mock import ANY, AsyncMock, Mock
from urllib.parse import parse_qs

Expand All @@ -37,7 +36,6 @@
)
from signedjson.sign import sign_json

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor

from synapse.api.auth.mas import MasDelegatedAuth
Expand Down Expand Up @@ -809,31 +807,6 @@ class MasAuthDelegation(HomeserverTestCase):
def device_scope(self) -> str:
return self.device_scope_prefix + DEVICE

def till_deferred_has_result(
self,
awaitable: Union[
"Coroutine[Deferred[Any], Any, T]",
"Generator[Deferred[Any], Any, T]",
"Deferred[T]",
],
) -> "Deferred[T]":
"""Wait until a deferred has a result.

This is useful because the Rust HTTP client will resolve the deferred
using reactor.callFromThread, which are only run when we call
reactor.advance.
"""
deferred = ensureDeferred(awaitable)
tries = 0
while not deferred.called:
time.sleep(0.1)
self.reactor.advance(0)
tries += 1
if tries > 100:
raise Exception("Timed out waiting for deferred to resolve")

return deferred

def default_config(self) -> dict[str, Any]:
config = super().default_config()
config["public_baseurl"] = BASE_URL
Expand Down Expand Up @@ -884,9 +857,9 @@ def test_simple_introspection(self) -> None:
}

requester = self.get_success(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
)
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token"))
)
Comment on lines 859 to 863

@MadLittleMods MadLittleMods Jun 18, 2026

Contributor Author

Overall, not very satisfied with this API shape. I think it's good enough as another iteration. Perhaps a better name?

Ideally, we could just do self.get_success(self._auth.get_user_by_access_token("some_token")) which would drive things to completion regardless of the kind of work necessary (Python or Rust).

But adding real sleeps to get_success(...) for everything would result in slowing down the entire test suite.

We could potentially add a new attribute to HomeserverTestCase like drive_work_on_threads (like the existing needs_threadpool) which would conditionally sleep for real in get_success(...). This is half-decent, but having to know this obscure detail to make things work kinda sucks.

For example, this is how needs_threadpool is defined on a test case basis:

class MediaStorageTests(unittest.HomeserverTestCase):
    needs_threadpool = True

Or, even better, we could automatically detect when there is work to be done on the Tokio thread pool and only then do the real sleep loop. We would probably have to detect this using the Tokio RuntimeMetrics. I think it would be better to explore this as a follow-up though.
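
To make the opt-in attribute idea concrete, here is a minimal sketch. Everything in it is hypothetical: `SketchTestCase` is a stand-in rather than Synapse's `HomeserverTestCase`, `drive_work_on_threads` does not exist today, and `poll` is a simplified substitute for checking whether a Deferred has a result.

```python
import threading
import time
from typing import Callable, List, Optional


class SketchTestCase:
    """Hypothetical stand-in for HomeserverTestCase (not Synapse's real class)."""

    # Hypothetical opt-in flag, declared per test case like `needs_threadpool`.
    drive_work_on_threads: bool = False

    def get_success(
        self, poll: Callable[[], Optional[str]], timeout: float = 10.0
    ) -> str:
        """Return the result of `poll()`, optionally yielding to other threads."""
        result = poll()
        if self.drive_work_on_threads:
            deadline = time.monotonic() + timeout
            while result is None and time.monotonic() < deadline:
                # Only opted-in test cases pay for yielding to other threads.
                time.sleep(0)
                result = poll()
        if result is None:
            raise AssertionError("No result yet; is work still pending on a thread?")
        return result


class ThreadedWorkTests(SketchTestCase):
    # Assumption: tests in this class rely on work finishing on another thread.
    drive_work_on_threads = True


def start_worker(results: List[str], gate: threading.Event) -> threading.Thread:
    """Start a thread that appends a result once `gate` is set."""

    def work() -> None:
        gate.wait()
        results.append("done")

    worker = threading.Thread(target=work, daemon=True)
    worker.start()
    return worker
```

With this shape, test cases that never touch threads keep the fast path, while the flag has to be discovered by whoever writes a test that does, which is the drawback described above.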

Member

So I think that we can actually change the sleep to be time.sleep(0), since that is apparently enough to signal that other threads should run. I've tried it locally and the tests in tests/handlers/test_oauth_delegation.py and tests/synapse_rust/test_http_client.py seem to pass?

At which point this could be added to get_success I think?

@MadLittleMods MadLittleMods Jun 23, 2026

Contributor Author

time.sleep(0) does seem to work 👍. If I look at the docs for time.sleep(...) ("Suspend execution of the calling thread [...]"), it also mentions that you can use os.sched_yield() ("Voluntarily relinquish the CPU.") which also works.

https://discuss.python.org/t/time-sleep-0-yield-behaviour/27185/5 mentions that time.sleep(0) releases the GIL while os.sched_yield() doesn't. But I think that is now fixed (looks like it was part of Python 3.10): python/cpython#96078

It's unclear what's better, constant context switching or just allowing some time. I would have assumed that constant context switching would have some impact but the results from #19871 show that it doesn't seem to slow things down in a noticeable way.
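
As a rough illustration of the two yielding strategies discussed here, the sketch below spins until a background thread finishes, using either `time.sleep(0)` or `os.sched_yield()` as the yield step. The helper names (`yield_to_other_threads`, `spin_until`, `run_background_work`) are made up for this example, and a plain Python thread stands in for the Tokio thread pool, so it says nothing about real-world performance.

```python
import os
import threading
import time
from typing import Callable


def yield_to_other_threads() -> None:
    """Yield with `os.sched_yield()` where available, else `time.sleep(0)`."""
    sched_yield = getattr(os, "sched_yield", None)
    if sched_yield is not None:
        # Only available on some Unix platforms.
        sched_yield()
    else:
        time.sleep(0)


def spin_until(
    done: threading.Event, yielder: Callable[[], None], timeout: float = 10.0
) -> int:
    """Call `yielder` in a loop until `done` is set; return the iteration count."""
    deadline = time.monotonic() + timeout
    spins = 0
    while not done.is_set():
        if time.monotonic() > deadline:
            raise TimeoutError("Timed out waiting for the background thread")
        yielder()
        spins += 1
    return spins


def run_background_work(yielder: Callable[[], None]) -> int:
    """Start a short-lived worker thread and spin until it reports completion."""
    done = threading.Event()

    def work() -> None:
        time.sleep(0.01)  # stand-in for real work on another thread
        done.set()

    worker = threading.Thread(target=work)
    worker.start()
    spins = spin_until(done, yielder)
    worker.join()
    return spins
```

Calling `run_background_work(lambda: time.sleep(0))` and `run_background_work(yield_to_other_threads)` and comparing the returned iteration counts gives a crude feel for how much context switching each option causes on a given machine.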


self.assertEqual(requester.user.to_string(), USER_ID)
Expand All @@ -907,9 +880,9 @@ def test_unexpiring_token(self) -> None:
}

requester = self.get_success(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
)
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token"))
Comment on lines +883 to +885

Contributor Author

The comment is repetitive but it's nice to know why the wait_on_thread(...) complication is being used here.

)

self.assertEqual(requester.user.to_string(), USER_ID)
Expand All @@ -931,9 +904,9 @@ def test_inexistent_device(self) -> None:
}

failure = self.get_failure(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
),
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token")),
InvalidClientTokenError,
)
self.assertEqual(failure.value.code, 401)
Expand All @@ -948,9 +921,9 @@ def test_inexistent_user(self) -> None:
}

failure = self.get_failure(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
),
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token")),
AuthError,
)
# This is a 500, it should never happen really
Expand All @@ -966,9 +939,9 @@ def test_missing_scope(self) -> None:
}

failure = self.get_failure(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
),
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token")),
InvalidClientTokenError,
)
self.assertEqual(failure.value.code, 401)
Expand All @@ -977,9 +950,9 @@ def test_invalid_response(self) -> None:
self.server.introspection_response = {}

failure = self.get_failure(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
),
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token")),
SynapseError,
)
self.assertEqual(failure.value.code, 503)
Expand All @@ -995,9 +968,9 @@ def test_device_id_in_body(self) -> None:
}

requester = self.get_success(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
)
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token"))
)

self.assertEqual(requester.device_id, DEVICE)
Expand All @@ -1012,9 +985,9 @@ def test_admin_scope(self) -> None:
}

requester = self.get_success(
self.till_deferred_has_result(
self._auth.get_user_by_access_token("some_token")
)
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_access_token("some_token"))
)

self.assertEqual(requester.user.to_string(), USER_ID)
Expand All @@ -1041,7 +1014,9 @@ def test_cached_expired_introspection(self) -> None:

# The first CS-API request causes a successful introspection
self.get_success(
self.till_deferred_has_result(self._auth.get_user_by_req(request))
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_req(request))
)
self.assertEqual(self.server.calls, 1)

Expand All @@ -1050,7 +1025,9 @@ def test_cached_expired_introspection(self) -> None:

# Now the CS-API request fails because the token expired
self.assertFailure(
self.till_deferred_has_result(self._auth.get_user_by_req(request)),
# We have to wait for the async Rust HTTP client (running on the Tokio
# thread pool) to do its thing (see `create_deferred(...)` usage)
self.wait_on_thread(self._auth.get_user_by_req(request)),
InvalidClientTokenError,
)
# Ensure another introspection request was not sent
65 changes: 21 additions & 44 deletions tests/synapse_rust/test_http_client.py
Expand Up @@ -15,9 +15,8 @@
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Coroutine, Generator, TypeVar, Union
from typing import Any, TypeVar

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor

from synapse.logging.context import (
Expand Down Expand Up @@ -118,31 +117,6 @@ def tearDown(self) -> None:
for callbable, args, kwargs in triggers:
callbable(*args, **kwargs)

def till_deferred_has_result(
self,
awaitable: Union[
"Coroutine[Deferred[Any], Any, T]",
"Generator[Deferred[Any], Any, T]",
"Deferred[T]",
],
) -> "Deferred[T]":
"""Wait until a deferred has a result.

This is useful because the Rust HTTP client will resolve the deferred
using reactor.callFromThread, which are only run when we call
reactor.advance.
"""
deferred = ensureDeferred(awaitable)
tries = 0
while not deferred.called:
time.sleep(0.1)
self.reactor.advance(0)
tries += 1
if tries > 100:
raise Exception("Timed out waiting for deferred to resolve")

return deferred

def _check_current_logcontext(self, expected_logcontext_string: str) -> None:
context = current_context()
assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), (
Expand All @@ -159,32 +133,35 @@ def test_request_response(self) -> None:
Test to make sure we can make a basic request and get the expected
response.
"""

async def do_request() -> None:
resp_body = await self._rust_http_client.get(
url=self.server.endpoint,
response_limit=1 * 1024 * 1024,
resp_body = self.get_success(
# We have to wait for the async Rust (running on the Tokio thread pool) to do
# its thing (see `create_deferred(...)` usage)
self.wait_on_thread(
self._rust_http_client.get(
url=self.server.endpoint,
response_limit=1 * 1024 * 1024,
)
)
raw_response = json_decoder.decode(resp_body.decode("utf-8"))
self.assertEqual(raw_response, {"ok": True})
)
raw_response = json_decoder.decode(resp_body.decode("utf-8"))

self.get_success(self.till_deferred_has_result(do_request()))
self.assertEqual(raw_response, {"ok": True})
self.assertEqual(self.server.calls, 1)

def test_request_response_limit_exceeded(self) -> None:
"""
Test to make sure we handle the response limit being exceeded
"""

async def do_request() -> None:
await self._rust_http_client.get(
url=self.server.endpoint,
# Small limit so we hit the limit
response_limit=1,
)

self.assertFailure(
self.till_deferred_has_result(do_request()),
# We have to wait for the async Rust (running on the Tokio thread pool) to do
# its thing (see `create_deferred(...)` usage)
self.wait_on_thread(
self._rust_http_client.get(
url=self.server.endpoint,
# Small limit so we hit the limit
response_limit=1,
)
),
RuntimeError,
)
self.assertEqual(self.server.calls, 1)
50 changes: 45 additions & 5 deletions tests/unittest.py
Expand Up @@ -19,77 +19,78 @@
# [This file includes modifications made by New Vector Limited]
#
#
import os
import functools
import gc
import hashlib
import hmac
import json
import logging
import secrets
import time
from typing import (
AbstractSet,
Any,
Awaitable,
Callable,
ClassVar,
Generic,
Iterable,
Mapping,
NoReturn,
Optional,
Protocol,
TypeVar,
)
from unittest.mock import Mock, patch

import canonicaljson
import signedjson.key
import unpaddedbase64
from typing_extensions import Concatenate, ParamSpec

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.testing import MemoryReactor, MemoryReactorClock
from twisted.python.failure import Failure
from twisted.python.threadpool import ThreadPool
from twisted.trial import unittest
from twisted.web.resource import Resource
from twisted.web.server import Request

from synapse import events
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.config._base import Config, RootConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import DEFAULT_ROOM_VERSION
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
LoggingContext,
current_context,
set_current_context,
)
from synapse.rest import RegisterServletsFunc
from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester
from synapse.util.clock import Clock
from synapse.util.httpresourcetree import create_resource_tree

from tests.server import (
CustomHeaderType,
FakeChannel,
ThreadedMemoryReactorClock,
get_clock,
make_request,
setup_test_homeserver,
)
from tests.test_utils import event_injection, setup_awaitable_errors
from tests.test_utils.logging_setup import setup_logging
from tests.utils import checked_cast, default_config, setupdb

Check failure on line 93 in tests/unittest.py (GitHub Actions / lint)

Ruff (I001): tests/unittest.py:22:1: I001 Import block is un-sorted or un-formatted

setupdb()
setup_logging()
Expand Down Expand Up @@ -474,17 +475,56 @@
# Reset to not use frozen dicts.
events.USE_FROZEN_DICTS = False

def wait_on_thread(self, deferred: Deferred, timeout: int = 10) -> None:
def wait_on_thread(

Contributor Author

wait_on_thread(...) was first introduced in matrix-org/synapse#5475. It's unclear from that PR itself but I'm guessing this is because of the defer_to_thread(...) usage in the media repo.

self,
awaitable: Awaitable[TV],

Contributor Author

Changed this to accept any awaitable (Deferred or Coroutine)

timeout: int = 10,
) -> "Deferred[TV]":
"""
Wait until a Deferred is done, where it's waiting on a real thread.
Wait until the Awaitable is done, where it's waiting on a real thread. This
could be things spawned on the Twisted reactor threadpool or Tokio runtime
(async Rust code).

Ideally, this behavior (giving time for other threads to drive forward) would be
built-in to wherever we drive the Twisted reactor but we don't want to slow down
the entire test suite with real sleeps.

With other functions like `get_success(...)`, it only advances the reactor's
*virtual* clock in a tight loop, never yielding real wall-clock time, so the
Tokio threads never get a chance to run. Instead we advance the Twisted reactor
while also sleeping a little real time each iteration.

Args:
awaitable: The thing to wait for
timeout: The maximum amount of real time we should wait before giving up

Returns:
Deferred (wrapping the awaitable that was passed in)
"""
start_time = time.time()

deferred: Deferred[TV] = ensureDeferred(awaitable) # type: ignore[arg-type]
while not deferred.called:
if start_time + timeout < time.time():
raise ValueError("Timed out waiting for threadpool")
self.reactor.advance(0.01)
time.sleep(0.01)
raise ValueError(
"Timed out waiting for work happening on a thread to finish"
)
# Give some real wall-clock time for other threads to do work. This could be
# things spawned on the Twisted reactor threadpool or Tokio thread pool
# (async Rust code).
# time.sleep(0)
# Suspend execution of this thread to allow other threads to do work. This
# could be things spawned on the Twisted reactor threadpool or Tokio thread
# pool (async Rust code).
#
# We could also use `time.sleep(0)` here
os.sched_yield()
# Advance the Twisted reactor as the thread may have scheduled something on
# the reactor to run (like `reactor.callFromThread(...)`)
self.reactor.advance(0)

@MadLittleMods MadLittleMods Jun 18, 2026

Contributor Author

Changed this from advancing time by 0.01 each iteration to 0. We shouldn't need to advance the Twisted reactor time at all, which is bolstered by the fact that the tests still pass.

Comment on lines +522 to +524

Contributor Author

Changed the order to advance the Twisted reactor after we give other threads some time to make progress. The thinking is that the thread can do some work, potentially scheduling some things on the reactor, and then we unblock that.

Since we're iterating in a loop, the order probably doesn't matter that much but perhaps this makes more sense.
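
The "yield first, then advance" ordering can be sketched without Twisted. In the example below, `FakeReactor` and `wait_on_thread_sketch` are hypothetical stand-ins: the fake reactor only models the behaviour that matters here, namely that callbacks queued from another thread run only when the test advances the reactor.

```python
import queue
import threading
import time
from typing import Callable


class FakeReactor:
    """Hypothetical stand-in for a test reactor (not Twisted's API)."""

    def __init__(self) -> None:
        self._from_threads: "queue.Queue[Callable[[], None]]" = queue.Queue()

    def call_from_thread(self, fn: Callable[[], None]) -> None:
        # Safe to call from any thread; the callback runs later, in `advance`.
        self._from_threads.put(fn)

    def advance(self, _seconds: float) -> None:
        # Run everything that other threads have queued so far.
        while True:
            try:
                fn = self._from_threads.get_nowait()
            except queue.Empty:
                return
            fn()


def wait_on_thread_sketch(
    reactor: FakeReactor, is_done: Callable[[], bool], timeout: float = 10.0
) -> None:
    """Yield to other threads, then advance the reactor, until `is_done()`."""
    start = time.monotonic()
    while not is_done():
        if time.monotonic() - start > timeout:
            raise TimeoutError(
                "Timed out waiting for work happening on a thread to finish"
            )
        # 1. Let other threads make progress (they may queue reactor callbacks).
        time.sleep(0)
        # 2. Run whatever those threads scheduled on the reactor.
        reactor.advance(0)
```

Swapping steps 1 and 2 still terminates, because the loop comes back around; at worst it costs one extra iteration, which matches the observation that the order probably doesn't matter much.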


# Make it easy to chain other things
return deferred

def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""