From 520a4bcddb9a6c7958d8a37c4ee84a55d58ded3b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 17:23:59 -0500 Subject: [PATCH 01/57] Refactor `get_success(...)` to allow other threads to make progress --- tests/unittest.py | 48 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 93131521d03..946b9dcb6ca 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -25,6 +25,7 @@ import hmac import json import logging +import os import secrets import time from typing import ( @@ -48,6 +49,7 @@ import unpaddedbase64 from typing_extensions import Concatenate, ParamSpec +from twisted.internet import defer from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor, MemoryReactorClock from twisted.python.failure import Failure @@ -77,6 +79,7 @@ from synapse.storage.keys import FetchKeyResult from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester from synapse.util.clock import Clock +from synapse.util.duration import Duration from synapse.util.httpresourcetree import create_resource_tree from tests.server import ( @@ -736,9 +739,50 @@ def pump(self, by: float = 0.0) -> None: # whole chain to completion. self.reactor.pump([by] * 100) - def get_success(self, d: Awaitable[TV], by: float = 0.0) -> TV: + def get_success( + self, d: Awaitable[TV], timeout: Duration = Duration(seconds=10) + ) -> TV: + """ + Get the success result of an awaitable. + + Does not advance time in the Twisted reactor clock but will loop until the + real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` + scheduled callbacks to run if they are scheduled to run now and 2) will also + allow other threads to make progress. This could be things spawned on the + Twisted reactor threadpool or Tokio runtime (async Rust code). 
+ + Args: + d: awaitable + timeout: Real-time time to wait for the awaitable to have a result. + We use real-time as we may have to wait for work on other threads. + + Raises: + defer.TimeoutError: If the timeout expires before the awaitable completes. + SynchronousTestCase.failureException: If the awaitable has a failure result or has no result + (although you would probably run into `defer.TimeoutError` in that case). + """ + start_time_seconds = time.time() + deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] - self.pump(by=by) + while not deferred.called: + if start_time_seconds + timeout.as_secs() < time.time(): + raise defer.TimeoutError( + "Timed out waiting for work happening on a thread to finish" + ) + + # Suspend execution of this thread to allow other threads to do work. This + # could be things spawned on the Twisted reactor threadpool or Tokio thread + # pool (async Rust code). + # + # We could also use `time.sleep(0)` here but this is more precise + os.sched_yield() + + # Advance the Twisted reactor and run any scheduled callbacks + # + # In terms of other threads, they may have scheduled something on the + # reactor to run (like `reactor.callFromThread(...)`) + self.reactor.advance(0) + return self.successResultOf(deferred) def get_failure( From 65a1c59dc8e41edfc5d9364b5f69a925d513eb8f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 17:32:41 -0500 Subject: [PATCH 02/57] Refactor `get_failure` --- tests/synapse_rust/test_http_client.py | 35 +++--------------- tests/unittest.py | 49 +++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 56fab3a0e1d..3e92bdb2c2a 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -118,31 +118,6 @@ def tearDown(self) -> None: for callbable, args, kwargs in triggers: callbable(*args, **kwargs) - def 
till_deferred_has_result( - self, - awaitable: Union[ - "Coroutine[Deferred[Any], Any, T]", - "Generator[Deferred[Any], Any, T]", - "Deferred[T]", - ], - ) -> "Deferred[T]": - """Wait until a deferred has a result. - - This is useful because the Rust HTTP client will resolve the deferred - using reactor.callFromThread, which are only run when we call - reactor.advance. - """ - deferred = ensureDeferred(awaitable) - tries = 0 - while not deferred.called: - time.sleep(0.1) - self.reactor.advance(0) - tries += 1 - if tries > 100: - raise Exception("Timed out waiting for deferred to resolve") - - return deferred - def _check_current_logcontext(self, expected_logcontext_string: str) -> None: context = current_context() assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( @@ -168,7 +143,7 @@ async def do_request() -> None: raw_response = json_decoder.decode(resp_body.decode("utf-8")) self.assertEqual(raw_response, {"ok": True}) - self.get_success(self.till_deferred_has_result(do_request())) + self.get_success(do_request()) self.assertEqual(self.server.calls, 1) def test_request_response_limit_exceeded(self) -> None: @@ -183,8 +158,8 @@ async def do_request() -> None: response_limit=1, ) - self.assertFailure( - self.till_deferred_has_result(do_request()), + self.get_failure( + do_request(), RuntimeError, ) self.assertEqual(self.server.calls, 1) @@ -227,8 +202,8 @@ async def do_request() -> None: # Now wait for the function under test to have run with PreserveLoggingContext(): while not callback_finished: - # await self.hs.get_clock().sleep(0) - time.sleep(0.1) + # Allow the async Rust to run + time.sleep(0) self.reactor.advance(0) # check that the logcontext is left in a sane state. 
diff --git a/tests/unittest.py b/tests/unittest.py index 946b9dcb6ca..d42eb490843 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -786,15 +786,56 @@ def get_success( return self.successResultOf(deferred) def get_failure( - self, d: Awaitable[Any], exc: type[_ExcType], by: float = 0.0 + self, + d: Awaitable[Any], + exc: type[_ExcType], + timeout: Duration = Duration(seconds=10), ) -> _TypedFailure[_ExcType]: """ - Run a Deferred and get a Failure from it. The failure must be of the type `exc`. + Get the failure result of an awaitable. The failure must be of the type `exc`. + + Does not advance time in the Twisted reactor clock but will loop until the + real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` + scheduled callbacks to run if they are scheduled to run now and 2) will also + allow other threads to make progress. This could be things spawned on the + Twisted reactor threadpool or Tokio runtime (async Rust code). + + Args: + d: awaitable + exc: Exception type to expect + timeout: Real-time time to wait for the awaitable to have a result. + We use real-time as we may have to wait for work on other threads. + + Raises: + defer.TimeoutError: If the timeout expires before the awaitable completes. + SynchronousTestCase.failureException: If the awaitable has a failure result or has no result + (although you would probably run into `defer.TimeoutError` in that case). """ - deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] - self.pump(by) + start_time_seconds = time.time() + + deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] + while not deferred.called: + if start_time_seconds + timeout.as_secs() < time.time(): + raise defer.TimeoutError( + "Timed out waiting for work happening on a thread to finish" + ) + + # Suspend execution of this thread to allow other threads to do work. This + # could be things spawned on the Twisted reactor threadpool or Tokio thread + # pool (async Rust code). 
+ # + # We could also use `time.sleep(0)` here but this is more precise + os.sched_yield() + + # Advance the Twisted reactor and run any scheduled callbacks + # + # In terms of other threads, they may have scheduled something on the + # reactor to run (like `reactor.callFromThread(...)`) + self.reactor.advance(0) + return self.failureResultOf(deferred, exc) + # FIXME: Remove def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: """Drive deferred to completion and return result or raise exception on failure. From fdeed9afb2a030cfffd91d5775de997886ed50a9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 17:34:23 -0500 Subject: [PATCH 03/57] Fix `get_failure(...)` raises docstring --- tests/unittest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index d42eb490843..f250617849b 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -808,8 +808,9 @@ def get_failure( Raises: defer.TimeoutError: If the timeout expires before the awaitable completes. - SynchronousTestCase.failureException: If the awaitable has a failure result or has no result - (although you would probably run into `defer.TimeoutError` in that case). + SynchronousTestCase.failureException: If the awaitable has a success result, + or has an unexpected failure result, or has no result (although you would + probably run into `defer.TimeoutError` in that case). 
""" start_time_seconds = time.time() From 5ca905016bdceb62c32cdbcae4beb19622a9d1d8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 17:36:35 -0500 Subject: [PATCH 04/57] Add changelog --- changelog.d/19871.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19871.misc diff --git a/changelog.d/19871.misc b/changelog.d/19871.misc new file mode 100644 index 00000000000..be10ee05403 --- /dev/null +++ b/changelog.d/19871.misc @@ -0,0 +1 @@ +Update `HomeserverTestCase.get_success(...)` and friends to drive async Rust (Tokio runtime/thread pool). From 6e9b2a2b0b11c0539d21d9a8b9bfa621c16f638f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 17:39:29 -0500 Subject: [PATCH 05/57] Fix `get_failure` lint --- tests/unittest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index f250617849b..c1bbe62e1ea 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -814,7 +814,7 @@ def get_failure( """ start_time_seconds = time.time() - deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] + deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] while not deferred.called: if start_time_seconds + timeout.as_secs() < time.time(): raise defer.TimeoutError( From c45774c5da80f2a8ef5874c226e5cf751616e002 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 18:14:33 -0500 Subject: [PATCH 06/57] Reduce timeout so you don't have to wait as long when something goes wrong --- tests/unittest.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index c1bbe62e1ea..34dcbf82214 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -495,9 +495,7 @@ def wait_for_background_updates(self) -> None: while not self.get_success( store.db_pool.updates.has_completed_background_updates() ): - self.get_success( - store.db_pool.updates.do_next_background_update(False), by=0.1 - ) + 
self.get_success(store.db_pool.updates.do_next_background_update(False)) def make_homeserver( self, reactor: ThreadedMemoryReactorClock, clock: Clock @@ -740,7 +738,10 @@ def pump(self, by: float = 0.0) -> None: self.reactor.pump([by] * 100) def get_success( - self, d: Awaitable[TV], timeout: Duration = Duration(seconds=10) + self, + d: Awaitable[TV], + # 2-second default timeout as tests should be fast + timeout: Duration = Duration(seconds=2), ) -> TV: """ Get the success result of an awaitable. @@ -789,7 +790,8 @@ def get_failure( self, d: Awaitable[Any], exc: type[_ExcType], - timeout: Duration = Duration(seconds=10), + # 2-second default timeout as tests should be fast + timeout: Duration = Duration(seconds=2), ) -> _TypedFailure[_ExcType]: """ Get the failure result of an awaitable. The failure must be of the type `exc`. From ae7e3670ed2e6df87eae09b0b71e5bf12cde019f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 18:23:12 -0500 Subject: [PATCH 07/57] Fix test cases that don't need `by=` --- tests/handlers/test_federation.py | 1 - tests/handlers/test_profile.py | 6 +++--- tests/handlers/test_room_member.py | 4 ---- tests/storage/test_event_chain.py | 4 ++-- tests/synapse_rust/test_http_client.py | 3 +-- 5 files changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 794c0a3185f..0c7edbaa2da 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -357,7 +357,6 @@ def create_invite() -> EventBase: event.room_version, ), exc=LimitExceededError, - by=0.5, ) def _build_and_send_join_event( diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 5152e8fc536..ef4f6a1c788 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -200,7 +200,7 @@ async def slow_update_membership(*args: Any, **kwargs: Any) -> tuple[str, int]: self.assertEqual(membership[state_tuple].content["displayname"], 
"Frank") # Let's be sure we are over the delay introduced by slow_update_membership - self.get_success(self.clock.sleep(Duration(milliseconds=20)), by=1) + self.reactor.advance(Duration(milliseconds=20).as_secs()) membership = self.get_success( self.storage_controllers.state.get_current_state( @@ -278,7 +278,7 @@ async def potentially_slow_update_membership( # Let's be sure we are over the delay introduced by slow_update_membership # and that the task was not executed as expected - self.get_success(self.clock.sleep(Duration(milliseconds=20)), by=1) + self.reactor.advance(Duration(milliseconds=20).as_secs()) membership = self.get_success( self.storage_controllers.state.get_current_state( @@ -300,7 +300,7 @@ async def potentially_slow_update_membership( ) # Let's be sure we are over the delay introduced by slow_update_membership - self.get_success(self.clock.sleep(Duration(milliseconds=20)), by=1) + self.reactor.advance(Duration(milliseconds=20).as_secs()) # Updates should have been resumed from room 2 after the restart # so room 1 should not have been updated this time diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py index d5b95e4ef6b..0a7475856a8 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py @@ -71,7 +71,6 @@ def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> Non action=Membership.JOIN, ), LimitExceededError, - by=0.5, ) @override_config({"rc_joins_per_room": {"per_second": 0.1, "burst_count": 2}}) @@ -213,7 +212,6 @@ def test_remote_joins_contribute_to_rate_limit(self) -> None: remote_room_hosts=[self.OTHER_SERVER_NAME], ), LimitExceededError, - by=0.5, ) # TODO: test that remote joins to a room are rate limited. @@ -281,7 +279,6 @@ def test_local_users_joining_on_another_worker_contribute_to_rate_limit( action=Membership.JOIN, ), LimitExceededError, - by=0.5, ) # Try to join as Chris on the original worker. 
Should get denied because Alice @@ -294,7 +291,6 @@ def test_local_users_joining_on_another_worker_contribute_to_rate_limit( action=Membership.JOIN, ), LimitExceededError, - by=0.5, ) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 175a5ffc788..d09437c080b 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -755,7 +755,7 @@ def test_background_update_single_large_room(self) -> None: ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(False), by=0.1 + self.store.db_pool.updates.do_next_background_update(False) ) # Ensure that we did actually take multiple iterations to process the @@ -814,7 +814,7 @@ def test_background_update_multiple_large_room(self) -> None: ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(False), by=0.1 + self.store.db_pool.updates.do_next_background_update(False) ) # Ensure that we did actually take multiple iterations to process the diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 3e92bdb2c2a..7c85cb68399 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -15,9 +15,8 @@ import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer -from typing import Any, Coroutine, Generator, TypeVar, Union +from typing import Any, TypeVar -from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.logging.context import ( From f54d0c06c130695d2445007036d7d51bfb543f0a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 19:08:23 -0500 Subject: [PATCH 08/57] Fix `tests/storage/test_background_update.py` --- tests/storage/test_background_update.py | 62 ++++++++++++++++--------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/tests/storage/test_background_update.py 
b/tests/storage/test_background_update.py index e3f79d76707..139906e97ca 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -59,8 +59,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = self.hs.get_datastores().main async def update(self, progress: JsonDict, count: int) -> int: - duration_ms = 10 - await self.clock.sleep(Duration(milliseconds=count * duration_ms)) + fake_work_duration = Duration(seconds=1) + await self.clock.sleep(fake_work_duration) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", @@ -86,10 +86,15 @@ def test_do_background_update(self) -> None: self.update_handler.side_effect = self.update self.update_handler.reset_mock() - res = self.get_success( - self.updates.do_next_background_update(False), - by=0.02, - ) + background_update_d = ensureDeferred( + self.updates.do_next_background_update(False) + ) + # Wait for database queries to run in `do_next_background_update(...)` so the + # background update actually gets scheduled + self.reactor.advance(0) + # Wait for the actual background update `fake_work_duration` + self.reactor.advance(Duration(seconds=1).as_secs()) + res = self.get_success(background_update_d) self.assertFalse(res) # on the first call, we should get run with the default background update size @@ -143,10 +148,15 @@ def test_background_update_default_batch_set_by_config(self) -> None: self.update_handler.side_effect = self.update self.update_handler.reset_mock() - res = self.get_success( - self.updates.do_next_background_update(False), - by=0.01, - ) + background_update_d = ensureDeferred( + self.updates.do_next_background_update(False) + ) + # Wait for database queries to run in `do_next_background_update(...)` so the + # background update actually gets scheduled + self.reactor.advance(0) + # Wait for the actual background update `fake_work_duration` + 
self.reactor.advance(Duration(seconds=1).as_secs()) + res = self.get_success(background_update_d) self.assertFalse(res) # on the first call, we should get run with the default background update size specified in the config @@ -265,10 +275,15 @@ def test_background_update_duration_set_in_config(self) -> None: self.update_handler.side_effect = self.update self.update_handler.reset_mock() - res = self.get_success( - self.updates.do_next_background_update(False), - by=0.02, - ) + background_update_d = ensureDeferred( + self.updates.do_next_background_update(False) + ) + # Wait for database queries to run in `do_next_background_update(...)` so the + # background update actually gets scheduled + self.reactor.advance(0) + # Wait for the actual background update `fake_work_duration` + self.reactor.advance(Duration(seconds=1).as_secs()) + res = self.get_success(background_update_d) self.assertFalse(res) # the first update was run with the default batch size, this should be run with 500ms as the @@ -298,9 +313,6 @@ def test_background_update_min_batch_set_in_config(self) -> None: """ Test that the minimum batch size set in the config is used """ - # a very long-running individual update - duration_ms = 50 - self.get_success( self.store.db_pool.simple_insert( "background_updates", @@ -310,7 +322,8 @@ def test_background_update_min_batch_set_in_config(self) -> None: # Run the update with the long-running update item async def update_long(progress: JsonDict, count: int) -> int: - await self.clock.sleep(Duration(milliseconds=count * duration_ms)) + very_long_fake_work_duration = Duration(seconds=5) + await self.clock.sleep(very_long_fake_work_duration) progress = {"my_key": progress["my_key"] + 1} await self.store.db_pool.runInteraction( "update_progress", @@ -322,10 +335,15 @@ async def update_long(progress: JsonDict, count: int) -> int: self.update_handler.side_effect = update_long self.update_handler.reset_mock() - res = self.get_success( - 
self.updates.do_next_background_update(False), - by=1, - ) + background_update_d = ensureDeferred( + self.updates.do_next_background_update(False) + ) + # Wait for database queries to run in `do_next_background_update(...)` so the + # background update actually gets scheduled + self.reactor.advance(0) + # Wait for the actual background update `very_long_fake_work_duration` + self.reactor.advance(Duration(seconds=5).as_secs()) + res = self.get_success(background_update_d) self.assertFalse(res) # the first update was run with the default batch size, this should be run with minimum batch size From 997a160f28cc58c026aef5a9638cce6ebba5b943 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 19:42:17 -0500 Subject: [PATCH 09/57] Fix `tests/app/test_homeserver_shutdown.py` --- tests/app/test_homeserver_shutdown.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/app/test_homeserver_shutdown.py b/tests/app/test_homeserver_shutdown.py index 0f5d1c73387..070eb7eb60c 100644 --- a/tests/app/test_homeserver_shutdown.py +++ b/tests/app/test_homeserver_shutdown.py @@ -24,6 +24,8 @@ from typing import Any from unittest.mock import patch +from twisted.internet.defer import ensureDeferred + from synapse.app.homeserver import SynapseHomeServer from synapse.logging.context import LoggingContext from synapse.storage.background_updates import UpdaterStatus @@ -76,6 +78,13 @@ async def shutdown() -> None: self.get_success(shutdown()) + # XXX: There can be a few already dispatched database queries (from normal + # background tasks in Synapse) and the threadless `ThreadPool` that we use in + # tests uses *untracked* clock calls to pass database results back so `shutdown` + # doesn't cancel those calls. This is a quirk of our test infrastructure + # (threadless `ThreadPool`) so this kind of "hack" is fine. 
+ self.reactor.advance(0) + # Cleanup the internal reference in our test case del self.hs @@ -106,7 +115,10 @@ def test_clean_homeserver_shutdown_mid_background_updates(self) -> None: # Pump the background updates by a single iteration, just to ensure any extra # resources it uses have been started. store = weakref.proxy(self.hs.get_datastores().main) - self.get_success(store.db_pool.updates.do_next_background_update(False), by=0.1) + background_update_d = ensureDeferred( + store.db_pool.updates.do_next_background_update(False) + ) + self.get_success(background_update_d) hs_ref = weakref.ref(self.hs) @@ -127,6 +139,13 @@ async def shutdown() -> None: self.get_success(shutdown()) + # XXX: There can be a few already dispatched database queries (from normal + # background tasks in Synapse) and the threadless `ThreadPool` that we use in + # tests uses *untracked* clock calls to pass database results back so `shutdown` + # doesn't cancel those calls. This is a quirk of our test infrastructure + # (threadless `ThreadPool`) so this kind of "hack" is fine. 
+ self.reactor.advance(0) + # Cleanup the internal reference in our test case del self.hs From b501ad116e33d4d6dbd877bc1d67e17143392b50 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 19:53:14 -0500 Subject: [PATCH 10/57] Fix `tests/handlers/test_presence.py` --- tests/handlers/test_presence.py | 82 ++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 44f1e6432d6..2aeb9a927a9 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -29,6 +29,7 @@ get_verify_key, ) +from twisted.internet.defer import ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.api.constants import EventTypes, Membership, PresenceState @@ -58,6 +59,7 @@ from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -948,12 +950,17 @@ def test_external_process_timeout(self) -> None: ) worker_presence_handler = worker_to_sync_against.get_presence_handler() - self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( self.user_id, self.device_id, True, PresenceState.ONLINE - ), - by=0.1, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + self.get_success(sync_d) # Check that if we wait a while without telling the handler the user has # stopped syncing that their presence state doesn't get timed out. 
@@ -1264,30 +1271,40 @@ def test_set_presence_from_syncing_multi_device( worker_presence_handler = worker_to_sync_against.get_presence_handler() # 1. Sync with the first device. - self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( user_id, "dev-1", affect_presence=dev_1_state != PresenceState.OFFLINE, presence_state=dev_1_state, - ), - by=0.01, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + self.get_success(sync_d) # 2. Wait half the idle timer. self.reactor.advance(IDLE_TIMER / 1000 / 2) self.reactor.pump([0.1]) # 3. Sync with the second device. - self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( user_id, "dev-2", affect_presence=dev_2_state != PresenceState.OFFLINE, presence_state=dev_2_state, - ), - by=0.01, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + self.get_success(sync_d) # 4. Assert the expected presence state. state = self.get_success( @@ -1305,15 +1322,21 @@ def test_set_presence_from_syncing_multi_device( # # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER. 
if test_with_workers: - with self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( f"@other-user:{self.hs.config.server.server_name}", "dev-3", affect_presence=True, presence_state=PresenceState.ONLINE, - ), - by=0.01, - ): + ) + ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + + with self.get_success(sync_d): pass # 5. Advance such that the first device should be discarded (the idle timer), @@ -1501,26 +1524,36 @@ def test_set_presence_from_non_syncing_multi_device( worker_presence_handler = worker_to_sync_against.get_presence_handler() # 1. Sync with the first device. - sync_1 = self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( user_id, "dev-1", affect_presence=dev_1_state != PresenceState.OFFLINE, presence_state=dev_1_state, - ), - by=0.1, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + sync_1 = self.get_success(sync_d) # 2. Sync with the second device. - sync_2 = self.get_success( + sync_d = ensureDeferred( worker_presence_handler.user_syncing( user_id, "dev-2", affect_presence=dev_2_state != PresenceState.OFFLINE, presence_state=dev_2_state, - ), - by=0.1, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. 
The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + sync_2 = self.get_success(sync_d) # 3. Assert the expected presence state. state = self.get_success( @@ -1622,12 +1655,17 @@ def test_set_presence_from_syncing_keeps_busy( # Perform a sync with a presence state other than busy. This should NOT change # our presence status; we only change from busy if we explicitly set it via # /presence/*. - self.get_success( + sync_d = ensureDeferred( worker_to_sync_against.get_presence_handler().user_syncing( self.user_id, self.device_id, True, PresenceState.ONLINE - ), - by=0.1, + ) ) + # `user_syncing` proxies the presence write to the main process over an HTTP + # replication request. The request body is streamed by a `Cooperator` that uses + # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so + # we need to actually advance the clock for it to fire. + self.reactor.advance(Duration(microseconds=1).as_secs()) + self.get_success(sync_d) # Check against the main process that the user's presence did not change. 
state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) From 9cfd0f9840f6cd3c905a387f66aae26bed7938e3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:13:47 -0500 Subject: [PATCH 11/57] Fix `tests/handlers/test_send_email.py` --- tests/handlers/test_send_email.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_send_email.py b/tests/handlers/test_send_email.py index eea88cd136b..80d34791b65 100644 --- a/tests/handlers/test_send_email.py +++ b/tests/handlers/test_send_email.py @@ -145,8 +145,12 @@ def test_send_email(self) -> None: ) ) + # This matches the two `callLater` delays in `FakeTransport.registerProducer` + self.reactor.advance(0) + self.reactor.advance(0.1) + # the message should now get delivered - self.get_success(d, by=0.1) + self.get_success(d) # check it arrived self.assertEqual(len(message_delivery.messages), 1) @@ -212,8 +216,12 @@ def test_send_email_force_tls(self) -> None: ) ) + # This matches the two `callLater` delays in `FakeTransport.registerProducer` + self.reactor.advance(0) + self.reactor.advance(0.1) + # the message should now get delivered - self.get_success(d, by=0.1) + self.get_success(d) # check it arrived self.assertEqual(len(message_delivery.messages), 1) From 66a515b57295c3ac54e1f04d252afeda692e7238 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:23:19 -0500 Subject: [PATCH 12/57] Explain why remove `get_success_or_raise` --- tests/unittest.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index 34dcbf82214..d27bec7dcd8 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -838,7 +838,13 @@ def get_failure( return self.failureResultOf(deferred, exc) - # FIXME: Remove + # FIXME: Remove as this has the exact same semantics as `get_success()`. 
In + # https://github.com/matrix-org/synapse/pull/8402#discussion_r495992506 where it was + # introduced, it was claimed that "get_success fails the test if the deferred fails + # rather than raising, which I find a bit unintuitive." but `get_success()` actually + # does raise "@raise SynchronousTestCase.failureException : If the + # L{Deferred} has no result or has a failure + # result." at-least in today's world. def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: """Drive deferred to completion and return result or raise exception on failure. From a1092da603e6e191a767401f21114f23b7c7c2b8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:28:46 -0500 Subject: [PATCH 13/57] Extract logic to `_wait_for_deferred` --- tests/unittest.py | 62 ++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index d27bec7dcd8..ee03b510699 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -737,14 +737,14 @@ def pump(self, by: float = 0.0) -> None: # whole chain to completion. self.reactor.pump([by] * 100) - def get_success( + def _wait_for_deferred( self, - d: Awaitable[TV], + d: Deferred[Any], # 2-second default timeout as tests should be fast timeout: Duration = Duration(seconds=2), - ) -> TV: + ) -> None: """ - Get the success result of an awaitable. + Wait for the awaitable to finish or raise (with real-time timeout). Does not advance time in the Twisted reactor clock but will loop until the real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` @@ -759,13 +759,10 @@ def get_success( Raises: defer.TimeoutError: If the timeout expires before the awaitable completes. - SynchronousTestCase.failureException: If the awaitable has a failure result or has no result - (although you would probably run into `defer.TimeoutError` in that case). 
""" start_time_seconds = time.time() - deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] - while not deferred.called: + while not d.called: if start_time_seconds + timeout.as_secs() < time.time(): raise defer.TimeoutError( "Timed out waiting for work happening on a thread to finish" @@ -784,6 +781,34 @@ def get_success( # reactor to run (like `reactor.callFromThread(...)`) self.reactor.advance(0) + def get_success( + self, + d: Awaitable[TV], + # 2-second default timeout as tests should be fast + timeout: Duration = Duration(seconds=2), + ) -> TV: + """ + Get the success result of an awaitable. + + Does not advance time in the Twisted reactor clock but will loop until the + real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` + scheduled callbacks to run if they are scheduled to run now and 2) will also + allow other threads to make progress. This could be things spawned on the + Twisted reactor threadpool or Tokio runtime (async Rust code). + + Args: + d: awaitable + timeout: Real-time time to wait for the awaitable to have a result. + We use real-time as we may have to wait for work on other threads. + + Raises: + defer.TimeoutError: If the timeout expires before the awaitable completes. + SynchronousTestCase.failureException: If the awaitable has a failure result or has no result + (although you would probably run into `defer.TimeoutError` in that case). + """ + deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] + self._wait_for_deferred(deferred, timeout) + return self.successResultOf(deferred) def get_failure( @@ -814,27 +839,8 @@ def get_failure( or has an unexpected failure result, or has no result (although you would probably run into `defer.TimeoutError` in that case). 
""" - start_time_seconds = time.time() - deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] - while not deferred.called: - if start_time_seconds + timeout.as_secs() < time.time(): - raise defer.TimeoutError( - "Timed out waiting for work happening on a thread to finish" - ) - - # Suspend execution of this thread to allow other threads to do work. This - # could be things spawned on the Twisted reactor threadpool or Tokio thread - # pool (async Rust code). - # - # We could also use `time.sleep(0)` here but this is more precise - os.sched_yield() - - # Advance the Twisted reactor and run any scheduled callbacks - # - # In terms of other threads, they may have scheduled something on the - # reactor to run (like `reactor.callFromThread(...)`) - self.reactor.advance(0) + self._wait_for_deferred(deferred, timeout) return self.failureResultOf(deferred, exc) From 09c91d3e83701b8e28ea17139d7382d568962e1e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:29:51 -0500 Subject: [PATCH 14/57] Fix FIXME comment grammar --- tests/unittest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index ee03b510699..4a32dfcdedb 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -844,7 +844,7 @@ def get_failure( return self.failureResultOf(deferred, exc) - # FIXME: Remove as the the exact same semantics as `get_success()`. In + # FIXME: Remove as this has the exact same semantics as `get_success()`. In # https://github.com/matrix-org/synapse/pull/8402#discussion_r495992506 where it was # introduced, it was claimed that "get_success fails the test if the deferred fails # rather than raising, which I find a bit unintuitive." 
but `get_success()` actually From 4357aa45bd68174f870ef3f697445889b5ac18fe Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:37:50 -0500 Subject: [PATCH 15/57] Use 1 second timeout default --- tests/unittest.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 4a32dfcdedb..5a37a58d215 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -784,8 +784,8 @@ def _wait_for_deferred( def get_success( self, d: Awaitable[TV], - # 2-second default timeout as tests should be fast - timeout: Duration = Duration(seconds=2), + # 1 second default timeout as tests should be fast + timeout: Duration = Duration(seconds=1), ) -> TV: """ Get the success result of an awaitable. @@ -796,6 +796,15 @@ def get_success( allow other threads to make progress. This could be things spawned on the Twisted reactor threadpool or Tokio runtime (async Rust code). + If you need to advance the Twisted reactor by an actual time increment, you can + use the following pattern: + ```python + task_d = ensureDeferred(my_async_task()) + # Please explain why/what scheduled call you're trying to trigger + self.reactor.advance(Duration(seconds=1).as_secs()) + result = self.get_success(sync_d) + ``` + Args: d: awaitable timeout: Real-time time to wait for the awaitable to have a result. @@ -815,8 +824,8 @@ def get_failure( self, d: Awaitable[Any], exc: type[_ExcType], - # 2-second default timeout as tests should be fast - timeout: Duration = Duration(seconds=2), + # 1 second default timeout as tests should be fast + timeout: Duration = Duration(seconds=1), ) -> _TypedFailure[_ExcType]: """ Get the failure result of an awaitable. The failure must be of the type `exc`. 
From edce488824789ec83b3bf0a18170ee31cde99187 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:39:03 -0500 Subject: [PATCH 16/57] Use "deferred" in `_wait_for_deferred` docstring --- tests/unittest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 5a37a58d215..cb277e87c32 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -744,7 +744,7 @@ def _wait_for_deferred( timeout: Duration = Duration(seconds=2), ) -> None: """ - Wait for the awaitable to finish or raise (with real-time timeout). + Wait for the deferred to finish or raise (with real-time timeout). Does not advance time in the Twisted reactor clock but will loop until the real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` @@ -753,12 +753,12 @@ def _wait_for_deferred( Twisted reactor threadpool or Tokio runtime (async Rust code). Args: - d: awaitable - timeout: Real-time time to wait for the awaitable to have a result. + d: Twisted Deferred + timeout: Real-time time to wait for the deferred to have a result. We use real-time as we may have to wait for work on other threads. Raises: - defer.TimeoutError: If the timeout expires before the awaitable completes. + defer.TimeoutError: If the timeout expires before the deferred completes. 
""" start_time_seconds = time.time() From 5cc45901d2f6e9cacb8e0161ecb14282efaaf442 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 20:47:13 -0500 Subject: [PATCH 17/57] Add example if you need to advance time --- tests/unittest.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/unittest.py b/tests/unittest.py index cb277e87c32..f6130df9e6a 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -799,6 +799,7 @@ def get_success( If you need to advance the Twisted reactor by an actual time increment, you can use the following pattern: ```python + # We use `ensureDeferred(...)` as a `Deferred` can run in the background on its own (unlike a Python coroutine) task_d = ensureDeferred(my_async_task()) # Please explain why/what scheduled call you're trying to trigger self.reactor.advance(Duration(seconds=1).as_secs()) @@ -836,6 +837,16 @@ def get_failure( allow other threads to make progress. This could be things spawned on the Twisted reactor threadpool or Tokio runtime (async Rust code). 
+ If you need to advance the Twisted reactor by an actual time increment, you can + use the following pattern: + ```python + # We use `ensureDeferred(...)` as a `Deferred` can run in the background on its own (unlike a Python coroutine) + task_d = ensureDeferred(my_async_task()) + # Please explain why/what scheduled call you're trying to trigger + self.reactor.advance(Duration(seconds=1).as_secs()) + result = self.get_success(sync_d) + ``` + Args: d: awaitable exc: Exception type to expect From 5b27102f1cca6e6befcae25f54c570fb35df3171 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 21:06:00 -0500 Subject: [PATCH 18/57] Fix `tests.handlers.test_profile.ProfileTestCase.test_background_update_room_membership_resume_after_restart` --- tests/handlers/test_profile.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index ef4f6a1c788..561b45827fd 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -299,6 +299,8 @@ async def potentially_slow_update_membership( ) ) + # Wait for the `TaskScheduler.SCHEDULE_INTERVAL` + self.reactor.advance(Duration(minutes=1).as_secs()) # Let's be sure we are over the delay introduced by slow_update_membership self.reactor.advance(Duration(milliseconds=20).as_secs()) From 44253dfe4f7dd44707d932c30486189c45877515 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 21:08:56 -0500 Subject: [PATCH 19/57] No need to change background update in `tests/app/test_homeserver_shutdown.py` --- tests/app/test_homeserver_shutdown.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/app/test_homeserver_shutdown.py b/tests/app/test_homeserver_shutdown.py index 070eb7eb60c..ed3948bd505 100644 --- a/tests/app/test_homeserver_shutdown.py +++ b/tests/app/test_homeserver_shutdown.py @@ -115,10 +115,7 @@ def test_clean_homeserver_shutdown_mid_background_updates(self) -> None: # Pump the background updates 
by a single iteration, just to ensure any extra # resources it uses have been started. store = weakref.proxy(self.hs.get_datastores().main) - background_update_d = ensureDeferred( - store.db_pool.updates.do_next_background_update(False) - ) - self.get_success(background_update_d) + self.get_success(store.db_pool.updates.do_next_background_update(False)) hs_ref = weakref.ref(self.hs) From 47297af3ab2bf7fc03e93375f302c8805fbc5c2e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 21:26:55 -0500 Subject: [PATCH 20/57] Fix `tests.handlers.test_user_directory.UserDirectoryTestCase.test_process_join_after_server_leaves_room` `wait_for_background_updates` is not relevant --- tests/handlers/test_user_directory.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index f50fa1f4a02..0f08078b6f4 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -555,7 +555,10 @@ def test_process_join_after_server_leaves_room(self) -> None: # Process the leave and join in one go. dir_handler.update_user_directory = True dir_handler.notify_new_event() - self.wait_for_background_updates() + # `notify_new_event` is fire-and-forget but the actual changes that happen are + # part of a processing loop which we need to wait for. We're specifically + # waiting for the database queries in the `notify_new_event` processing loop. + self.reactor.advance(0) # The user sharing tables should have been updated. public3 = self.get_success(self.user_dir_helper.get_users_in_public_rooms()) @@ -1124,7 +1127,6 @@ def test_local_user_leaving_room_remains_in_user_directory(self) -> None: # Alice leaves the other. She should still be in the directory. 
self.helper.leave(room2, alice, tok=alice_token) - self.wait_for_background_updates() users, in_public, in_private = self.get_success( self.user_dir_helper.get_tables() ) From cc2c27bb218b653a786ba9780dc544a80459be5e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 21:45:19 -0500 Subject: [PATCH 21/57] Fix `tests/handlers/test_typing.py` --- tests/handlers/test_typing.py | 16 ++++++++++++++++ tests/handlers/test_user_directory.py | 11 ++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 623eef0ecb6..0bbe0845470 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -248,6 +248,14 @@ def test_started_typing_remote_send(self) -> None: ) ) + # Wait for the EDU to get pushed out over federation + # + # `started_typing` is fire-and-forget and handles the remote federation part as + # part of a background process which isn't waited on. + # + # We're specifically waiting for the database queries in the background process + self.reactor.advance(0) + self.mock_federation_client.put_json.assert_called_once_with( "farm", path="/_matrix/federation/v1/send/1000000", @@ -367,6 +375,14 @@ def test_stopped_typing(self) -> None: [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] ) + # Wait for the EDU to get pushed out over federation + # + # `stopped_typing` is fire-and-forget and handles the remote federation part as + # part of a background process which isn't waited on. 
+ # + # We're specifically waiting for the database queries in the background process + self.reactor.advance(0) + self.mock_federation_client.put_json.assert_called_once_with( "farm", path="/_matrix/federation/v1/send/1000000", diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 0f08078b6f4..dc6738ca286 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -555,9 +555,14 @@ def test_process_join_after_server_leaves_room(self) -> None: # Process the leave and join in one go. dir_handler.update_user_directory = True dir_handler.notify_new_event() - # `notify_new_event` is fire-and-forget but the actual changes that happen are - # part of a processing loop which we need to wait for. We're specifically - # waiting for the database queries in the `notify_new_event` processing loop. + + # Wait for the user directory to update + # + # `notify_new_event` is fire-and-forget and the actual changes happen as part of + # a background process loop which isn't waited on. + # + # We're specifically waiting for the database queries in the `notify_new_event` + # background process. self.reactor.advance(0) # The user sharing tables should have been updated. 
From 26dc51274727dc10206442087f6e2d1d1aec83ec Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 22:18:08 -0500 Subject: [PATCH 22/57] Fix `trial tests.replication.test_federation_ack` --- tests/replication/test_federation_ack.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index e6b9ea53832..07299c96983 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -81,6 +81,14 @@ def test_federation_ack_sent(self) -> None: ) ) + # Wait for the FEDERATION_ACK to be sent + # + # `on_rdata` handles this as part of a background process (see + # `FederationSenderHandler.update_token`) + # + # We're specifically waiting for the database queries in the background process + self.reactor.advance(0) + # now check that the FEDERATION_ACK was sent mock_connection.send_command.assert_called_once() cmd = mock_connection.send_command.call_args[0][0] From 2bce6e7e3ebaa3d27b6ece40c58d5b293f719f8a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 22:19:43 -0500 Subject: [PATCH 23/57] Fix lints --- tests/app/test_homeserver_shutdown.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/app/test_homeserver_shutdown.py b/tests/app/test_homeserver_shutdown.py index ed3948bd505..20d314cb682 100644 --- a/tests/app/test_homeserver_shutdown.py +++ b/tests/app/test_homeserver_shutdown.py @@ -24,8 +24,6 @@ from typing import Any from unittest.mock import patch -from twisted.internet.defer import ensureDeferred - from synapse.app.homeserver import SynapseHomeServer from synapse.logging.context import LoggingContext from synapse.storage.background_updates import UpdaterStatus From ecce8733f526552691e2ebcaa3185223e3b45c99 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 22:34:15 -0500 Subject: [PATCH 24/57] Remove other `till_deferred_has_result` --- tests/handlers/test_oauth_delegation.py | 77 
++++--------------------- 1 file changed, 12 insertions(+), 65 deletions(-) diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index c88f2c2d155..3c939b301c1 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -21,11 +21,10 @@ import json import threading -import time from http import HTTPStatus from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO -from typing import Any, ClassVar, Coroutine, Generator, TypeVar, Union +from typing import Any, ClassVar, TypeVar from unittest.mock import ANY, AsyncMock, Mock from urllib.parse import parse_qs @@ -37,7 +36,6 @@ ) from signedjson.sign import sign_json -from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.api.auth.mas import MasDelegatedAuth @@ -809,31 +807,6 @@ class MasAuthDelegation(HomeserverTestCase): def device_scope(self) -> str: return self.device_scope_prefix + DEVICE - def till_deferred_has_result( - self, - awaitable: Union[ - "Coroutine[Deferred[Any], Any, T]", - "Generator[Deferred[Any], Any, T]", - "Deferred[T]", - ], - ) -> "Deferred[T]": - """Wait until a deferred has a result. - - This is useful because the Rust HTTP client will resolve the deferred - using reactor.callFromThread, which are only run when we call - reactor.advance. 
- """ - deferred = ensureDeferred(awaitable) - tries = 0 - while not deferred.called: - time.sleep(0.1) - self.reactor.advance(0) - tries += 1 - if tries > 100: - raise Exception("Timed out waiting for deferred to resolve") - - return deferred - def default_config(self) -> dict[str, Any]: config = super().default_config() config["public_baseurl"] = BASE_URL @@ -883,11 +856,7 @@ def test_simple_introspection(self) -> None: "expires_in": 60, } - requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) - ) + requester = self.get_success(self._auth.get_user_by_access_token("some_token")) self.assertEqual(requester.user.to_string(), USER_ID) self.assertEqual(requester.device_id, DEVICE) @@ -906,11 +875,7 @@ def test_unexpiring_token(self) -> None: "username": USERNAME, } - requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) - ) + requester = self.get_success(self._auth.get_user_by_access_token("some_token")) self.assertEqual(requester.user.to_string(), USER_ID) self.assertEqual(requester.device_id, DEVICE) @@ -931,9 +896,7 @@ def test_inexistent_device(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + self._auth.get_user_by_access_token("some_token"), InvalidClientTokenError, ) self.assertEqual(failure.value.code, 401) @@ -948,9 +911,7 @@ def test_inexistent_user(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + self._auth.get_user_by_access_token("some_token"), AuthError, ) # This is a 500, it should never happen really @@ -966,9 +927,7 @@ def test_missing_scope(self) -> None: } failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + self._auth.get_user_by_access_token("some_token"), InvalidClientTokenError, 
) self.assertEqual(failure.value.code, 401) @@ -977,9 +936,7 @@ def test_invalid_response(self) -> None: self.server.introspection_response = {} failure = self.get_failure( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ), + self._auth.get_user_by_access_token("some_token"), SynapseError, ) self.assertEqual(failure.value.code, 503) @@ -994,11 +951,7 @@ def test_device_id_in_body(self) -> None: "device_id": DEVICE, } - requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) - ) + requester = self.get_success(self._auth.get_user_by_access_token("some_token")) self.assertEqual(requester.device_id, DEVICE) @@ -1011,11 +964,7 @@ def test_admin_scope(self) -> None: "expires_in": 60, } - requester = self.get_success( - self.till_deferred_has_result( - self._auth.get_user_by_access_token("some_token") - ) - ) + requester = self.get_success(self._auth.get_user_by_access_token("some_token")) self.assertEqual(requester.user.to_string(), USER_ID) self.assertTrue(self.get_success(self._auth.is_server_admin(requester))) @@ -1040,17 +989,15 @@ def test_cached_expired_introspection(self) -> None: request.requestHeaders.getRawHeaders = mock_getRawHeaders() # The first CS-API request causes a successful introspection - self.get_success( - self.till_deferred_has_result(self._auth.get_user_by_req(request)) - ) + self.get_success(self._auth.get_user_by_req(request)) self.assertEqual(self.server.calls, 1) # Sleep for 60 seconds so the token expires. 
self.reactor.advance(60.0) # Now the CS-API request fails because the token expired - self.assertFailure( - self.till_deferred_has_result(self._auth.get_user_by_req(request)), + self.get_failure( + self._auth.get_user_by_req(request), InvalidClientTokenError, ) # Ensure another introspection request was not sent From 2c511428a37aa3525329da3743faed7344f4c7e1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 22:43:34 -0500 Subject: [PATCH 25/57] Explain better --- tests/synapse_rust/test_http_client.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 7c85cb68399..7a4488d3abd 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -12,8 +12,8 @@ import json import logging +import os import threading -import time from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any, TypeVar @@ -202,7 +202,14 @@ async def do_request() -> None: with PreserveLoggingContext(): while not callback_finished: # Allow the async Rust to run - time.sleep(0) + # + # Suspend execution of this thread to allow other the Tokio thread + # pool to do work. + os.sched_yield() + # Advance the Twisted reactor and run any scheduled callbacks + # + # In terms of other threads, they may have scheduled something on the + # reactor to run (like `reactor.callFromThread(...)`) self.reactor.advance(0) # check that the logcontext is left in a sane state. 
From 999d22dc6642b27e68dd3527a170051b5581ee7d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 23:07:41 -0500 Subject: [PATCH 26/57] Fix `tests.storage.databases.main.test_events_worker.GetEventCancellationTestCase.test_first_get_event_cancelled` Based on the same fix made in f22e7cda2c72d461acba664cb083e8c4e3c7572a (https://github.com/element-hq/synapse/commit/f22e7cda2c72d461acba664cb083e8c4e3c7572a) --- tests/unittest.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index f6130df9e6a..393bd0334b9 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -762,7 +762,19 @@ def _wait_for_deferred( """ start_time_seconds = time.time() - while not d.called: + # Wait until the deferred has a result + # + # Checking `d.called` by itself is not sufficient as this is possible: + # + # If you have a first `Deferred` `D1`, you can add a callback which returns + # another `Deferred` `D2`, and `D2` must then complete before any further + # callbacks on `D1` will execute (and later callbacks on `D1` get the *result* + # of `D2` rather than `D2` itself). + # + # So, `D1` might have `called=True` (as in, it has started running its + # callbacks), but any new callbacks added to `D1` won't get run until `D2` + # completes. Fortunately, we can detect this by checking `d.paused`.
+ while not d.called or d.paused: if start_time_seconds + timeout.as_secs() < time.time(): raise defer.TimeoutError( "Timed out waiting for work happening on a thread to finish" From 41642be1166fae2fd740ee922fb391a2c0e49bab Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 23:17:10 -0500 Subject: [PATCH 27/57] Remove `wait_on_thread` --- tests/media/test_media_storage.py | 7 +------ tests/unittest.py | 12 ------------ 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index f25b507aac5..855a623ec09 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -132,12 +132,7 @@ async def test_ensure_media() -> None: # This uses a real blocking threadpool so we have to wait for it to be # actually done :/ - x = defer.ensureDeferred(test_ensure_media()) - - # Hotloop until the threadpool does its job... - self.wait_on_thread(x) - - self.get_success(x) + self.get_success(test_ensure_media()) @attr.s(auto_attribs=True, slots=True, frozen=True) diff --git a/tests/unittest.py b/tests/unittest.py index 393bd0334b9..75980cfaeb3 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -477,18 +477,6 @@ def tearDown(self) -> None: # Reset to not use frozen dicts. events.USE_FROZEN_DICTS = False - def wait_on_thread(self, deferred: Deferred, timeout: int = 10) -> None: - """ - Wait until a Deferred is done, where it's waiting on a real thread. 
- """ - start_time = time.time() - - while not deferred.called: - if start_time + timeout < time.time(): - raise ValueError("Timed out waiting for threadpool") - self.reactor.advance(0.01) - time.sleep(0.01) - def wait_for_background_updates(self) -> None: """Block until all background database updates have completed.""" store = self.hs.get_datastores().main From 350b15fa554ef4898b5358599bf5ea2a41719193 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 19 Jun 2026 23:17:59 -0500 Subject: [PATCH 28/57] Fix `trial-olddeps`: `builtins.TypeError: 'type' object is not subscriptable` --- tests/unittest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index 75980cfaeb3..16b4b474fbb 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -727,7 +727,7 @@ def pump(self, by: float = 0.0) -> None: def _wait_for_deferred( self, - d: Deferred[Any], + d: "Deferred[Any]", # 2-second default timeout as tests should be fast timeout: Duration = Duration(seconds=2), ) -> None: From 60ddfc615b340fa418b80c603da996f70930c321 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Sat, 20 Jun 2026 01:17:24 -0500 Subject: [PATCH 29/57] Fix `tests.replication.tcp.test_handler.ChannelsTestCase.test_wait_for_stream_position_rdata` --- tests/replication/tcp/test_handler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index a8eb7fc523c..4eda61cd237 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -206,6 +206,12 @@ def test_wait_for_stream_position_rdata(self) -> None: # Finish the context manager, triggering the data to be sent to master. 
self.get_success(ctx_worker1.__aexit__(None, None, None)) + # Wait for the stream position to be replicated to the master process + # + # Replication travels over `FakeTransport` and we're specifically flushing the + # write + self.reactor.advance(0) + # Master should get told about `next_token2`, so the deferred should # resolve. self.assertTrue(d.called) From 167ad624e5cec4eb008137a9980f06ab2f45abdc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 08:48:03 -0500 Subject: [PATCH 30/57] Run CI again From ad0bbb61b04e5a6771d0aac78b0a4b230d714304 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 13:14:33 -0500 Subject: [PATCH 31/57] Fix `tests.replication.tcp.test_handler.ChannelsTestCase.test_wait_for_stream_position` --- tests/replication/tcp/test_handler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index 4eda61cd237..d35191e654c 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -147,6 +147,12 @@ def test_wait_for_stream_position(self) -> None: # ... but worker1 finishing (and so sending an update) should. 
self.get_success(ctx_worker1.__aexit__(None, None, None)) + # Wait for the stream position to be replicated to the master process + # + # Replication travels over `FakeTransport` and we're specifically flushing the + # write + self.reactor.advance(0) + self.assertTrue(d.called) def test_wait_for_stream_position_rdata(self) -> None: From f0e968a9f6b2c4afaddf72e4fb8e885d3b07c369 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 13:40:59 -0500 Subject: [PATCH 32/57] Run CI again From 919a94c7a73cdee54918505e351a97e208e66dcb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 15:15:26 -0500 Subject: [PATCH 33/57] Align default timeout with `get_success` --- tests/unittest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 16b4b474fbb..4919e28e707 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -728,8 +728,8 @@ def pump(self, by: float = 0.0) -> None: def _wait_for_deferred( self, d: "Deferred[Any]", - # 2-second default timeout as tests should be fast - timeout: Duration = Duration(seconds=2), + # 1 second default timeout as tests should be fast + timeout: Duration = Duration(seconds=1), ) -> None: """ Wait for the deferred to finish or raise (with real-time timeout). 
From 2c8ea3adf49e48b90b3b8aab71e866d4f7aee814 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 15:18:30 -0500 Subject: [PATCH 34/57] Remove double space in comment --- tests/synapse_rust/test_http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 7a4488d3abd..c03ea48fef9 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -203,7 +203,7 @@ async def do_request() -> None: while not callback_finished: # Allow the async Rust to run # - # Suspend execution of this thread to allow other the Tokio thread + # Suspend execution of this thread to allow other the Tokio thread # pool to do work. os.sched_yield() # Advance the Twisted reactor and run any scheduled callbacks From 2300a19aa7bbbef4ebee1c2626104cf76da70231 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Jun 2026 17:35:18 -0500 Subject: [PATCH 35/57] Run CI again From a1170c64705cb3ce44500eeed0de15cc00b2fda2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Jun 2026 11:43:46 -0500 Subject: [PATCH 36/57] Run CI again From efcd574635bd1ad47f4d11d449dcfd22d9e4093d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Jun 2026 12:18:02 -0500 Subject: [PATCH 37/57] Run CI again From 749ea8ba7f9c39fc00f9508e41b66b5609c9cb0a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Jun 2026 13:37:54 -0500 Subject: [PATCH 38/57] Run CI again From 92346f2783d4af6b0e374f33b81d97b2a1ee992c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Jun 2026 15:53:43 -0500 Subject: [PATCH 39/57] Switch to cross-compatible `time.sleep()` See https://github.com/element-hq/synapse/pull/19871#discussion_r3462533989 --- tests/synapse_rust/test_http_client.py | 4 ++-- tests/unittest.py | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py 
b/tests/synapse_rust/test_http_client.py index c03ea48fef9..845fe2b5033 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -12,8 +12,8 @@ import json import logging -import os import threading +import time from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any, TypeVar @@ -205,7 +205,7 @@ async def do_request() -> None: # # Suspend execution of this thread to allow other the Tokio thread # pool to do work. - os.sched_yield() + time.sleep(0) # Advance the Twisted reactor and run any scheduled callbacks # # In terms of other threads, they may have scheduled something on the diff --git a/tests/unittest.py b/tests/unittest.py index 4919e28e707..d6d8629b849 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -25,7 +25,6 @@ import hmac import json import logging -import os import secrets import time from typing import ( @@ -771,9 +770,7 @@ def _wait_for_deferred( # Suspend execution of this thread to allow other threads to do work. This # could be things spawned on the Twisted reactor threadpool or Tokio thread # pool (async Rust code). - # - # We could also use `time.sleep(0)` here but this is more precise - os.sched_yield() + time.sleep(0) # Advance the Twisted reactor and run any scheduled callbacks # From ce1758ce9647f538ab389f03ed5251da9ac9921c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 23 Jun 2026 15:58:14 -0500 Subject: [PATCH 40/57] Comment about Python's default thread switch interval See https://github.com/element-hq/synapse/pull/19871#discussion_r3462756640 --- tests/unittest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/unittest.py b/tests/unittest.py index d6d8629b849..c0cf80b3469 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -770,6 +770,12 @@ def _wait_for_deferred( # Suspend execution of this thread to allow other threads to do work. 
This # could be things spawned on the Twisted reactor threadpool or Tokio thread # pool (async Rust code). + # + # Note: Since we're waiting real-time (`timeout` duration), the tests also + # pass with `time.sleep(0)` commented out because Python has a default + # thread switch interval (5ms for cpython) (see + # `sys.setswitchinterval(interval)`). We still want this here as we're able + # to preempt and cause the thread context swtich to happen faster. time.sleep(0) # Advance the Twisted reactor and run any scheduled callbacks From a871de0d2607bf547d344b0e27149d80b76c962c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Jun 2026 15:28:30 -0500 Subject: [PATCH 41/57] `time.sleep(0.001)` after a few loops --- tests/unittest.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index c0cf80b3469..2f75162b0c0 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -761,6 +761,7 @@ def _wait_for_deferred( # So, `D1` might have `called=True` (as in, it has started running its # callbacks), but any new callbacks added to `D1` won't get run until `D2` # completes. Fortunately, we can detect this by checking `d.paused`. + loop_count = 0 while not d.called or d.paused: if start_time_seconds + timeout.as_secs() < time.time(): raise defer.TimeoutError( @@ -772,11 +773,20 @@ def _wait_for_deferred( # pool (async Rust code). # # Note: Since we're waiting real-time (`timeout` duration), the tests also - # pass with `time.sleep(0)` commented out because Python has a default + # pass with `time.sleep(...)` commented out because Python has a default # thread switch interval (5ms for cpython) (see # `sys.setswitchinterval(interval)`). We still want this here as we're able # to preempt and cause the thread context swtich to happen faster. 
- time.sleep(0) + # + # After a few cycles, we use `time.sleep(0.001)` instead of `time.sleep(0)` + # to avoid tightlooping on the main thread (CPU 100%) because it's wasteful + # and may starve out other threads. 10 is arbitrary but many cases will have + # none or only a few round-trips so we can just try to go as fast as + # posssible. + if loop_count < 10: + time.sleep(0) + else: + time.sleep(0.001) # Advance the Twisted reactor and run any scheduled callbacks # @@ -784,6 +794,8 @@ def _wait_for_deferred( # reactor to run (like `reactor.callFromThread(...)`) self.reactor.advance(0) + loop_count += 1 + def get_success( self, d: Awaitable[TV], From 28586c6d30f3bef15ed65faf2cf2dee5d971ebc8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 14:07:34 -0500 Subject: [PATCH 42/57] Remove real-time `timeout` --- tests/unittest.py | 68 +++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 2f75162b0c0..acb98d97a08 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -727,28 +727,22 @@ def pump(self, by: float = 0.0) -> None: def _wait_for_deferred( self, d: "Deferred[Any]", - # 1 second default timeout as tests should be fast - timeout: Duration = Duration(seconds=1), ) -> None: """ Wait for the deferred to finish or raise (with real-time timeout). - Does not advance time in the Twisted reactor clock but will loop until the - real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` - scheduled callbacks to run if they are scheduled to run now and 2) will also - allow other threads to make progress. This could be things spawned on the - Twisted reactor threadpool or Tokio runtime (async Rust code). + Does not advance time in the Twisted reactor clock but will loop 100 times + waiting for a result. 
The loop 1) allows `clock.call_later` scheduled callbacks + to run if they are scheduled to run now and 2) will also allow other threads to + make progress. This could be things spawned on the Twisted reactor threadpool or + Tokio runtime (async Rust code). Args: d: Twisted Deferred - timeout: Real-time time to wait for the deferred to have a result. - We use real-time as we may have to wait for work on other threads. Raises: defer.TimeoutError: If the timeout expires before the deferred completes. """ - start_time_seconds = time.time() - # Wait until the deferred has a result # # Checking `d.called` by itself is not sufficient by itself as this is possible: @@ -763,20 +757,22 @@ def _wait_for_deferred( # completes. Fortunately, we can detect this by checking `d.paused`. loop_count = 0 while not d.called or d.paused: - if start_time_seconds + timeout.as_secs() < time.time(): - raise defer.TimeoutError( - "Timed out waiting for work happening on a thread to finish" - ) + # 100 loops is arbitrary but based on previous code which used to "pump" and + # advance the reactor 100 times. This also makes the assumption that any + # work on other threads will finish before we give up after sleeping ~0.1s + # of real-time (100 * 0.001). + if loop_count > 100: + raise defer.TimeoutError("Timed out waiting for deferred to finish") # Suspend execution of this thread to allow other threads to do work. This # could be things spawned on the Twisted reactor threadpool or Tokio thread # pool (async Rust code). # - # Note: Since we're waiting real-time (`timeout` duration), the tests also - # pass with `time.sleep(...)` commented out because Python has a default - # thread switch interval (5ms for cpython) (see - # `sys.setswitchinterval(interval)`). We still want this here as we're able - # to preempt and cause the thread context swtich to happen faster. 
+ # Note: Python has a default thread switch interval (5ms for cpython) (see + # `sys.setswitchinterval(interval)`) but we still want this here as we're + # able to preempt and cause the thread context swtich to happen faster. + # Also, without any real-time sleeping, this function would complete before + # the 5ms switch ever happened. # # After a few cycles, we use `time.sleep(0.001)` instead of `time.sleep(0)` # to avoid tightlooping on the main thread (CPU 100%) because it's wasteful @@ -799,17 +795,15 @@ def _wait_for_deferred( def get_success( self, d: Awaitable[TV], - # 1 second default timeout as tests should be fast - timeout: Duration = Duration(seconds=1), ) -> TV: """ Get the success result of an awaitable. - Does not advance time in the Twisted reactor clock but will loop until the - real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` - scheduled callbacks to run if they are scheduled to run now and 2) will also - allow other threads to make progress. This could be things spawned on the - Twisted reactor threadpool or Tokio runtime (async Rust code). + Does not advance time in the Twisted reactor clock but will loop 100 times + waiting for a result. The loop 1) allows `clock.call_later` scheduled callbacks + to run if they are scheduled to run now and 2) will also allow other threads to + make progress. This could be things spawned on the Twisted reactor threadpool or + Tokio runtime (async Rust code). If you need to advance the Twisted reactor by an actual time increment, you can use the following pattern: @@ -823,8 +817,6 @@ def get_success( Args: d: awaitable - timeout: Real-time time to wait for the awaitable to have a result. - We use real-time as we may have to wait for work on other threads. Raises: defer.TimeoutError: If the timeout expires before the awaitable completes. @@ -832,7 +824,7 @@ def get_success( (although you would probably run into `defer.TimeoutError` in that case). 
""" deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type] - self._wait_for_deferred(deferred, timeout) + self._wait_for_deferred(deferred) return self.successResultOf(deferred) @@ -840,17 +832,15 @@ def get_failure( self, d: Awaitable[Any], exc: type[_ExcType], - # 1 second default timeout as tests should be fast - timeout: Duration = Duration(seconds=1), ) -> _TypedFailure[_ExcType]: """ Get the failure result of an awaitable. The failure must be of the type `exc`. - Does not advance time in the Twisted reactor clock but will loop until the - real-time `timeout` waiting for a result. The loop 1) allows `clock.call_later` - scheduled callbacks to run if they are scheduled to run now and 2) will also - allow other threads to make progress. This could be things spawned on the - Twisted reactor threadpool or Tokio runtime (async Rust code). + Does not advance time in the Twisted reactor clock but will loop 100 times + waiting for a result. The loop 1) allows `clock.call_later` scheduled callbacks + to run if they are scheduled to run now and 2) will also allow other threads to + make progress. This could be things spawned on the Twisted reactor threadpool or + Tokio runtime (async Rust code). If you need to advance the Twisted reactor by an actual time increment, you can use the following pattern: @@ -865,8 +855,6 @@ def get_failure( Args: d: awaitable exc: Exception type to expect - timeout: Real-time time to wait for the awaitable to have a result. - We use real-time as we may have to wait for work on other threads. Raises: defer.TimeoutError: If the timeout expires before the awaitable completes. @@ -875,7 +863,7 @@ def get_failure( probably run into `defer.TimeoutError` in that case). 
""" deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] - self._wait_for_deferred(deferred, timeout) + self._wait_for_deferred(deferred) return self.failureResultOf(deferred, exc) From cb91056344d0a541de215f9c7bec3ecaba6bf601 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 16:13:58 -0500 Subject: [PATCH 43/57] Fix lints --- tests/unittest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index acb98d97a08..d22cbee2108 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -78,7 +78,6 @@ from synapse.storage.keys import FetchKeyResult from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester from synapse.util.clock import Clock -from synapse.util.duration import Duration from synapse.util.httpresourcetree import create_resource_tree from tests.server import ( From a975f8309ddb33c3872e8f7db24585db6abea8d4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 16:14:08 -0500 Subject: [PATCH 44/57] Refactor to make `CLOCK_SCHEDULE_EPSILON` generic --- synapse/http/client.py | 10 ++-------- synapse/util/clock.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 05c5f13a874..78f03ae58a9 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -87,8 +87,7 @@ from synapse.metrics import SERVER_NAME_LABEL from synapse.types import ISynapseReactor, StrSequence from synapse.util.async_helpers import timeout_deferred -from synapse.util.clock import Clock -from synapse.util.duration import Duration +from synapse.util.clock import CLOCK_SCHEDULE_EPSILON, Clock from synapse.util.json import json_decoder if TYPE_CHECKING: @@ -163,11 +162,6 @@ def _is_ip_blocked( return False -# The delay used by the scheduler to schedule tasks "as soon as possible", while -# still allowing other tasks to run between runs. 
-_EPSILON = Duration(microseconds=1) - - def _make_scheduler(clock: Clock) -> Callable[[Callable[[], object]], IDelayedCall]: """Makes a schedular suitable for a Cooperator using the given reactor. @@ -176,7 +170,7 @@ def _make_scheduler(clock: Clock) -> Callable[[Callable[[], object]], IDelayedCa def _scheduler(x: Callable[[], object]) -> IDelayedCall: return clock.call_later( - _EPSILON, + CLOCK_SCHEDULE_EPSILON, x, ) diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 7232a1331c8..8c056757323 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -62,6 +62,19 @@ logging.setLoggerClass(original_logger_class) +CLOCK_SCHEDULE_EPSILON = Duration(microseconds=1) +""" +The smallest value we can use that will schedule tasks "as soon as possible", while +still allowing other tasks to run between runs. + +This should be a non-zero value as the Twisted Reactor API does not specify how calls +get scheduled. If we used `0`, a weird reactor implementation could run it immediately +or run it any order with the other calls that are scheduled now. + +We want the semantics of run this in the "next reactor iteration". +""" + + def _try_wakeup_deferred(d: Deferred) -> None: """Try to wake up a deferred, but ignore any exceptions raised by the callback. This is useful when we want to wake up a deferred that may have From 34f015f8da0e8bb084f79ec5dc1647d2ad6e444c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 16:17:57 -0500 Subject: [PATCH 45/57] Use `callLater(0)` for `FakeTransport` --- tests/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server.py b/tests/server.py index ce5eaad63da..282746d9d93 100644 --- a/tests/server.py +++ b/tests/server.py @@ -940,7 +940,7 @@ def _produce() -> None: # mypy ignored here because: # - this is part of the test infrastructure (outside of Synapse) so tracking # these calls for for homeserver shutdown doesn't make sense. 
- d.addCallback(lambda x: self._reactor.callLater(0.1, _produce)) # type: ignore[call-later-not-tracked,call-overload] + d.addCallback(lambda x: self._reactor.callLater(0.0, _produce)) # type: ignore[call-later-not-tracked,call-overload] if not streaming: # mypy ignored here because: From e7b09b3e42ebe5ca13c4d0c9d2e49928d94ec2fd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 16:19:53 -0500 Subject: [PATCH 46/57] Use `callLater(0)` for `FakeChannel` --- tests/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server.py b/tests/server.py index 282746d9d93..eea11b301f9 100644 --- a/tests/server.py +++ b/tests/server.py @@ -255,7 +255,7 @@ def registerProducer(self, producer: IProducer, streaming: bool) -> None: def _produce() -> None: if self._producer: self._producer.resumeProducing() - self._reactor.callLater(0.1, _produce) + self._reactor.callLater(0.0, _produce) if not streaming: self._reactor.callLater(0.0, _produce) From f11be8b44f0c653de6da8faa49fd5421d77b128e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 26 Jun 2026 16:24:27 -0500 Subject: [PATCH 47/57] Remove some no longer necessary `advance(...)` in tests (related to `FakeTransport.registerProducer`) --- tests/handlers/test_send_email.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/handlers/test_send_email.py b/tests/handlers/test_send_email.py index 80d34791b65..f343b17a2fb 100644 --- a/tests/handlers/test_send_email.py +++ b/tests/handlers/test_send_email.py @@ -145,9 +145,8 @@ def test_send_email(self) -> None: ) ) - # This matches the two `callLater` delays in `FakeTransport.registerProducer` + # This matches the `callLater` delays in `FakeTransport.registerProducer` self.reactor.advance(0) - self.reactor.advance(0.1) # the message should now get delivered self.get_success(d) @@ -216,9 +215,8 @@ def test_send_email_force_tls(self) -> None: ) ) - # This matches the two `callLater` delays in 
`FakeTransport.registerProducer` + # This matches the `callLater` delays in `FakeTransport.registerProducer` self.reactor.advance(0) - self.reactor.advance(0.1) # the message should now get delivered self.get_success(d) From 2bff8196dae22472c24cd13104e7d3e96c8f953e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 22:12:17 -0500 Subject: [PATCH 48/57] Update references to `_EPSILON` (now called `CLOCK_SCHEDULE_EPSILON`) --- tests/handlers/test_presence.py | 54 ++++++++++++++++++--------------- tests/rest/admin/test_user.py | 10 +++--- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 2aeb9a927a9..bfdca9c1d59 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -58,8 +58,7 @@ from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, UserID, get_domain_from_id -from synapse.util.clock import Clock -from synapse.util.duration import Duration +from synapse.util.clock import CLOCK_SCHEDULE_EPSILON, Clock from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -957,9 +956,10 @@ def test_external_process_timeout(self) -> None: ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. 
+ self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) self.get_success(sync_d) # Check that if we wait a while without telling the handler the user has @@ -1281,9 +1281,10 @@ def test_set_presence_from_syncing_multi_device( ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) self.get_success(sync_d) # 2. Wait half the idle timer. @@ -1301,9 +1302,10 @@ def test_set_presence_from_syncing_multi_device( ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) self.get_success(sync_d) # 4. Assert the expected presence state. @@ -1331,10 +1333,11 @@ def test_set_presence_from_syncing_multi_device( ) ) # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. 
- self.reactor.advance(Duration(microseconds=1).as_secs()) + # replication request. The request body is streamed by a `Cooperator` that + # uses the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for + # it to fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) with self.get_success(sync_d): pass @@ -1534,9 +1537,10 @@ def test_set_presence_from_non_syncing_multi_device( ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) sync_1 = self.get_success(sync_d) # 2. Sync with the second device. @@ -1550,9 +1554,10 @@ def test_set_presence_from_non_syncing_multi_device( ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) sync_2 = self.get_success(sync_d) # 3. Assert the expected presence state. @@ -1662,9 +1667,10 @@ def test_set_presence_from_syncing_keeps_busy( ) # `user_syncing` proxies the presence write to the main process over an HTTP # replication request. 
The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so - # we need to actually advance the clock for it to fire. - self.reactor.advance(Duration(microseconds=1).as_secs()) + # the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) self.get_success(sync_d) # Check against the main process that the user's presence did not change. diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index add00453b6d..c8ff697c611 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -59,7 +59,7 @@ from synapse.server import HomeServer from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY from synapse.types import JsonDict, UserID, create_requester -from synapse.util.clock import Clock +from synapse.util.clock import CLOCK_SCHEDULE_EPSILON, Clock from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -5850,10 +5850,12 @@ def test_redact_messages_all_rooms(self) -> None: self.assertEqual(channel.code, 200) id = channel.json_body.get("redact_id") - # Need 1 tick as we send 1 replication request per original event - # and each wait must be >= `_EPSILON` from `http/client.py` + # Need 1 tick as we send 1 replication request per original event. The + # replication request body is streamed by a `Cooperator` that uses the clock to + # schedule each chunk at a tiny *non-zero* delay (`CLOCK_SCHEDULE_EPSILON`), so + # we need to actually advance the clock for it to fire. for _ in range(len(original_event_ids)): - self.reactor.advance(0.001) + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) # Verify the HTTP `redact_status` endpoint reports completion. 
channel2 = self.make_request( From a12a9e09b00c706c3fa7f8b73fddc81c33e28ed6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 22:22:18 -0500 Subject: [PATCH 49/57] Remove some no longer necessary `advance(...)` in tests (related to `FakeTransport.registerProducer`) pt. 2 --- tests/handlers/test_send_email.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/handlers/test_send_email.py b/tests/handlers/test_send_email.py index f343b17a2fb..acb88343f2e 100644 --- a/tests/handlers/test_send_email.py +++ b/tests/handlers/test_send_email.py @@ -145,9 +145,6 @@ def test_send_email(self) -> None: ) ) - # This matches the `callLater` delays in `FakeTransport.registerProducer` - self.reactor.advance(0) - # the message should now get delivered self.get_success(d) @@ -215,9 +212,6 @@ def test_send_email_force_tls(self) -> None: ) ) - # This matches the `callLater` delays in `FakeTransport.registerProducer` - self.reactor.advance(0) - # the message should now get delivered self.get_success(d) From d03437387913f03b25df048188ab462a0212d130 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 22:28:16 -0500 Subject: [PATCH 50/57] Better label fire-and-forget --- tests/replication/test_federation_ack.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 07299c96983..c8de7b1fad6 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -83,7 +83,7 @@ def test_federation_ack_sent(self) -> None: # Wait for the FEDERATION_ACK to be sent # - # `on_rdata` handles this as part of a background process (see + # `on_rdata` handles this as part of a fire-and-forget background process (see # `FederationSenderHandler.update_token`) # # We're specifically waiting for the database queries in the background process From 77184c85bb109ee0d15d9a2f0de9228e34f6affa Mon Sep 17 00:00:00 2001 From: Eric Eastwood 
Date: Wed, 1 Jul 2026 22:30:13 -0500 Subject: [PATCH 51/57] Update docstring to be accurate after 100 loop refactor instead of real-time timeout --- tests/unittest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittest.py b/tests/unittest.py index d22cbee2108..53d55b389ea 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -728,7 +728,7 @@ def _wait_for_deferred( d: "Deferred[Any]", ) -> None: """ - Wait for the deferred to finish or raise (with real-time timeout). + Wait for the deferred to finish or raise. Does not advance time in the Twisted reactor clock but will loop 100 times waiting for a result. The loop 1) allows `clock.call_later` scheduled callbacks From 5be3534fdd005a54df10d977f5cce8b845ca398f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 22:34:29 -0500 Subject: [PATCH 52/57] Better document plan to refactor and discussion where it spawned from --- tests/unittest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unittest.py b/tests/unittest.py index 53d55b389ea..bb3c31683ce 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -873,6 +873,10 @@ def get_failure( # does raise "@raise SynchronousTestCase.failureException : If the # L{Deferred} has no result or has a failure # result." at-least in today's world. + # + # As another alternative, we could also just update `get_success(...)` to have this + # behavior as the default, see + # https://github.com/element-hq/synapse/pull/19871#discussion_r3483616710 def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: """Drive deferred to completion and return result or raise exception on failure. 
From 77ce7a547cc1d0d35bc2283975d5760336099a8d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 22:47:48 -0500 Subject: [PATCH 53/57] More accurate `test_redact_messages_all_rooms` description --- tests/rest/admin/test_user.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index c8ff697c611..bce199c564f 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -5850,10 +5850,14 @@ def test_redact_messages_all_rooms(self) -> None: self.assertEqual(channel.code, 200) id = channel.json_body.get("redact_id") - # Need 1 tick as we send 1 replication request per original event. The - # replication request body is streamed by a `Cooperator` that uses the clock to - # schedule each chunk at a tiny *non-zero* delay (`CLOCK_SCHEDULE_EPSILON`), so - # we need to actually advance the clock for it to fire. + # `/redact` just schedules a background task that runs in the background + # (fire-and-forget) so we need to do the waiting here. + # + # Need 1 tick as we send 1 replication request for the redaction of each + # original event. The replication request body is streamed by a `Cooperator` + # that uses the clock to schedule each chunk at a tiny *non-zero* delay + # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to + # fire. 
for _ in range(len(original_event_ids)): self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) From bc54b6565b1b74b73481bbc04e555430deaf40fc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 23:07:55 -0500 Subject: [PATCH 54/57] Advance by `CLOCK_SCHEDULE_EPSILON` See https://github.com/element-hq/synapse/pull/19871#discussion_r3445201362 --- tests/handlers/test_presence.py | 69 +++++---------------------------- tests/unittest.py | 9 ++++- 2 files changed, 16 insertions(+), 62 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index bfdca9c1d59..5d02c701614 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -29,7 +29,6 @@ get_verify_key, ) -from twisted.internet.defer import ensureDeferred from twisted.internet.testing import MemoryReactor from synapse.api.constants import EventTypes, Membership, PresenceState @@ -58,7 +57,7 @@ from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, UserID, get_domain_from_id -from synapse.util.clock import CLOCK_SCHEDULE_EPSILON, Clock +from synapse.util.clock import Clock from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -949,18 +948,11 @@ def test_external_process_timeout(self) -> None: ) worker_presence_handler = worker_to_sync_against.get_presence_handler() - sync_d = ensureDeferred( + self.get_success( worker_presence_handler.user_syncing( self.user_id, self.device_id, True, PresenceState.ONLINE ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. 
- self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - self.get_success(sync_d) # Check that if we wait a while without telling the handler the user has # stopped syncing that their presence state doesn't get timed out. @@ -1271,7 +1263,7 @@ def test_set_presence_from_syncing_multi_device( worker_presence_handler = worker_to_sync_against.get_presence_handler() # 1. Sync with the first device. - sync_d = ensureDeferred( + self.get_success( worker_presence_handler.user_syncing( user_id, "dev-1", @@ -1279,20 +1271,13 @@ def test_set_presence_from_syncing_multi_device( presence_state=dev_1_state, ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - self.get_success(sync_d) # 2. Wait half the idle timer. self.reactor.advance(IDLE_TIMER / 1000 / 2) self.reactor.pump([0.1]) # 3. Sync with the second device. - sync_d = ensureDeferred( + self.get_success( worker_presence_handler.user_syncing( user_id, "dev-2", @@ -1300,13 +1285,6 @@ def test_set_presence_from_syncing_multi_device( presence_state=dev_2_state, ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - self.get_success(sync_d) # 4. Assert the expected presence state. state = self.get_success( @@ -1324,22 +1302,14 @@ def test_set_presence_from_syncing_multi_device( # # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER. 
if test_with_workers: - sync_d = ensureDeferred( + with self.get_success( worker_presence_handler.user_syncing( f"@other-user:{self.hs.config.server.server_name}", "dev-3", affect_presence=True, presence_state=PresenceState.ONLINE, ) - ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that - # uses the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for - # it to fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - - with self.get_success(sync_d): + ): pass # 5. Advance such that the first device should be discarded (the idle timer), @@ -1527,7 +1497,7 @@ def test_set_presence_from_non_syncing_multi_device( worker_presence_handler = worker_to_sync_against.get_presence_handler() # 1. Sync with the first device. - sync_d = ensureDeferred( + sync_1 = self.get_success( worker_presence_handler.user_syncing( user_id, "dev-1", @@ -1535,16 +1505,9 @@ def test_set_presence_from_non_syncing_multi_device( presence_state=dev_1_state, ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - sync_1 = self.get_success(sync_d) # 2. Sync with the second device. - sync_d = ensureDeferred( + sync_2 = self.get_success( worker_presence_handler.user_syncing( user_id, "dev-2", @@ -1552,13 +1515,6 @@ def test_set_presence_from_non_syncing_multi_device( presence_state=dev_2_state, ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. 
The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - sync_2 = self.get_success(sync_d) # 3. Assert the expected presence state. state = self.get_success( @@ -1660,18 +1616,11 @@ def test_set_presence_from_syncing_keeps_busy( # Perform a sync with a presence state other than busy. This should NOT change # our presence status; we only change from busy if we explicitly set it via # /presence/*. - sync_d = ensureDeferred( + self.get_success( worker_to_sync_against.get_presence_handler().user_syncing( self.user_id, self.device_id, True, PresenceState.ONLINE ) ) - # `user_syncing` proxies the presence write to the main process over an HTTP - # replication request. The request body is streamed by a `Cooperator` that uses - # the clock to schedule each chunk at a tiny *non-zero* delay - # (`CLOCK_SCHEDULE_EPSILON`), so we need to actually advance the clock for it to - # fire. - self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) - self.get_success(sync_d) # Check against the main process that the user's presence did not change. 
state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) diff --git a/tests/unittest.py b/tests/unittest.py index bb3c31683ce..5f7d0b3abf2 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -77,7 +77,7 @@ from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester -from synapse.util.clock import Clock +from synapse.util.clock import CLOCK_SCHEDULE_EPSILON, Clock from synapse.util.httpresourcetree import create_resource_tree from tests.server import ( @@ -787,7 +787,12 @@ def _wait_for_deferred( # # In terms of other threads, they may have scheduled something on the # reactor to run (like `reactor.callFromThread(...)`) - self.reactor.advance(0) + # + # Ideally, we'd advance by `0` but the `Cooperator` used in our HTTP clients + # uses `CLOCK_SCHEDULE_EPSILON` and we want to make usage in downstream tests + # as simple as possible. A common use case this helps with is anything that + # needs to make an HTTP request (like a replication request) + self.reactor.advance(CLOCK_SCHEDULE_EPSILON.as_secs()) loop_count += 1 From 3cb3c662d4084e3125e7c362c68f21a02aebde1c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 23:35:47 -0500 Subject: [PATCH 55/57] Fix `tests.crypto.test_keyring` ``` [FAIL] Traceback (most recent call last): File "/home/runner/work/synapse/synapse/tests/crypto/test_keyring.py", line 617, in test_get_keys_from_perspectives self.assertEqual(res.added_ts, self.reactor.seconds() * 1000) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.10/lib/python3.10/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual super().assertEqual(first, second, msg) File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 845, in assertEqual assertion_func(first, second, msg=msg) File 
"/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/unittest/case.py", line 838, in _baseAssertEqual raise self.failureException(msg) twisted.trial.unittest.FailTest: 100000 != 100000.003 tests.crypto.test_keyring.PerspectivesKeyFetcherTestCase.test_get_keys_from_perspectives ``` --- tests/crypto/test_keyring.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 6bc935f2720..01561b0d413 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -499,7 +499,7 @@ async def get_json(destination: str, path: str, **kwargs: Any) -> JsonDict: res = key_json[testverifykey_id] self.assertIsNotNone(res) assert res is not None - self.assertEqual(res.added_ts, self.reactor.seconds() * 1000) + self.assertEqual(res.added_ts, self.clock.time_msec()) self.assertEqual(res.valid_until_ts, VALID_UNTIL_TS) # we expect it to be encoded as canonical json *before* it hits the db @@ -614,7 +614,7 @@ def test_get_keys_from_perspectives(self) -> None: res = key_json[testverifykey_id] self.assertIsNotNone(res) assert res is not None - self.assertEqual(res.added_ts, self.reactor.seconds() * 1000) + self.assertEqual(res.added_ts, self.clock.time_msec()) self.assertEqual(res.valid_until_ts, VALID_UNTIL_TS) self.assertEqual(res.key_json, canonicaljson.encode_canonical_json(response)) @@ -732,7 +732,7 @@ def test_get_perspectives_own_key(self) -> None: res = key_json[testverifykey_id] self.assertIsNotNone(res) assert res is not None - self.assertEqual(res.added_ts, self.reactor.seconds() * 1000) + self.assertEqual(res.added_ts, self.clock.time_msec()) self.assertEqual(res.valid_until_ts, VALID_UNTIL_TS) self.assertEqual(res.key_json, canonicaljson.encode_canonical_json(response)) From c37b6ce6641daf7cb89949eb71e8af6852d70fef Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 23:37:28 -0500 Subject: [PATCH 56/57] Fix 
`tests.util.test_task_scheduler.TestTaskScheduler.test_cancel_running_task` ``` Traceback (most recent call last): File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 276, in test_cancel_running_task self._test_cancel_task(task_id) File "/home/runner/work/synapse/synapse/tests/util/test_task_scheduler.py", line 241, in _test_cancel_task assert new_counter == current_counter builtins.AssertionError: tests.util.test_task_scheduler.TestTaskScheduler.test_cancel_running_task ``` --- tests/util/test_task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index 94c1d778e63..cab9695d33b 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -260,7 +260,7 @@ async def _incrementing_running_task( await self.task_scheduler.update_task( task.id, result={"counter": current_counter} ) - await self.hs.get_clock().sleep(Duration(microseconds=1)) + await self.hs.get_clock().sleep(Duration(seconds=1)) return TaskStatus.COMPLETE, None, None # type: ignore[unreachable] From 6e86cf314d22caf1c6a5253b58dc92c8457aa386 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Jul 2026 23:52:33 -0500 Subject: [PATCH 57/57] Fix `tests.handlers.test_oidc.OidcHandlerTestCase.test_exchange_code_jwt_key` ``` [FAIL] Traceback (most recent call last): File "/home/runner/work/synapse/synapse/tests/handlers/test_oidc.py", line 984, in test_exchange_code_jwt_key self.assertEqual(claims["iat"], start_time) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.14/lib/python3.14/site-packages/twisted/trial/_synctest.py", line 444, in assertEqual super().assertEqual(first, second, msg) File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 925, in assertEqual assertion_func(first, second, msg=msg) File "/opt/hostedtoolcache/Python/3.14.6/x64/lib/python3.14/unittest/case.py", line 918, in 
_baseAssertEqual raise self.failureException(msg) twisted.trial.unittest.FailTest: 1000 != 1000.000001 tests.handlers.test_oidc.OidcHandlerTestCase.test_exchange_code_jwt_key ``` --- tests/handlers/test_oidc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index 62b84c77a4d..b81c2954dc2 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -960,7 +960,7 @@ def test_exchange_code_jwt_key(self) -> None: # advance the clock a bit before we start, so we aren't working with zero # timestamps. self.reactor.advance(1000) - start_time = self.reactor.seconds() + start_time_s = int(self.reactor.seconds()) ret = self.get_success(self.provider._exchange_code(code, code_verifier="")) self.assertEqual(ret, token) @@ -981,8 +981,8 @@ def test_exchange_code_jwt_key(self) -> None: self.assertEqual(claims["aud"], ISSUER) self.assertEqual(claims["iss"], "DEFGHI") self.assertEqual(claims["sub"], CLIENT_ID) - self.assertEqual(claims["iat"], start_time) - self.assertGreater(claims["exp"], start_time) + self.assertEqual(claims["iat"], start_time_s) + self.assertGreater(claims["exp"], start_time_s) # check the rest of the POSTed data self.assertEqual(args["grant_type"], ["authorization_code"])