diff --git a/bec_widgets/applications/companion_app.py b/bec_widgets/applications/companion_app.py index 05944463e..43524194a 100644 --- a/bec_widgets/applications/companion_app.py +++ b/bec_widgets/applications/companion_app.py @@ -5,6 +5,7 @@ import os import signal import sys +import traceback from contextlib import redirect_stderr, redirect_stdout import darkdetect @@ -63,6 +64,7 @@ def __init__(self, args): self.app: QApplication | None = None self.launcher_window: LaunchWindow | None = None self.dispatcher: BECDispatcher | None = None + self._shutdown_started = False def start(self): """ @@ -74,6 +76,7 @@ def start(self): bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR bec_logger._update_sinks() + bec_logger.disabled_modules = ["bec_lib.scan_items"] with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore self._run() @@ -122,17 +125,8 @@ def _run(self): self.app.aboutToQuit.connect(self.shutdown) self.app.setQuitOnLastWindowClosed(True) - def sigint_handler(*args): - # display message, for people to let it terminate gracefully - print("Caught SIGINT, exiting") - # Widgets should be all closed. - with RPCRegister.delayed_broadcast(): - for widget in QApplication.instance().topLevelWidgets(): # type: ignore - widget.close() - self.shutdown() - - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGINT, self.request_shutdown) + signal.signal(signal.SIGTERM, self.request_shutdown) sys.exit(self.app.exec()) @@ -149,16 +143,67 @@ def setup_bec_icon(self): ) self.app.setWindowIcon(icon) + def request_shutdown(self, signum=None, _frame=None): + """ + Request Qt application shutdown from an RPC call or OS signal. + + Cleanup itself is handled by ``shutdown()``, which is connected to + ``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC + teardown before Qt has processed the widget close events. + """ + signal_name = signal.Signals(signum).name if signum is not None else "shutdown" + pid = os.getpid() + if self.app is None: + logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app") + self.shutdown() + return + + widgets = [ + f"{widget.__class__.__name__}(objectName={widget.objectName()!r})" + for widget in self.app.topLevelWidgets() + ] + logger.info( + f"Caught {signal_name}, requesting GUI server shutdown pid={pid} " + f"top_level_widgets={widgets}" + ) + with RPCRegister.delayed_broadcast(): + for widget in self.app.topLevelWidgets(): + widget.close() + self.app.quit() + + @staticmethod + def _run_shutdown_step(step: str, callback): + try: + callback() + except Exception as exc: + logger.error( + f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n" + f"{traceback.format_exc()}" + ) + def shutdown(self): - logger.info("Shutdown GUIServer", repr(self)) - if self.launcher_window and shiboken6.isValid(self.launcher_window): - self.launcher_window.close() - self.launcher_window.deleteLater() - if pylsp_server.is_running(): - pylsp_server.stop() - if self.dispatcher: - self.dispatcher.stop_cli_server() - self.dispatcher.disconnect_all() + if self._shutdown_started: + return + self._shutdown_started = True + logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}") + + def close_launcher_window(): + if self.launcher_window and shiboken6.isValid(self.launcher_window): + self.launcher_window.close() + self.launcher_window.deleteLater() + + def stop_pylsp_server(): + if pylsp_server.is_running(): + pylsp_server.stop() + + def stop_dispatcher(): + if self.dispatcher: + self.dispatcher.stop_cli_server() + self.dispatcher.disconnect_all() + + self._run_shutdown_step("close_launcher_window", close_launcher_window) + self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server) + self._run_shutdown_step("stop_dispatcher", stop_dispatcher) def main(): diff --git a/bec_widgets/applications/launch_window.py b/bec_widgets/applications/launch_window.py index 4eae81a48..50f8e3b1b 100644 --- a/bec_widgets/applications/launch_window.py +++ b/bec_widgets/applications/launch_window.py @@ -660,20 +660,35 @@ def _launcher_is_last_widget(self, connections: dict) -> bool: Check if the launcher is the last widget in the application. """ - # get all parents of connections for connection in connections.values(): - try: - parent = connection.parent() - if parent is None and connection.objectName() != self.objectName(): - logger.info( - f"Found non-launcher connection without parent: {connection.objectName()}" - ) - return False - except Exception as e: - logger.error(f"Error getting parent of connection: {e}") + if not self._connection_belongs_to_launcher(connection): return False return True + def _connection_belongs_to_launcher(self, connection: QObject) -> bool: + """ + Check whether a registered connection is the launcher itself or part of its Qt hierarchy. + + Registered top-level windows such as BECMainWindowNoRPC are expected when another GUI is + open. They are not launcher children, but they are also not an error condition. + """ + try: + if connection is self or getattr(connection, "gui_id", None) == self.gui_id: + return True + if connection.objectName() == self.objectName(): + return True + + parent = connection.parent() + while parent is not None: + if parent is self: + return True + parent = parent.parent() + except Exception as e: + logger.error(f"Error checking launcher ownership of connection: {e}") + return False + + return False + def _turn_off_the_lights(self, connections: dict): """ If there is only one connection remaining, it is the launcher, so we show it. diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 21276773a..f32f73a6a 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -5,6 +5,7 @@ import json import os import select +import signal import subprocess import threading import time @@ -33,6 +34,12 @@ logger = bec_logger.logger IGNORE_WIDGETS = ["LaunchWindow"] +PROCESS_TERMINATION_TIMEOUT = 10 +PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 +PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2 +GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3 +GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 +OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event" RegistryState: TypeAlias = dict[ Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"], @@ -53,14 +60,16 @@ def _filter_output(output: str) -> str: return output -def _get_output(process, logger) -> None: +def _get_output(process, logger, stop_event: threading.Event | None = None) -> None: log_func = {process.stdout: logger.debug, process.stderr: logger.info} stream_buffer = {process.stdout: [], process.stderr: []} try: os.set_blocking(process.stdout.fileno(), False) os.set_blocking(process.stderr.fileno(), False) - while process.poll() is None: - readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1) + while process.poll() is None and not (stop_event and stop_event.is_set()): + readylist, _, _ = select.select( + [process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT + ) for stream in (process.stdout, process.stderr): buf = stream_buffer[stream] if stream in readylist: @@ -75,6 +84,122 @@ def _get_output(process, logger) -> None: logger.error(f"Error reading process output: {str(e)}") +def _process_group_id(process) -> int | None: + pid = getattr(process, "pid", None) + if os.name != "posix" or not isinstance(pid, int): + return None + try: + return os.getpgid(pid) + except ProcessLookupError: + return None + + +def _process_details(process) -> str: + args = getattr(process, "args", None) + if isinstance(args, list): + command = " ".join(str(arg) for arg in args) + else: + command = str(args) + return ( + f"pid={getattr(process, 'pid', None)} pgid={_process_group_id(process)} command={command}" + ) + + +def _process_group_snapshot(process) -> str: + pgid = _process_group_id(process) + if pgid is None: + return "Process group snapshot unavailable: process group no longer exists" + try: + result = subprocess.run( + ["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)], + check=False, + capture_output=True, + text=True, + timeout=2, + ) + except Exception as exc: + return f"Process group snapshot unavailable: {exc}" + output = result.stdout.strip() + if not output: + return f"Process group snapshot empty for pgid={pgid}" + return output + + +def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None: + if process.poll() is not None: + return + + process_details = _process_details(process) + try: + pgid = _process_group_id(process) + if pgid is not None: + logger.info(f"Terminating GUI process group {process_details}") + os.killpg(pgid, signal.SIGTERM) + else: + logger.info(f"Terminating GUI process {process_details}") + process.terminate() + except ProcessLookupError: + process.wait(timeout=timeout) + return + except Exception as exc: + logger.warning("Failed to terminate GUI process group; terminating process only.") + logger.info(f"GUI process termination failure details: {exc}. {process_details}") + process.terminate() + + try: + process.wait(timeout=timeout) + return + except subprocess.TimeoutExpired: + logger.warning(f"GUI process did not stop within {timeout}s; killing it.") + logger.info( + f"GUI process force-kill details: {process_details}\n" + f"{_process_group_snapshot(process)}" + ) + + try: + pgid = _process_group_id(process) + if pgid is not None: + os.killpg(pgid, signal.SIGKILL) + else: + process.kill() + except ProcessLookupError as e: + logger.error(f"Failed to kill GUI process group: {e}") + process.wait(timeout=timeout) + return + process.wait(timeout=timeout) + + +def _wait_for_process_exit(process, timeout: float) -> bool: + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + return False + return True + + +def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None: + if thread is None: + return + thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) + if not thread.is_alive(): + return + + if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None): + stop_event.set() + + for stream in (process.stdout, process.stderr): + if stream is None: + continue + try: + stream.close() + except OSError as e: + logger.error(f"Failed to close stream {str(e)}") + thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) + if thread.is_alive(): + logger.warning("GUI process output reader thread did not stop after process shutdown.") + logger.info(f"GUI process output reader thread details: {_process_details(process)}") + + def _start_plot_process( gui_id: str, gui_class_id: str, @@ -126,8 +251,14 @@ def _start_plot_process( if logger is None: process_output_processing_thread = None else: + process_output_stop_event = threading.Event() process_output_processing_thread = threading.Thread( - target=_get_output, args=(process, logger) + target=_get_output, args=(process, logger, process_output_stop_event) + ) + setattr( + process_output_processing_thread, + OUTPUT_READER_STOP_EVENT_ATTR, + process_output_stop_event, ) process_output_processing_thread.start() return process, process_output_processing_thread @@ -222,7 +353,7 @@ def __init__(self, **kwargs) -> None: self._ipython_registry: dict[str, RPCReference] = {} self.available_widgets = AvailableWidgetsNamespace() register_serializer_extension() - self._rpc_timeout = 5 + self._rpc_timeout = 60 #################### #### Client API #### @@ -465,11 +596,13 @@ def kill_server(self) -> None: if self._process: logger.success("Stopping GUI...") - self._process.terminate() - if self._process_output_processing_thread: - self._process_output_processing_thread.join() - self._process.wait() + if not self._request_server_shutdown(): + _terminate_plot_process(self._process, logger) + _join_process_output_thread( + self._process, self._process_output_processing_thread, logger + ) self._process = None + self._process_output_processing_thread = None # Unregister the registry state self._client.connector.unregister( @@ -488,6 +621,37 @@ def close(self): #### Private methods #### ######################### + def _request_server_shutdown(self) -> bool: + if self._process is None or self._process.poll() is not None: + return True + process_details = _process_details(self._process) + logger.info(f"Requesting graceful GUI shutdown {process_details}") + try: + self.launcher._run_rpc( # pylint: disable=protected-access + "system.shutdown", + wait_for_rpc_response=True, + timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, + ) + except Exception as exc: + logger.warning( + "Could not confirm graceful GUI shutdown via RPC; " + "falling back to process termination." + ) + logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}") + return False + if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT): + logger.info(f"GUI server exited after graceful shutdown {process_details}") + return True + logger.warning( + "GUI server did not exit after graceful shutdown request; " + "falling back to process termination." + ) + logger.info( + f"Graceful GUI shutdown timeout details: {process_details}\n" + f"{_process_group_snapshot(self._process)}" + ) + return False + def _check_if_server_is_alive(self): """Checks if the process is alive""" if self._process is None: diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 2523789e1..a5061ae48 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -2,6 +2,7 @@ import inspect import threading +import time import uuid from functools import wraps from typing import TYPE_CHECKING, Any, cast @@ -9,6 +10,7 @@ from bec_lib.client import BECClient from bec_lib.device import DeviceBaseWithConfig from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from if TYPE_CHECKING: # pragma: no cover @@ -25,6 +27,7 @@ # pylint: disable=protected-access _DEFAULT_RPC_TIMEOUT = object() +logger = bec_logger.logger def _name_arg(arg): @@ -261,12 +264,39 @@ def _run_rpc( MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response ) + target_gui_id = gui_id or self._gui_id + sent_at = time.time() + deadline = sent_at + timeout if timeout is not None else None + rpc_msg.metadata.update( + { + "method": method, + "receiver": receiver, + "target_gui_id": target_gui_id, + "object_name": self.object_name, + "wait_for_response": wait_for_rpc_response, + "timeout": timeout, + "sent_at": sent_at, + "deadline": deadline, + } + ) + logger.info( + "Sending GUI RPC request " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"wait_for_response={wait_for_rpc_response} timeout={timeout}" + ) self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) if wait_for_rpc_response: try: finished = self._msg_wait_event.wait(timeout) if not finished: + logger.error( + "GUI RPC response timeout " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"timeout={timeout}" + ) raise RPCResponseTimeoutError(request_id, timeout) finally: self._msg_wait_event.clear() @@ -278,6 +308,12 @@ def _run_rpc( # the _on_rpc_response method assert isinstance(self._rpc_response, messages.RequestResponseMessage) + logger.info( + "Received GUI RPC response " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"accepted={self._rpc_response.accepted}" + ) if not self._rpc_response.accepted: raise ValueError(self._rpc_response.message["error"]) msg_result = self._rpc_response.message.get("result") @@ -286,6 +322,7 @@ def _run_rpc( def _on_rpc_response(self, msg_obj: MessageObject) -> None: msg = cast(messages.RequestResponseMessage, msg_obj.value) + logger.debug(f"GUI RPC response callback received: {msg}") self._rpc_response = msg self._msg_wait_event.set() diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index af739ef27..6f1453155 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -3,8 +3,9 @@ import collections import random import string +import time from collections.abc import Callable -from typing import TYPE_CHECKING, DefaultDict, Hashable, Union +from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union import louie import redis @@ -15,6 +16,7 @@ from qtpy.QtCore import QObject from qtpy.QtCore import Signal as pyqtSignal +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed from bec_widgets.utils.serialization import register_serializer_extension logger = bec_logger.logger @@ -25,6 +27,39 @@ from bec_widgets.utils.rpc_server import RPCServer +def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: + if not isinstance(msg_content, dict) or not isinstance(metadata, dict): + return + request_id = metadata.get("request_id") + method = msg_content.get("action") + parameter = msg_content.get("parameter") + if request_id is None or method is None or not isinstance(parameter, dict): + return + + dispatch_received_at = time.time() + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at) + stale_on_dispatch = deadline is not None and dispatch_received_at > deadline + target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") + + logger.info( + "GUI RPC dispatcher received request before Qt callback emit " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " + f"stale_on_dispatch={stale_on_dispatch}" + ) + if stale_on_dispatch: + logger.warning( + "GUI RPC dispatcher received request after client timeout deadline " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}" + ) + + class QtThreadSafeCallback(QObject): """QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt.""" @@ -88,10 +123,12 @@ def _execute_callback(self, cb, msg, kwargs): # we can notice kwargs are lost when passed to Qt slot metadata = msg.metadata + _log_rpc_dispatcher_receive(msg.content, metadata) cb(msg.content, metadata) else: # from stream msg = msg["data"] + _log_rpc_dispatcher_receive(msg.content, msg.metadata) cb(msg.content, msg.metadata) diff --git a/bec_widgets/utils/rpc_logging.py b/bec_widgets/utils/rpc_logging.py new file mode 100644 index 000000000..9ef28b2db --- /dev/null +++ b/bec_widgets/utils/rpc_logging.py @@ -0,0 +1,16 @@ +from __future__ import annotations + + +def elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + +def format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 25fd0cca2..a9446c680 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -1,6 +1,7 @@ from __future__ import annotations import functools +import time import traceback import types from contextlib import contextmanager @@ -11,13 +12,14 @@ from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import from qtpy.QtCore import Qt, QTimer -from qtpy.QtWidgets import QWidget +from qtpy.QtWidgets import QApplication, QWidget from redis.exceptions import RedisError from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.utils.bec_dispatcher import BECDispatcher from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.utils.error_popups import ErrorPopupUtility +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed from bec_widgets.utils.rpc_register import RPCRegister from bec_widgets.utils.screen_utils import apply_window_geometry from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea @@ -115,27 +117,107 @@ def on_rpc_update(self, msg: dict, metadata: dict): if request_id is None: logger.error("Received RPC instruction without request_id") return + method = msg.get("action") + parameter = msg.get("parameter", {}) + args = parameter.get("args", []) + kwargs = parameter.get("kwargs", {}) + target_gui_id = parameter.get("gui_id") + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + received_at = time.time() + receive_latency = elapsed_seconds(sent_at, received_at) + stale_on_receive = deadline is not None and received_at > deadline + logger.info( + "GUI RPC server received request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={format_elapsed(receive_latency)} " + f"stale_on_receive={stale_on_receive}" + ) + if stale_on_receive: + logger.warning( + "GUI RPC server received request after client timeout deadline " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={format_elapsed(receive_latency)}" + ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") + + # Shutdown must acknowledge before teardown starts. The generic RPC path + # below publishes successful responses through QTimer.singleShot(0); + # for system.shutdown that would race with the queued app quit and + # dispatcher shutdown scheduled by _shutdown_gui_server(). + if method == "system.shutdown": + execution_start = time.perf_counter() + try: + self.run_system_rpc(method, args, kwargs) + except Exception: + execution_duration = time.perf_counter() - execution_start + content = traceback.format_exc() + logger.error( + "GUI RPC server shutdown request failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}\n{content}" + ) + self.send_response(request_id, False, {"error": content}) + else: + execution_duration = time.perf_counter() - execution_start + logger.info( + "GUI RPC server acknowledged shutdown request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}" + ) + self.send_response(request_id, True, {"result": None}) + return + + execution_start = time.perf_counter() with rpc_exception_hook(functools.partial(self.send_response, request_id, False)): try: - method = msg["action"] - args = msg["parameter"].get("args", []) - kwargs = msg["parameter"].get("kwargs", {}) if method.startswith("system."): res = self.run_system_rpc(method, args, kwargs) else: - obj = self.get_object_from_config(msg["parameter"]) + obj = self.get_object_from_config(parameter) res = self.run_rpc(obj, method, args, kwargs) except Exception: + execution_duration = time.perf_counter() - execution_start content = traceback.format_exc() - logger.error(f"Error while executing RPC instruction: {content}") + logger.error( + "GUI RPC server execution failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f}\n" + f"{content}" + ) self.send_response(request_id, False, {"error": content}) else: + execution_duration = time.perf_counter() - execution_start + response_stale = deadline is not None and time.time() > deadline + logger.info( + "GUI RPC server executed request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " + f"response_after_client_deadline={response_stale}" + ) + if response_stale: + logger.warning( + "GUI RPC server response is late for client timeout " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"execution_duration_s={execution_duration:.3f}" + ) logger.debug(f"RPC instruction executed successfully: {res}") self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res)) def send_response(self, request_id: str, accepted: bool, msg: dict): + log_message = ( + "GUI RPC server publishing response " + f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}" + ) + if accepted: + logger.info(log_message) + else: + logger.error(log_message) self.client.connector.set_and_publish( MessageEndpoints.gui_instruction_response(request_id), messages.RequestResponseMessage(accepted=accepted, message=msg), @@ -236,10 +318,23 @@ def _resolve_rpc_target(self, obj, method: str) -> tuple[object, object]: def run_system_rpc(self, method: str, args: list, kwargs: dict): if method == "system.launch_dock_area": return self._launch_dock_area(*args, **kwargs) + if method == "system.shutdown": + return self._shutdown_gui_server() if method == "system.list_capabilities": - return {"system.launch_dock_area": True} + return {"system.launch_dock_area": True, "system.shutdown": True} raise ValueError(f"Unknown system RPC method: {method}") + @staticmethod + def _shutdown_gui_server() -> None: + app = QApplication.instance() + if app is None: + return + gui_server = getattr(app, "gui_server", None) + if gui_server is not None and hasattr(gui_server, "request_shutdown"): + QTimer.singleShot(0, gui_server.request_shutdown) + return + QTimer.singleShot(0, app.quit) + @staticmethod def _launch_dock_area( name: str | None = None, @@ -297,7 +392,14 @@ def serialize_result_and_send(self, request_id: str, res: object): res = self.serialize_object(res) except RegistryNotReadyError: try: - self._rpc_singleshot_repeats[request_id] += retry_delay + repeat = self._rpc_singleshot_repeats[request_id] + repeat += retry_delay + logger.warning( + "GUI RPC result serialization delayed; retrying " + f"request_id={request_id} retry_delay_ms={retry_delay} " + f"accumulated_delay_ms={repeat.accumulated_delay} " + f"max_delay_ms={repeat.max_delay}" + ) QTimer.singleShot( retry_delay, lambda: self.serialize_result_and_send(request_id, res) ) @@ -407,8 +509,9 @@ def _serialize_bec_connector(self, connector: BECConnector, wait=False) -> dict: container_proxy = parent.gui_id else: container_proxy = None - except Exception: + except Exception as e: container_proxy = None + logger.error(f"Error while serializing RPC result: {e}") if wait and not self.rpc_register.object_is_registered(connector): raise RegistryNotReadyError(f"Connector {connector} not registered yet") diff --git a/bec_widgets/widgets/control/buttons/button_abort/button_abort.py b/bec_widgets/widgets/control/buttons/button_abort/button_abort.py index 4a9585dc2..c17675020 100644 --- a/bec_widgets/widgets/control/buttons/button_abort/button_abort.py +++ b/bec_widgets/widgets/control/buttons/button_abort/button_abort.py @@ -1,3 +1,4 @@ +from bec_lib.logger import bec_logger from bec_qthemes import material_icon from qtpy.QtCore import Qt from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget @@ -5,6 +6,8 @@ from bec_widgets.utils.bec_widget import BECWidget from bec_widgets.utils.error_popups import SafeSlot +logger = bec_logger.logger + class AbortButton(BECWidget, QWidget): """A button that abort the scan.""" @@ -55,7 +58,7 @@ def abort_scan( scan_id(str|None): The scan id to abort. If None, the current scan will be aborted. """ if self.scan_id is not None: - print(f"Aborting scan with scan_id: {self.scan_id}") + logger.info(f"Aborting scan with scan_id: {self.scan_id}") self.queue.request_scan_abortion(scan_id=self.scan_id) else: self.queue.request_scan_abortion() diff --git a/bec_widgets/widgets/plots/image/image_base.py b/bec_widgets/widgets/plots/image/image_base.py index 57bbf95c5..db8e783e4 100644 --- a/bec_widgets/widgets/plots/image/image_base.py +++ b/bec_widgets/widgets/plots/image/image_base.py @@ -460,7 +460,7 @@ def enable_colorbar( self._color_bar = None def disable_autorange(): - print("Disabling autorange") + logger.info("Disabling autorange") self.setProperty("autorange", False) if style == "simple": @@ -928,7 +928,7 @@ def _set_autorange(self, enabled: bool, sync: bool = True): # if sync: self._sync_colorbar_levels() self._sync_autorange_switch() - print(f"Autorange set to {enabled}") + logger.info(f"Autorange set to {enabled}") @SafeProperty(str) def autorange_mode(self) -> str: diff --git a/bec_widgets/widgets/plots/waveform/waveform.py b/bec_widgets/widgets/plots/waveform/waveform.py index 467a6138a..35a2873ba 100644 --- a/bec_widgets/widgets/plots/waveform/waveform.py +++ b/bec_widgets/widgets/plots/waveform/waveform.py @@ -2449,7 +2449,7 @@ def _check_dataset_size_and_confirm(self, dataset_obj, device_entry: str) -> boo first_key = next(iter(info)) mem_bytes = info[first_key]["value"]["mem_size"] size_mb = mem_bytes / (1024 * 1024) - print(f"Dataset size: {size_mb:.1f} MB") + logger.info(f"Dataset size: {size_mb:.1f} MB") except Exception as exc: # noqa: BLE001 logger.error(f"Unable to evaluate dataset size: {exc}") return True diff --git a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py index 392fecb81..c10f7b3b1 100644 --- a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py +++ b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py @@ -155,7 +155,6 @@ def __init__( self._progress_device = None self.task = None self.scan_number = None - self.progress_started.connect(lambda: print("Scan progress started")) def connect_to_queue(self): """ diff --git a/tests/end-2-end/test_rpc_widgets_e2e.py b/tests/end-2-end/test_rpc_widgets_e2e.py index 19e8109fc..5652701b7 100644 --- a/tests/end-2-end/test_rpc_widgets_e2e.py +++ b/tests/end-2-end/test_rpc_widgets_e2e.py @@ -69,7 +69,7 @@ def create_widget( return widget -@pytest.mark.timeout(100) +@pytest.mark.timeout(20) def test_available_widgets(qtbot, connected_client_gui_obj): """This test checks that all widgets that are available via gui.available_widgets can be created and removed.""" gui = connected_client_gui_obj diff --git a/tests/unit_tests/test_bec_dispatcher.py b/tests/unit_tests/test_bec_dispatcher.py index 12b57c673..900fc3c26 100644 --- a/tests/unit_tests/test_bec_dispatcher.py +++ b/tests/unit_tests/test_bec_dispatcher.py @@ -1,6 +1,7 @@ # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring import threading import time +from types import SimpleNamespace from unittest import mock import pytest @@ -213,3 +214,51 @@ def _get_slots(): send_msg_event.set() qtbot.wait(10) + + +def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch): + info_mock = mock.MagicMock() + warning_mock = mock.MagicMock() + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock) + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock) + + def callback(_msg, _metadata): + pass + + cb = QtThreadSafeCallback(callback) + connector = QtRedisConnector("localhost:1", mock.MagicMock()) + rpc_msg = SimpleNamespace( + content={ + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + metadata={ + "request_id": "dispatcher-request", + "receiver": "gui", + "object_name": "progressbar", + "timeout": 0.1, + "sent_at": 1.0, + "deadline": 1.1, + }, + ) + + try: + connector._execute_callback(cb, {"data": rpc_msg}, {}) + + info_mock.assert_called_once() + info_message = info_mock.call_args.args[0] + assert "GUI RPC dispatcher received request before Qt callback emit" in info_message + assert "request_id=dispatcher-request" in info_message + assert "method=set_value" in info_message + assert "receiver=gui" in info_message + assert "target_gui_id=ring" in info_message + assert "object_name=progressbar" in info_message + assert "timeout=0.1" in info_message + assert "stale_on_dispatch=True" in info_message + + warning_mock.assert_called_once() + warning_message = warning_mock.call_args.args[0] + assert "received request after client timeout deadline" in warning_message + assert "request_id=dispatcher-request" in warning_message + finally: + connector.shutdown() diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index 30f0a5daa..5cf5750f6 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -1,10 +1,18 @@ +import signal +import subprocess from contextlib import contextmanager from unittest import mock import pytest from bec_widgets.cli.client import BECDockArea -from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process +from bec_widgets.cli.client_utils import ( + GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, + OUTPUT_READER_STOP_EVENT_ATTR, + BECGuiClient, + _join_process_output_thread, + _start_plot_process, +) from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout @@ -262,7 +270,105 @@ def test_client_utils_delete_falls_back_to_direct_close(): def test_client_utils_gui_client_set_rpc_timeout(): gui = BECGuiClient() - assert gui._rpc_timeout == 5 + assert gui._rpc_timeout == 60 gui.set_rpc_timeout(10) assert gui._rpc_timeout == 10 + + +def test_client_utils_kill_server_waits_for_process_before_joining_output_thread(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + gui._process = mock.MagicMock(pid=123, stdout=None, stderr=None) + gui._process.poll.return_value = None + order = [] + gui._process.wait.side_effect = lambda timeout: order.append("wait") + gui._process_output_processing_thread = mock.MagicMock() + gui._process_output_processing_thread.join.side_effect = lambda timeout: order.append("join") + gui._process_output_processing_thread.is_alive.return_value = False + + with ( + mock.patch.object(gui, "_request_server_shutdown", return_value=False), + mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123), + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + ): + gui.kill_server() + + killpg.assert_called_once_with(123, signal.SIGTERM) + assert order == ["wait", "join"] + assert gui._process is None + assert gui._process_output_processing_thread is None + + +def test_client_utils_kill_server_requests_graceful_shutdown_before_signal(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + process = mock.MagicMock(stdout=None, stderr=None) + process.poll.return_value = None + gui._process = process + gui._process_output_processing_thread = mock.MagicMock() + gui._process_output_processing_thread.is_alive.return_value = False + launcher = mock.MagicMock() + + with ( + mock.patch.object( + BECGuiClient, "launcher", new_callable=mock.PropertyMock + ) as launcher_prop, + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + ): + launcher_prop.return_value = launcher + gui.kill_server() + + launcher._run_rpc.assert_called_once_with( + "system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT + ) + process.wait.assert_called_once_with(timeout=5) + killpg.assert_not_called() + assert gui._process is None + assert gui._process_output_processing_thread is None + + +def test_client_utils_kill_server_kills_process_group_after_timeout(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + process = mock.MagicMock(pid=123, stdout=None, stderr=None, args=["bec-gui-server"]) + process.poll.return_value = None + process.wait.side_effect = [subprocess.TimeoutExpired(cmd="bec-gui-server", timeout=10), None] + gui._process = process + + with ( + mock.patch.object(gui, "_request_server_shutdown", return_value=False), + mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123), + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + mock.patch("bec_widgets.cli.client_utils.subprocess.run") as run, + ): + run.return_value.stdout = "PID PPID PGID STAT COMMAND\n123 1 123 S bec-gui-server" + gui.kill_server() + + assert killpg.call_args_list == [mock.call(123, signal.SIGTERM), mock.call(123, signal.SIGKILL)] + assert process.wait.call_args_list == [mock.call(timeout=10), mock.call(timeout=10)] + run.assert_called_once_with( + ["ps", "-o", "pid,ppid,pgid,stat,command", "-g", "123"], + check=False, + capture_output=True, + text=True, + timeout=2, + ) + + +def test_join_process_output_thread_signals_reader_before_closing_streams(): + process = mock.MagicMock(pid=123, args=["bec-gui-server"]) + process.stdout = mock.MagicMock() + process.stderr = mock.MagicMock() + thread = mock.MagicMock() + stop_event = mock.MagicMock() + setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event) + thread.is_alive.side_effect = [True, False] + logger = mock.MagicMock() + + _join_process_output_thread(process, thread, logger) + + assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)] + stop_event.set.assert_called_once_with() + process.stdout.close.assert_called_once_with() + process.stderr.close.assert_called_once_with() diff --git a/tests/unit_tests/test_launch_window.py b/tests/unit_tests/test_launch_window.py index 44df9022f..90b942f44 100644 --- a/tests/unit_tests/test_launch_window.py +++ b/tests/unit_tests/test_launch_window.py @@ -153,6 +153,17 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi mock_set_quit_on_last_window_closed.assert_called_once_with(True) +def test_launcher_detects_external_main_window_without_info_log(bec_launch_window): + connection = mock.MagicMock() + connection.parent.return_value = None + connection.objectName.return_value = "BECMainWindowNoRPC" + + with mock.patch("bec_widgets.applications.launch_window.logger.info") as mock_info: + assert not bec_launch_window._launcher_is_last_widget({"window": connection}) + + mock_info.assert_not_called() + + @pytest.mark.parametrize( "connection_names, close_called", [ diff --git a/tests/unit_tests/test_rpc_base.py b/tests/unit_tests/test_rpc_base.py index 3fddb4e77..ac6686542 100644 --- a/tests/unit_tests/test_rpc_base.py +++ b/tests/unit_tests/test_rpc_base.py @@ -3,10 +3,12 @@ import pytest from bec_lib.device import DeviceBaseWithConfig, Signal +from bec_widgets.cli.rpc import rpc_base as rpc_base_module from bec_widgets.cli.rpc.rpc_base import ( DeletedWidgetError, RPCBase, RPCReference, + RPCResponseTimeoutError, _transform_args_kwargs, ) @@ -51,3 +53,33 @@ def test_transform_args_kwargs(): ) assert args == ("full name", "short name", "string_arg", "full name") assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"} + + +def test_run_rpc_logs_response_timeout(monkeypatch): + rpc = RPCBase(gui_id="progress_widget", object_name="progressbar") + rpc._rpc_timeout = 0 + rpc._client = MagicMock() + + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_base_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_base_module.logger, "error", error_mock) + + with pytest.raises(RPCResponseTimeoutError): + rpc._run_rpc("set_value", 42, precision=2, timeout=0) + + publish_msg = rpc._client.connector.set_and_publish.call_args.args[1] + assert publish_msg.metadata["method"] == "set_value" + assert publish_msg.metadata["target_gui_id"] == "progress_widget" + assert publish_msg.metadata["object_name"] == "progressbar" + assert publish_msg.metadata["timeout"] == 0 + assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"] + assert info_mock.call_count == 1 + info_message = info_mock.call_args.args[0] + error_mock.assert_called_once() + error_message = error_mock.call_args.args[0] + assert "GUI RPC response timeout" in error_message + assert "method=set_value" in error_message + assert "target_gui_id=progress_widget" in error_message + assert "object_name=progressbar" in error_message + assert "timeout=0" in error_message diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 7f83ea13f..fa8ebf14d 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -1,11 +1,13 @@ import argparse -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from bec_lib.service_config import ServiceConfig from qtpy.QtWidgets import QWidget +from bec_widgets.applications import companion_app as companion_app_module from bec_widgets.applications.companion_app import GUIServer +from bec_widgets.utils import rpc_server as rpc_server_module from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat @@ -58,6 +60,68 @@ def test_gui_server_get_service_config(gui_server): assert gui_server._get_service_config().config == ServiceConfig().config +def test_gui_server_signal_shutdown_closes_widgets_and_quits_app(gui_server): + widget = MagicMock() + gui_server.app = MagicMock() + gui_server.app.topLevelWidgets.return_value = [widget] + + gui_server.request_shutdown() + + widget.close.assert_called_once() + gui_server.app.quit.assert_called_once() + + +def test_gui_server_shutdown_is_idempotent(gui_server): + gui_server.launcher_window = MagicMock() + gui_server.dispatcher = MagicMock() + + with ( + patch.object(companion_app_module.shiboken6, "isValid", return_value=True), + patch.object(companion_app_module.pylsp_server, "is_running", return_value=False), + ): + gui_server.shutdown() + gui_server.shutdown() + + gui_server.launcher_window.close.assert_called_once() + gui_server.launcher_window.deleteLater.assert_called_once() + gui_server.dispatcher.stop_cli_server.assert_called_once() + gui_server.dispatcher.disconnect_all.assert_called_once() + + +def test_rpc_server_system_capabilities_include_shutdown(rpc_server): + assert rpc_server.run_system_rpc("system.list_capabilities", [], {}) == { + "system.launch_dock_area": True, + "system.shutdown": True, + } + + +def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qapp): + gui_server = MagicMock() + qapp.gui_server = gui_server + + rpc_server.run_system_rpc("system.shutdown", [], {}) + qapp.processEvents() + + gui_server.request_shutdown.assert_called_once() + del qapp.gui_server + + +def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server): + order = [] + rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown")) + rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response")) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + {"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}}, + {"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2}, + ) + + assert order == ["shutdown", "response"] + rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None}) + rpc_server.serialize_result_and_send.assert_not_called() + + def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server): """ Test that a singleshot RPC method raises an error when called multiple times. @@ -91,22 +155,34 @@ def serialize_side_effect(obj): # Third call succeeds return {"gui_id": dummy.gui_id, "success": True} + warning_mock = MagicMock() + # Patch serialize_object to control when it raises RegistryNotReadyError with patch.object(rpc_server, "serialize_object", side_effect=serialize_side_effect): with patch.object(rpc_server, "send_response") as mock_send_response: - # Start the serialization process - rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() - rpc_server.serialize_result_and_send(request_id, dummy) - - # Verify that serialize_object was called 3 times - qtbot.waitUntil(lambda: call_count >= 3, timeout=5000) - - # Verify that send_response was called with success - mock_send_response.assert_called_once() - args = mock_send_response.call_args[0] - assert args[0] == request_id - assert args[1] is True # accepted=True - assert "result" in args[2] + with patch.object(rpc_server_module.logger, "warning", warning_mock): + # Start the serialization process + rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() + rpc_server.serialize_result_and_send(request_id, dummy) + + # Verify that serialize_object was called 3 times + qtbot.waitUntil(lambda: call_count >= 3, timeout=5000) + + # Verify that send_response was called with success + mock_send_response.assert_called_once() + args = mock_send_response.call_args[0] + assert args[0] == request_id + assert args[1] is True # accepted=True + assert "result" in args[2] + + assert warning_mock.call_count == 2 + warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list) + assert "result serialization delayed; retrying" in warning_logs + assert "request_id=test_request_123" in warning_logs + assert "retry_delay_ms=100" in warning_logs + assert "accumulated_delay_ms=100" in warning_logs + assert "accumulated_delay_ms=200" in warning_logs + assert "max_delay_ms=2000" in warning_logs def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_widget): @@ -140,6 +216,56 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w assert "Max delay exceeded" in args[2]["error"] +def test_send_response_logs_publish_status(rpc_server, monkeypatch): + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "error", error_mock) + + with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock: + rpc_server.send_response("request-ok", True, {"result": None}) + rpc_server.send_response("request-failed", False, {"error": "bad"}) + + assert publish_mock.call_count == 2 + assert "request_id=request-ok" in info_mock.call_args.args[0] + assert "accepted=True" in info_mock.call_args.args[0] + assert "request_id=request-failed" in error_mock.call_args.args[0] + assert "accepted=False" in error_mock.call_args.args[0] + + +def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch): + info_mock = MagicMock() + warning_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock) + + rpc_server.rpc_register.get_rpc_by_id = MagicMock() + rpc_server.run_rpc = MagicMock(return_value=None) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + { + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + {"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1}, + ) + + received_log = info_mock.call_args_list[0].args[0] + executed_log = info_mock.call_args_list[1].args[0] + warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list) + + assert "GUI RPC server received request" in received_log + assert "request_id=late-request" in received_log + assert "method=set_value" in received_log + assert "target_gui_id=ring" in received_log + assert "timeout=0.1" in received_log + assert "stale_on_receive=True" in received_log + assert "response_after_client_deadline=True" in executed_log + assert "received request after client timeout deadline" in warning_logs + assert "response is late for client timeout" in warning_logs + + def test_run_rpc_delegates_to_rpc_content_class(rpc_server): class Content: USER_ACCESS = ["foo", "mode", "mode.setter"]