-
Notifications
You must be signed in to change notification settings - Fork 561
Remove custom till_deferred_has_result(...) in favor of HomeserverTestCase.wait_on_thread(...) to drive async Rust (Tokio runtime/thread pool)
#19867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0d3fbf9
f23daba
1bd4c9d
488f055
5801844
ef59b6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Remove custom `till_deferred_has_result(...)` in favor of `HomeserverTestCase.wait_on_thread(...)` to drive async Rust (Tokio runtime/thread pool). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,11 +21,10 @@ | |
|
|
||
| import json | ||
| import threading | ||
| import time | ||
| from http import HTTPStatus | ||
| from http.server import BaseHTTPRequestHandler, HTTPServer | ||
| from io import BytesIO | ||
| from typing import Any, ClassVar, Coroutine, Generator, TypeVar, Union | ||
| from typing import Any, ClassVar, TypeVar | ||
| from unittest.mock import ANY, AsyncMock, Mock | ||
| from urllib.parse import parse_qs | ||
|
|
||
|
|
@@ -37,7 +36,6 @@ | |
| ) | ||
| from signedjson.sign import sign_json | ||
|
|
||
| from twisted.internet.defer import Deferred, ensureDeferred | ||
| from twisted.internet.testing import MemoryReactor | ||
|
|
||
| from synapse.api.auth.mas import MasDelegatedAuth | ||
|
|
@@ -809,31 +807,6 @@ class MasAuthDelegation(HomeserverTestCase): | |
| def device_scope(self) -> str: | ||
| return self.device_scope_prefix + DEVICE | ||
|
|
||
| def till_deferred_has_result( | ||
| self, | ||
| awaitable: Union[ | ||
| "Coroutine[Deferred[Any], Any, T]", | ||
| "Generator[Deferred[Any], Any, T]", | ||
| "Deferred[T]", | ||
| ], | ||
| ) -> "Deferred[T]": | ||
| """Wait until a deferred has a result. | ||
|
|
||
| This is useful because the Rust HTTP client will resolve the deferred | ||
| using reactor.callFromThread, which are only run when we call | ||
| reactor.advance. | ||
| """ | ||
| deferred = ensureDeferred(awaitable) | ||
| tries = 0 | ||
| while not deferred.called: | ||
| time.sleep(0.1) | ||
| self.reactor.advance(0) | ||
| tries += 1 | ||
| if tries > 100: | ||
| raise Exception("Timed out waiting for deferred to resolve") | ||
|
|
||
| return deferred | ||
|
|
||
| def default_config(self) -> dict[str, Any]: | ||
| config = super().default_config() | ||
| config["public_baseurl"] = BASE_URL | ||
|
|
@@ -884,9 +857,9 @@ def test_simple_introspection(self) -> None: | |
| } | ||
|
|
||
| requester = self.get_success( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ) | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) | ||
| ) | ||
|
|
||
| self.assertEqual(requester.user.to_string(), USER_ID) | ||
|
|
@@ -907,9 +880,9 @@ def test_unexpiring_token(self) -> None: | |
| } | ||
|
|
||
| requester = self.get_success( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ) | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) | ||
|
Comment on lines
+883
to
+885
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is repetitive but it's nice to know why the |
||
| ) | ||
|
|
||
| self.assertEqual(requester.user.to_string(), USER_ID) | ||
|
|
@@ -931,9 +904,9 @@ def test_inexistent_device(self) -> None: | |
| } | ||
|
|
||
| failure = self.get_failure( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ), | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), | ||
| InvalidClientTokenError, | ||
| ) | ||
| self.assertEqual(failure.value.code, 401) | ||
|
|
@@ -948,9 +921,9 @@ def test_inexistent_user(self) -> None: | |
| } | ||
|
|
||
| failure = self.get_failure( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ), | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), | ||
| AuthError, | ||
| ) | ||
| # This is a 500, it should never happen really | ||
|
|
@@ -966,9 +939,9 @@ def test_missing_scope(self) -> None: | |
| } | ||
|
|
||
| failure = self.get_failure( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ), | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), | ||
| InvalidClientTokenError, | ||
| ) | ||
| self.assertEqual(failure.value.code, 401) | ||
|
|
@@ -977,9 +950,9 @@ def test_invalid_response(self) -> None: | |
| self.server.introspection_response = {} | ||
|
|
||
| failure = self.get_failure( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ), | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")), | ||
| SynapseError, | ||
| ) | ||
| self.assertEqual(failure.value.code, 503) | ||
|
|
@@ -995,9 +968,9 @@ def test_device_id_in_body(self) -> None: | |
| } | ||
|
|
||
| requester = self.get_success( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ) | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) | ||
| ) | ||
|
|
||
| self.assertEqual(requester.device_id, DEVICE) | ||
|
|
@@ -1012,9 +985,9 @@ def test_admin_scope(self) -> None: | |
| } | ||
|
|
||
| requester = self.get_success( | ||
| self.till_deferred_has_result( | ||
| self._auth.get_user_by_access_token("some_token") | ||
| ) | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_access_token("some_token")) | ||
| ) | ||
|
|
||
| self.assertEqual(requester.user.to_string(), USER_ID) | ||
|
|
@@ -1041,7 +1014,9 @@ def test_cached_expired_introspection(self) -> None: | |
|
|
||
| # The first CS-API request causes a successful introspection | ||
| self.get_success( | ||
| self.till_deferred_has_result(self._auth.get_user_by_req(request)) | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_req(request)) | ||
| ) | ||
| self.assertEqual(self.server.calls, 1) | ||
|
|
||
|
|
@@ -1050,7 +1025,9 @@ def test_cached_expired_introspection(self) -> None: | |
|
|
||
| # Now the CS-API request fails because the token expired | ||
| self.assertFailure( | ||
| self.till_deferred_has_result(self._auth.get_user_by_req(request)), | ||
| # We have to wait for the async Rust HTTP client (running on the Tokio | ||
| # thread pool) to do its thing (see `create_deferred(...)` usage) | ||
| self.wait_on_thread(self._auth.get_user_by_req(request)), | ||
| InvalidClientTokenError, | ||
| ) | ||
| # Ensure another introspection request was not sent | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,77 +19,78 @@ | |
| # [This file includes modifications made by New Vector Limited] | ||
| # | ||
| # | ||
| import os | ||
| import functools | ||
| import gc | ||
| import hashlib | ||
| import hmac | ||
| import json | ||
| import logging | ||
| import secrets | ||
| import time | ||
| from typing import ( | ||
| AbstractSet, | ||
| Any, | ||
| Awaitable, | ||
| Callable, | ||
| ClassVar, | ||
| Generic, | ||
| Iterable, | ||
| Mapping, | ||
| NoReturn, | ||
| Optional, | ||
| Protocol, | ||
| TypeVar, | ||
| ) | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| import canonicaljson | ||
| import signedjson.key | ||
| import unpaddedbase64 | ||
| from typing_extensions import Concatenate, ParamSpec | ||
|
|
||
| from twisted.internet.defer import Deferred, ensureDeferred | ||
| from twisted.internet.testing import MemoryReactor, MemoryReactorClock | ||
| from twisted.python.failure import Failure | ||
| from twisted.python.threadpool import ThreadPool | ||
| from twisted.trial import unittest | ||
| from twisted.web.resource import Resource | ||
| from twisted.web.server import Request | ||
|
|
||
| from synapse import events | ||
| from synapse.api.constants import EventTypes | ||
| from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion | ||
| from synapse.config._base import Config, RootConfig | ||
| from synapse.config.homeserver import HomeServerConfig | ||
| from synapse.config.server import DEFAULT_ROOM_VERSION | ||
| from synapse.crypto.event_signing import add_hashes_and_signatures | ||
| from synapse.federation.transport.server import TransportLayerServer | ||
| from synapse.http.server import JsonResource, OptionsResource | ||
| from synapse.http.site import SynapseRequest, SynapseSite | ||
| from synapse.logging.context import ( | ||
| SENTINEL_CONTEXT, | ||
| LoggingContext, | ||
| current_context, | ||
| set_current_context, | ||
| ) | ||
| from synapse.rest import RegisterServletsFunc | ||
| from synapse.server import HomeServer | ||
| from synapse.storage.keys import FetchKeyResult | ||
| from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester | ||
| from synapse.util.clock import Clock | ||
| from synapse.util.httpresourcetree import create_resource_tree | ||
|
|
||
| from tests.server import ( | ||
| CustomHeaderType, | ||
| FakeChannel, | ||
| ThreadedMemoryReactorClock, | ||
| get_clock, | ||
| make_request, | ||
| setup_test_homeserver, | ||
| ) | ||
| from tests.test_utils import event_injection, setup_awaitable_errors | ||
| from tests.test_utils.logging_setup import setup_logging | ||
| from tests.utils import checked_cast, default_config, setupdb | ||
|
|
||
| setupdb() | ||
| setup_logging() | ||
|
|
@@ -474,17 +475,56 @@ | |
| # Reset to not use frozen dicts. | ||
| events.USE_FROZEN_DICTS = False | ||
|
|
||
| def wait_on_thread(self, deferred: Deferred, timeout: int = 10) -> None: | ||
| def wait_on_thread( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| self, | ||
| awaitable: Awaitable[TV], | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed this to accept any awaitable ( |
||
| timeout: int = 10, | ||
| ) -> "Deferred[TV]": | ||
| """ | ||
| Wait until a Deferred is done, where it's waiting on a real thread. | ||
| Wait until the Awaitable is done, where it's waiting on a real thread. This | ||
| could be things spawned on the Twisted reactor threadpool or Tokio runtime | ||
| (async Rust code). | ||
|
|
||
| Ideally, this behavior (giving time for other threads to drive forward) would be | ||
| built-in to wherever we drive the Twisted reactor but we don't want to slow down | ||
| the entire test suite with real sleeps. | ||
|
|
||
| With other functions like `get_success(...)`, it only advances the reactor's | ||
| *virtual* clock in a tight loop, never yielding real wall-clock time, so the | ||
| Tokio threads never get a chance to run. Instead we advance the Twisted reactor | ||
| while also sleeping a little real time each iteration. | ||
|
|
||
| Args: | ||
| awaitable: The thing to wait for | ||
| timeout: The maximum amount of real time we should wait before giving up | ||
|
|
||
| Returns: | ||
| Deferred (wrapping the awaitable that was passed in) | ||
| """ | ||
| start_time = time.time() | ||
|
|
||
| deferred: Deferred[TV] = ensureDeferred(awaitable) # type: ignore[arg-type] | ||
| while not deferred.called: | ||
| if start_time + timeout < time.time(): | ||
| raise ValueError("Timed out waiting for threadpool") | ||
| self.reactor.advance(0.01) | ||
| time.sleep(0.01) | ||
| raise ValueError( | ||
| "Timed out waiting for work happening on a thread to finish" | ||
| ) | ||
| # Give some real wall-clock time for other threads to do work. This could be | ||
| # things spawned on the Twisted reactor threadpool or Tokio thread pool | ||
| # (async Rust code). | ||
| # time.sleep(0) | ||
| # Suspend execution of this thread to allow other threads to do work. This | ||
| # could be things spawned on the Twisted reactor threadpool or Tokio thread | ||
| # pool (async Rust code). | ||
| # | ||
| # We could also use `time.sleep(0)` here | ||
| os.sched_yield() | ||
| # Advance the Twisted reactor as the thread may have scheduled something on | ||
| # the reactor to run (like `reactor.callFromThread(...)`) | ||
| self.reactor.advance(0) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed this from advancing time by
Comment on lines
+522
to
+524
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the order to advance the Twisted reactor after we give some time to other threads to make some progress. The thinking is that the thread can do some work, potentially scheduling some things on reactor, and then we unblock that. Since we're iterating in a loop, the order probably doesn't matter that much but perhaps this makes more sense. |
||
|
|
||
| # Make it easy to chain other things | ||
| return deferred | ||
|
|
||
| def wait_for_background_updates(self) -> None: | ||
| """Block until all background database updates have completed.""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, not very satisfied with this API shape. I think it's good enough as another iteration. Perhaps a better name?
Ideally, we could just do `self.get_success(self._auth.get_user_by_access_token("some_token"))`, which would drive things to completion regardless of the kind of work necessary (Python or Rust).
But adding real sleeps to `get_success(...)` for everything would result in slowing down the entire test suite.
We could potentially add a new attribute to `HomeserverTestCase` like `drive_work_on_threads` (like the existing `needs_threadpool`) which would conditionally sleep for real in `get_success(...)`. This is half-decent, but having to know this obscure detail to make things work kinda sucks.
For example, this is how `needs_threadpool` is defined on a per-test-case basis:
synapse/tests/media/test_media_storage.py
Lines 71 to 72 in d3fc819
Or, even better, we could automatically detect when there is work to be done on the Tokio thread pool and only then do the real sleep loop. We would probably have to detect this by using the Tokio `RuntimeMetrics`. I think it would be better to explore this as a follow-up though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I think that we can actually change the sleep to be `time.sleep(0)`, since that is apparently enough to signal that other threads should run. I've tried it locally and the tests `tests/handlers/test_oauth_delegation.py` and `tests/synapse_rust/test_http_client.py` seem to pass?
At which point this could be added to `get_success`, I think?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`time.sleep(0)` does seem to work 👍. If I look at the docs for `time.sleep(...)` ("Suspend execution of the calling thread [...]"), it also mentions that you can use `os.sched_yield()` ("Voluntarily relinquish the CPU."), which also works.
https://discuss.python.org/t/time-sleep-0-yield-behaviour/27185/5 mentions that `time.sleep(0)` releases the GIL while `os.sched_yield()` doesn't. But I think that is now fixed (looks like it was part of Python 3.10): python/cpython#96078
It's unclear what's better, constant context switching or just allowing some time. I would have assumed that constant context switching would have some impact, but the results from #19871 show that it doesn't seem to slow things down in a noticeable way.