From 0d3fbf924829e8f0a8bf5da3b57461ef473a877f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Jun 2026 16:01:47 -0500 Subject: [PATCH 1/6] Remove `till_deferred_has_result` from `tests/synapse_rust/test_http_client.py` --- tests/synapse_rust/test_http_client.py | 61 +++++++------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 56fab3a0e1d..cb9e05a962a 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -15,9 +15,8 @@ import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer -from typing import Any, Coroutine, Generator, TypeVar, Union +from typing import Any, TypeVar -from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.logging.context import ( @@ -118,31 +117,6 @@ def tearDown(self) -> None: for callbable, args, kwargs in triggers: callbable(*args, **kwargs) - def till_deferred_has_result( - self, - awaitable: Union[ - "Coroutine[Deferred[Any], Any, T]", - "Generator[Deferred[Any], Any, T]", - "Deferred[T]", - ], - ) -> "Deferred[T]": - """Wait until a deferred has a result. - - This is useful because the Rust HTTP client will resolve the deferred - using reactor.callFromThread, which are only run when we call - reactor.advance. - """ - deferred = ensureDeferred(awaitable) - tries = 0 - while not deferred.called: - time.sleep(0.1) - self.reactor.advance(0) - tries += 1 - if tries > 100: - raise Exception("Timed out waiting for deferred to resolve") - - return deferred - def _check_current_logcontext(self, expected_logcontext_string: str) -> None: context = current_context() assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( @@ -159,32 +133,31 @@ def test_request_response(self) -> None: Test to make sure we can make a basic request and get the expected response. 
""" + request_d = self._rust_http_client.get( + url=self.server.endpoint, + response_limit=1 * 1024 * 1024, + ) + self.wait_on_thread(request_d) + + resp_body = self.get_success(request_d) + raw_response = json_decoder.decode(resp_body.decode("utf-8")) + self.assertEqual(raw_response, {"ok": True}) - async def do_request() -> None: - resp_body = await self._rust_http_client.get( - url=self.server.endpoint, - response_limit=1 * 1024 * 1024, - ) - raw_response = json_decoder.decode(resp_body.decode("utf-8")) - self.assertEqual(raw_response, {"ok": True}) - - self.get_success(self.till_deferred_has_result(do_request())) self.assertEqual(self.server.calls, 1) def test_request_response_limit_exceeded(self) -> None: """ Test to make sure we handle the response limit being exceeded """ - - async def do_request() -> None: - await self._rust_http_client.get( - url=self.server.endpoint, - # Small limit so we hit the limit - response_limit=1, - ) + request_d = self._rust_http_client.get( + url=self.server.endpoint, + # Small limit so we hit the limit + response_limit=1, + ) + self.wait_on_thread(request_d) self.assertFailure( - self.till_deferred_has_result(do_request()), + request_d, RuntimeError, ) self.assertEqual(self.server.calls, 1) From f23dabaf3d4475afa154cf72488689b2985952a4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Jun 2026 17:10:01 -0500 Subject: [PATCH 2/6] Refine `wait_on_thread` --- tests/handlers/test_oauth_delegation.py | 85 +++++++++---------------- tests/synapse_rust/test_http_client.py | 34 +++++----- tests/unittest.py | 40 ++++++++++-- 3 files changed, 86 insertions(+), 73 deletions(-) diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index c88f2c2d155..ca939dcbc8f 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -21,11 +21,10 @@ import json import threading -import time from http import HTTPStatus from http.server import 
BaseHTTPRequestHandler, HTTPServer from io import BytesIO -from typing import Any, ClassVar, Coroutine, Generator, TypeVar, Union +from typing import Any, ClassVar, TypeVar from unittest.mock import ANY, AsyncMock, Mock from urllib.parse import parse_qs @@ -37,7 +36,6 @@ ) from signedjson.sign import sign_json -from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.api.auth.mas import MasDelegatedAuth @@ -809,31 +807,6 @@ class MasAuthDelegation(HomeserverTestCase): def device_scope(self) -> str: return self.device_scope_prefix + DEVICE - def till_deferred_has_result( - self, - awaitable: Union[ - "Coroutine[Deferred[Any], Any, T]", - "Generator[Deferred[Any], Any, T]", - "Deferred[T]", - ], - ) -> "Deferred[T]": - """Wait until a deferred has a result. - - This is useful because the Rust HTTP client will resolve the deferred - using reactor.callFromThread, which are only run when we call - reactor.advance. - """ - deferred = ensureDeferred(awaitable) - tries = 0 - while not deferred.called: - time.sleep(0.1) - self.reactor.advance(0) - tries += 1 - if tries > 100: - raise Exception("Timed out waiting for deferred to resolve") - - return deferred - def default_config(self) -> dict[str, Any]: config = super().default_config() config["public_baseurl"] = BASE_URL @@ -884,9 +857,9 @@ def test_simple_introspection(self) -> None: } requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) ) self.assertEqual(requester.user.to_string(), USER_ID) @@ -907,9 +880,9 @@ def test_unexpiring_token(self) -> None: } requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) + # We have 
to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) ) self.assertEqual(requester.user.to_string(), USER_ID) @@ -931,9 +904,9 @@ def test_inexistent_device(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), InvalidClientTokenError, ) self.assertEqual(failure.value.code, 401) @@ -948,9 +921,9 @@ def test_inexistent_user(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), AuthError, ) # This is a 500, it should never happen really @@ -966,9 +939,9 @@ def test_missing_scope(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), InvalidClientTokenError, ) self.assertEqual(failure.value.code, 401) @@ -977,9 +950,9 @@ def test_invalid_response(self) -> None: self.server.introspection_response = {} failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` 
usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), SynapseError, ) self.assertEqual(failure.value.code, 503) @@ -995,9 +968,9 @@ def test_device_id_in_body(self) -> None: } requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) ) self.assertEqual(requester.device_id, DEVICE) @@ -1012,9 +985,9 @@ def test_admin_scope(self) -> None: } requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) ) self.assertEqual(requester.user.to_string(), USER_ID) @@ -1041,7 +1014,9 @@ def test_cached_expired_introspection(self) -> None: # The first CS-API request causes a successful introspection self.get_success( - self.till_deferred_has_result(self._auth.get_user_by_req(request)) + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_req(request)) ) self.assertEqual(self.server.calls, 1) @@ -1050,7 +1025,9 @@ def test_cached_expired_introspection(self) -> None: # Now the CS-API request fails because the token expired self.assertFailure( - self.till_deferred_has_result(self._auth.get_user_by_req(request)), + # We have to wait for the async Rust HTTP client (running on the Tokio + # thread pool) to do its thing (see `create_deferred(...)` usage) + self.wait_on_thread(self._auth.get_user_by_req(request)), InvalidClientTokenError, ) # Ensure another introspection request was not sent 
diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index cb9e05a962a..6592be5c4ef 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -133,31 +133,35 @@ def test_request_response(self) -> None: Test to make sure we can make a basic request and get the expected response. """ - request_d = self._rust_http_client.get( - url=self.server.endpoint, - response_limit=1 * 1024 * 1024, + resp_body = self.get_success( + # We have to wait for the async Rust (running on the Tokio thread pool) to do + # its thing (see `create_deferred(...)` usage) + self.wait_on_thread( + self._rust_http_client.get( + url=self.server.endpoint, + response_limit=1 * 1024 * 1024, + ) + ) ) - self.wait_on_thread(request_d) - - resp_body = self.get_success(request_d) raw_response = json_decoder.decode(resp_body.decode("utf-8")) - self.assertEqual(raw_response, {"ok": True}) + self.assertEqual(raw_response, {"ok": True}) self.assertEqual(self.server.calls, 1) def test_request_response_limit_exceeded(self) -> None: """ Test to make sure we handle the response limit being exceeded """ - request_d = self._rust_http_client.get( - url=self.server.endpoint, - # Small limit so we hit the limit - response_limit=1, - ) - self.wait_on_thread(request_d) - self.assertFailure( - request_d, + # We have to wait for the async Rust (running on the Tokio thread pool) to do + # its thing (see `create_deferred(...)` usage) + self.wait_on_thread( + self._rust_http_client.get( + url=self.server.endpoint, + # Small limit so we hit the limit + response_limit=1, + ) + ), RuntimeError, ) self.assertEqual(self.server.calls, 1) diff --git a/tests/unittest.py b/tests/unittest.py index 93131521d03..953538f8ef1 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -96,6 +96,8 @@ logger = logging.getLogger(__name__) +T = TypeVar("T") + TV = TypeVar("TV") _ExcType = TypeVar("_ExcType", bound=BaseException, covariant=True) @@ -474,17 +476,47 
@@ def tearDown(self) -> None: # Reset to not use frozen dicts. events.USE_FROZEN_DICTS = False - def wait_on_thread(self, deferred: Deferred, timeout: int = 10) -> None: + def wait_on_thread( + self, + awaitable: Awaitable[TV], + timeout: int = 10, + ) -> Deferred[TV]: """ - Wait until a Deferred is done, where it's waiting on a real thread. + Wait until the Awaitable is done, where it's waiting on a real thread. This + could be things spawned on the Twisted reactor threadpool or Tokio runtime + (async Rust code). + + Ideally, this behavior (giving time for other threads to drive forward) would be + built-in to wherever we drive the Twisted reactor but we don't want to slow down + the entire test suite with real sleeps. + + With other functions like `get_success(...)`, it only advances the reactor's + *virtual* clock in a tight loop, never yielding real wall-clock time, so the + Tokio threads never get a chance to run. Instead we advance the Twisted reactor + while also sleeping a little real time each iteration. + + Args: + awaitable: The thing to wait for + timeout: The maximum amount of real time we should wait before giving up """ start_time = time.time() + deferred: Deferred[TV] = ensureDeferred(awaitable) # type: ignore[arg-type] while not deferred.called: if start_time + timeout < time.time(): - raise ValueError("Timed out waiting for threadpool") - self.reactor.advance(0.01) + raise ValueError( + "Timed out waiting for work happening on a thread to finish" + ) + # Give some real wall-clock time for other threads to do work. This could be + # things spawned on the Twisted reactor threadpool or Tokio thread pool + # (async Rust code).
time.sleep(0.01) + # Advance the Twisted reactor as the thread may have scheduled something on + # the reactor to run (like `reactor.callFromThread(...)`) + self.reactor.advance(0) + + # Make it easy to chain other things + return deferred def wait_for_background_updates(self) -> None: """Block until all background database updates have completed.""" From 1bd4c9d5bbe89936729cc98192ef87f681b85947 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Jun 2026 17:15:41 -0500 Subject: [PATCH 3/6] Refine --- tests/unittest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 953538f8ef1..b494de691e3 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -96,8 +96,6 @@ logger = logging.getLogger(__name__) -T = TypeVar("T") - TV = TypeVar("TV") _ExcType = TypeVar("_ExcType", bound=BaseException, covariant=True) @@ -498,6 +496,9 @@ def wait_on_thread( Args: awaitable: The thing to wait for timeout: The maximum amount of real time we should wait before giving up + + Returns: + Deferred (wrapping the awaitable that was passed in) """ start_time = time.time() From 488f055239a146f6d024ffead64770fcdf67de12 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Jun 2026 17:18:55 -0500 Subject: [PATCH 4/6] Add changelog --- changelog.d/19867.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19867.misc diff --git a/changelog.d/19867.misc b/changelog.d/19867.misc new file mode 100644 index 00000000000..99fa09cec24 --- /dev/null +++ b/changelog.d/19867.misc @@ -0,0 +1 @@ +Remove custom `till_deferred_has_result(...)` in favor of `HomeserverTestCase.wait_on_thread(...)` to drive async Rust (Tokio runtime/thread pool).
From 58018446abdecd7fdfde6679739185f59e0e54a9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 Jun 2026 17:57:52 -0500 Subject: [PATCH 5/6] Fix `trial-olddeps` failing with `builtins.TypeError: 'type' object is not subscriptable` ``` File "/home/runner/work/synapse/synapse/tests/unittest.py", line 481, in HomeserverTestCase ) -> Deferred[TV]: builtins.TypeError: 'type' object is not subscriptable ``` --- tests/unittest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index b494de691e3..5159114c08e 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -478,7 +478,7 @@ def wait_on_thread( self, awaitable: Awaitable[TV], timeout: int = 10, - ) -> Deferred[TV]: + ) -> "Deferred[TV]": """ Wait until the Awaitable is done, where it's waiting on a real thread. This could be things spawned on the Twisted reactor threadpool or Tokio runtime From ef59b6d0b437367cec309086048b1c9af21ab053 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 16:19:09 -0500 Subject: [PATCH 6/6] Try `time.sleep(...)` and `os.sched_yield()` See https://github.com/element-hq/synapse/pull/19867#discussion_r3441774685 --- tests/unittest.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index 5159114c08e..9b7e0a18086 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -19,6 +19,7 @@ # [This file includes modifications made by New Vector Limited] # # +import os import functools import gc import hashlib @@ -511,7 +512,13 @@ def wait_on_thread( # Give some real wall-clock time for other threads to do work. This could be # things spawned on the Twisted reactor threadpool or Tokio thread pool # (async Rust code). - time.sleep(0.01) + # time.sleep(0) + # Suspend execution of this thread to allow other threads to do work. This + # could be things spawned on the Twisted reactor threadpool or Tokio thread + # pool (async Rust code). 
+ # + # We could also use `time.sleep(0)` here + os.sched_yield() # Advance the Twisted reactor as the thread may have scheduled something on # the reactor to run (like `reactor.callFromThread(...)`) self.reactor.advance(0)