From 4dbecc9bd6faa99c2739634f92682d6d76590bfc Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Tue, 26 May 2026 09:50:38 +0200 Subject: [PATCH 01/12] fix(launch_window): exclude launcher check for non-parented widgets for BECMainWindow --- bec_widgets/applications/launch_window.py | 35 ++++++++++++++++------- tests/unit_tests/test_launch_window.py | 11 +++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/bec_widgets/applications/launch_window.py b/bec_widgets/applications/launch_window.py index 4eae81a48..50f8e3b1b 100644 --- a/bec_widgets/applications/launch_window.py +++ b/bec_widgets/applications/launch_window.py @@ -660,20 +660,35 @@ def _launcher_is_last_widget(self, connections: dict) -> bool: Check if the launcher is the last widget in the application. """ - # get all parents of connections for connection in connections.values(): - try: - parent = connection.parent() - if parent is None and connection.objectName() != self.objectName(): - logger.info( - f"Found non-launcher connection without parent: {connection.objectName()}" - ) - return False - except Exception as e: - logger.error(f"Error getting parent of connection: {e}") + if not self._connection_belongs_to_launcher(connection): return False return True + def _connection_belongs_to_launcher(self, connection: QObject) -> bool: + """ + Check whether a registered connection is the launcher itself or part of its Qt hierarchy. + + Registered top-level windows such as BECMainWindowNoRPC are expected when another GUI is + open. They are not launcher children, but they are also not an error condition. + """ + try: + if connection is self or getattr(connection, "gui_id", None) == self.gui_id: + return True + if connection.objectName() == self.objectName(): + return True + + parent = connection.parent() + while parent is not None: + if parent is self: + return True + parent = parent.parent() + except Exception as e: + logger.error(f"Error checking launcher ownership of connection: {e}") + return False + + return False + def _turn_off_the_lights(self, connections: dict): """ If there is only one connection remaining, it is the launcher, so we show it. diff --git a/tests/unit_tests/test_launch_window.py b/tests/unit_tests/test_launch_window.py index 44df9022f..90b942f44 100644 --- a/tests/unit_tests/test_launch_window.py +++ b/tests/unit_tests/test_launch_window.py @@ -153,6 +153,17 @@ def test_gui_server_turns_off_the_lights(bec_launch_window, connection_names, hi mock_set_quit_on_last_window_closed.assert_called_once_with(True) +def test_launcher_detects_external_main_window_without_info_log(bec_launch_window): + connection = mock.MagicMock() + connection.parent.return_value = None + connection.objectName.return_value = "BECMainWindowNoRPC" + + with mock.patch("bec_widgets.applications.launch_window.logger.info") as mock_info: + assert not bec_launch_window._launcher_is_last_widget({"window": connection}) + + mock_info.assert_not_called() + + @pytest.mark.parametrize( "connection_names, close_called", [ From aa2ee58bcded801e01e2a6f27ead64d6aa2602f5 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 12:59:38 +0200 Subject: [PATCH 02/12] fix(rpc): additional logs --- bec_widgets/cli/rpc/rpc_base.py | 50 +++++++++++++++ bec_widgets/utils/rpc_server.py | 96 +++++++++++++++++++++++++++-- tests/unit_tests/test_rpc_base.py | 36 +++++++++++ tests/unit_tests/test_rpc_server.py | 59 +++++++++++++++++- 4 files changed, 234 insertions(+), 7 deletions(-) diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 2523789e1..92df5d2aa 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -2,6 +2,7 @@ import inspect import threading +import time import uuid from functools import wraps from typing import TYPE_CHECKING, Any, cast @@ -9,6 +10,7 @@ from bec_lib.client import BECClient from bec_lib.device import DeviceBaseWithConfig from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from if TYPE_CHECKING: # pragma: no cover @@ -25,6 +27,7 @@ # pylint: disable=protected-access _DEFAULT_RPC_TIMEOUT = object() +logger = bec_logger.logger def _name_arg(arg): @@ -41,6 +44,16 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]: return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()} +def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + def rpc_timeout(timeout): """ A decorator to set a timeout for an RPC call. @@ -261,12 +274,42 @@ def _run_rpc( MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response ) + target_gui_id = gui_id or self._gui_id + args_log = _format_rpc_payload(args) + kwargs_log = _format_rpc_payload(kwargs) + sent_at = time.time() + deadline = sent_at + timeout if timeout is not None else None + rpc_msg.metadata.update( + { + "method": method, + "receiver": receiver, + "target_gui_id": target_gui_id, + "object_name": self.object_name, + "wait_for_response": wait_for_rpc_response, + "timeout": timeout, + "sent_at": sent_at, + "deadline": deadline, + } + ) + logger.info( + "Sending GUI RPC request " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"wait_for_response={wait_for_rpc_response} timeout={timeout} " + f"args={args_log} kwargs={kwargs_log}" + ) self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) if wait_for_rpc_response: try: finished = self._msg_wait_event.wait(timeout) if not finished: + logger.error( + "GUI RPC response timeout " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"timeout={timeout} args={args_log} kwargs={kwargs_log}" + ) raise RPCResponseTimeoutError(request_id, timeout) finally: self._msg_wait_event.clear() @@ -278,6 +321,12 @@ def _run_rpc( # the _on_rpc_response method assert isinstance(self._rpc_response, messages.RequestResponseMessage) + logger.info( + "Received GUI RPC response " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"accepted={self._rpc_response.accepted} args={args_log} kwargs={kwargs_log}" + ) if not self._rpc_response.accepted: raise ValueError(self._rpc_response.message["error"]) msg_result = self._rpc_response.message.get("result") @@ -286,6 +335,7 @@ def _run_rpc( def _on_rpc_response(self, msg_obj: MessageObject) -> None: msg = cast(messages.RequestResponseMessage, msg_obj.value) + logger.debug(f"GUI RPC response callback received: {msg}") self._rpc_response = msg self._msg_wait_event.set() diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 25fd0cca2..8a91f4046 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -1,10 +1,11 @@ from __future__ import annotations import functools +import time import traceback import types from contextlib import contextmanager -from typing import TYPE_CHECKING, Callable, Literal, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints @@ -115,33 +116,116 @@ def on_rpc_update(self, msg: dict, metadata: dict): if request_id is None: logger.error("Received RPC instruction without request_id") return + method = msg.get("action") + parameter = msg.get("parameter", {}) + args = parameter.get("args", []) + kwargs = parameter.get("kwargs", {}) + args_log = self._format_rpc_payload(args) + kwargs_log = self._format_rpc_payload(kwargs) + target_gui_id = parameter.get("gui_id") + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + received_at = time.time() + receive_latency = self._elapsed_seconds(sent_at, received_at) + stale_on_receive = deadline is not None and received_at > deadline + logger.info( + "GUI RPC server received request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}" + ) + if stale_on_receive: + logger.warning( + "GUI RPC server received request after client timeout deadline " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"args={args_log} kwargs={kwargs_log}" + ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") + execution_start = time.perf_counter() with rpc_exception_hook(functools.partial(self.send_response, request_id, False)): try: - method = msg["action"] - args = msg["parameter"].get("args", []) - kwargs = msg["parameter"].get("kwargs", {}) if method.startswith("system."): res = self.run_system_rpc(method, args, kwargs) else: - obj = self.get_object_from_config(msg["parameter"]) + obj = self.get_object_from_config(parameter) res = self.run_rpc(obj, method, args, kwargs) except Exception: + execution_duration = time.perf_counter() - execution_start content = traceback.format_exc() - logger.error(f"Error while executing RPC instruction: {content}") + logger.error( + "GUI RPC server execution failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " + f"args={args_log} kwargs={kwargs_log}\n" + f"{content}" + ) self.send_response(request_id, False, {"error": content}) else: + execution_duration = time.perf_counter() - execution_start + response_stale = deadline is not None and time.time() > deadline + logger.info( + "GUI RPC server executed request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " + f"response_after_client_deadline={response_stale} " + f"args={args_log} kwargs={kwargs_log}" + ) + if response_stale: + logger.warning( + "GUI RPC server response is late for client timeout " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"execution_duration_s={execution_duration:.3f} " + f"args={args_log} kwargs={kwargs_log}" + ) logger.debug(f"RPC instruction executed successfully: {res}") self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res)) def send_response(self, request_id: str, accepted: bool, msg: dict): + log_message = ( + "GUI RPC server publishing response " + f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}" + ) + if accepted: + logger.info(log_message) + else: + logger.error(log_message) self.client.connector.set_and_publish( MessageEndpoints.gui_instruction_response(request_id), messages.RequestResponseMessage(accepted=accepted, message=msg), expire=60, ) + @staticmethod + def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + @staticmethod + def _format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" + + @staticmethod + def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + def get_object_from_config(self, config: dict): gui_id = config.get("gui_id") obj = self.rpc_register.get_rpc_by_id(gui_id) diff --git a/tests/unit_tests/test_rpc_base.py b/tests/unit_tests/test_rpc_base.py index 3fddb4e77..7fc44cf4e 100644 --- a/tests/unit_tests/test_rpc_base.py +++ b/tests/unit_tests/test_rpc_base.py @@ -3,10 +3,12 @@ import pytest from bec_lib.device import DeviceBaseWithConfig, Signal +from bec_widgets.cli.rpc import rpc_base as rpc_base_module from bec_widgets.cli.rpc.rpc_base import ( DeletedWidgetError, RPCBase, RPCReference, + RPCResponseTimeoutError, _transform_args_kwargs, ) @@ -51,3 +53,37 @@ def test_transform_args_kwargs(): ) assert args == ("full name", "short name", "string_arg", "full name") assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"} + + +def test_run_rpc_logs_response_timeout(monkeypatch): + rpc = RPCBase(gui_id="progress_widget", object_name="progressbar") + rpc._rpc_timeout = 0 + rpc._client = MagicMock() + + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_base_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_base_module.logger, "error", error_mock) + + with pytest.raises(RPCResponseTimeoutError): + rpc._run_rpc("set_value", 42, precision=2, timeout=0) + + publish_msg = rpc._client.connector.set_and_publish.call_args.args[1] + assert publish_msg.metadata["method"] == "set_value" + assert publish_msg.metadata["target_gui_id"] == "progress_widget" + assert publish_msg.metadata["object_name"] == "progressbar" + assert publish_msg.metadata["timeout"] == 0 + assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"] + assert info_mock.call_count == 1 + info_message = info_mock.call_args.args[0] + assert "args=(42,)" in info_message + assert "kwargs={'precision': 2}" in info_message + error_mock.assert_called_once() + error_message = error_mock.call_args.args[0] + assert "GUI RPC response timeout" in error_message + assert "method=set_value" in error_message + assert "target_gui_id=progress_widget" in error_message + assert "object_name=progressbar" in error_message + assert "timeout=0" in error_message + assert "args=(42,)" in error_message + assert "kwargs={'precision': 2}" in error_message diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 7f83ea13f..33f99ba30 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -1,11 +1,12 @@ import argparse -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from bec_lib.service_config import ServiceConfig from qtpy.QtWidgets import QWidget from bec_widgets.applications.companion_app import GUIServer +from bec_widgets.utils import rpc_server as rpc_server_module from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat @@ -140,6 +141,62 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w assert "Max delay exceeded" in args[2]["error"] +def test_send_response_logs_publish_status(rpc_server, monkeypatch): + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "error", error_mock) + + with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock: + rpc_server.send_response("request-ok", True, {"result": None}) + rpc_server.send_response("request-failed", False, {"error": "bad"}) + + assert publish_mock.call_count == 2 + assert "request_id=request-ok" in info_mock.call_args.args[0] + assert "accepted=True" in info_mock.call_args.args[0] + assert "request_id=request-failed" in error_mock.call_args.args[0] + assert "accepted=False" in error_mock.call_args.args[0] + + +def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch): + info_mock = MagicMock() + warning_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock) + + rpc_server.rpc_register.get_rpc_by_id = MagicMock() + rpc_server.run_rpc = MagicMock(return_value=None) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + { + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + {"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1}, + ) + + received_log = info_mock.call_args_list[0].args[0] + executed_log = info_mock.call_args_list[1].args[0] + warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list) + + assert "GUI RPC server received request" in received_log + assert "request_id=late-request" in received_log + assert "method=set_value" in received_log + assert "target_gui_id=ring" in received_log + assert "timeout=0.1" in received_log + assert "stale_on_receive=True" in received_log + assert "args=[1]" in received_log + assert "kwargs={'source': 'test'}" in received_log + assert "response_after_client_deadline=True" in executed_log + assert "args=[1]" in executed_log + assert "kwargs={'source': 'test'}" in executed_log + assert "received request after client timeout deadline" in warning_logs + assert "response is late for client timeout" in warning_logs + assert "args=[1]" in warning_logs + assert "kwargs={'source': 'test'}" in warning_logs + + def test_run_rpc_delegates_to_rpc_content_class(rpc_server): class Content: USER_ACCESS = ["foo", "mode", "mode.setter"] From 2555c20ad99968d86b533cab1c83ff18f16bb8fe Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 15:08:07 +0200 Subject: [PATCH 03/12] fix(rpc): log dispatcher receipt before qt callback --- bec_widgets/utils/bec_dispatcher.py | 66 ++++++++++++++++++++++++- tests/unit_tests/test_bec_dispatcher.py | 53 ++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index af739ef27..b99ae9adb 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -3,8 +3,9 @@ import collections import random import string +import time from collections.abc import Callable -from typing import TYPE_CHECKING, DefaultDict, Hashable, Union +from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union import louie import redis @@ -25,6 +26,67 @@ from bec_widgets.utils.rpc_server import RPCServer +def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + +def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + +def _format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" + + +def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: + if not isinstance(msg_content, dict) or not isinstance(metadata, dict): + return + request_id = metadata.get("request_id") + method = msg_content.get("action") + parameter = msg_content.get("parameter") + if request_id is None or method is None or not isinstance(parameter, dict): + return + + dispatch_received_at = time.time() + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at) + stale_on_dispatch = deadline is not None and dispatch_received_at > deadline + target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") + args_log = _format_rpc_payload(parameter.get("args", [])) + kwargs_log = _format_rpc_payload(parameter.get("kwargs", {})) + + logger.info( + "GUI RPC dispatcher received request before Qt callback emit " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}" + ) + if stale_on_dispatch: + logger.warning( + "GUI RPC dispatcher received request after client timeout deadline " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"args={args_log} kwargs={kwargs_log}" + ) + + class QtThreadSafeCallback(QObject): """QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt.""" @@ -88,10 +150,12 @@ def _execute_callback(self, cb, msg, kwargs): # we can notice kwargs are lost when passed to Qt slot metadata = msg.metadata + _log_rpc_dispatcher_receive(msg.content, metadata) cb(msg.content, metadata) else: # from stream msg = msg["data"] + _log_rpc_dispatcher_receive(msg.content, msg.metadata) cb(msg.content, msg.metadata) diff --git a/tests/unit_tests/test_bec_dispatcher.py b/tests/unit_tests/test_bec_dispatcher.py index 12b57c673..48bbd8ec5 100644 --- a/tests/unit_tests/test_bec_dispatcher.py +++ b/tests/unit_tests/test_bec_dispatcher.py @@ -1,6 +1,7 @@ # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring import threading import time +from types import SimpleNamespace from unittest import mock import pytest @@ -213,3 +214,55 @@ def _get_slots(): send_msg_event.set() qtbot.wait(10) + + +def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch): + info_mock = mock.MagicMock() + warning_mock = mock.MagicMock() + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock) + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock) + + def callback(_msg, _metadata): + pass + + cb = QtThreadSafeCallback(callback) + connector = QtRedisConnector("localhost:1", mock.MagicMock()) + rpc_msg = SimpleNamespace( + content={ + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + metadata={ + "request_id": "dispatcher-request", + "receiver": "gui", + "object_name": "progressbar", + "timeout": 0.1, + "sent_at": 1.0, + "deadline": 1.1, + }, + ) + + try: + connector._execute_callback(cb, {"data": rpc_msg}, {}) + + info_mock.assert_called_once() + info_message = info_mock.call_args.args[0] + assert "GUI RPC dispatcher received request before Qt callback emit" in info_message + assert "request_id=dispatcher-request" in info_message + assert "method=set_value" in info_message + assert "receiver=gui" in info_message + assert "target_gui_id=ring" in info_message + assert "object_name=progressbar" in info_message + assert "timeout=0.1" in info_message + assert "stale_on_dispatch=True" in info_message + assert "args=[1]" in info_message + assert "kwargs={'source': 'test'}" in info_message + + warning_mock.assert_called_once() + warning_message = warning_mock.call_args.args[0] + assert "received request after client timeout deadline" in warning_message + assert "request_id=dispatcher-request" in warning_message + assert "args=[1]" in warning_message + assert "kwargs={'source': 'test'}" in warning_message + finally: + connector.shutdown() From 15935a4b4ede617fe48524a83db809bca9ce8ae6 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 15:27:20 +0200 Subject: [PATCH 04/12] refactor(rpc): share logging helpers --- bec_widgets/cli/rpc/rpc_base.py | 16 +++--------- bec_widgets/utils/bec_dispatcher.py | 36 +++++---------------------- bec_widgets/utils/rpc_logging.py | 28 +++++++++++++++++++++ bec_widgets/utils/rpc_server.py | 38 ++++++----------------------- 4 files changed, 45 insertions(+), 73 deletions(-) create mode 100644 bec_widgets/utils/rpc_logging.py diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 92df5d2aa..15091c8be 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -13,6 +13,8 @@ from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from +from bec_widgets.utils.rpc_logging import format_rpc_payload + if TYPE_CHECKING: # pragma: no cover from bec_lib import messages from bec_lib.connector import MessageObject @@ -44,16 +46,6 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]: return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()} -def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - - def rpc_timeout(timeout): """ A decorator to set a timeout for an RPC call. @@ -275,8 +267,8 @@ def _run_rpc( ) target_gui_id = gui_id or self._gui_id - args_log = _format_rpc_payload(args) - kwargs_log = _format_rpc_payload(kwargs) + args_log = format_rpc_payload(args) + kwargs_log = format_rpc_payload(kwargs) sent_at = time.time() deadline = sent_at + timeout if timeout is not None else None rpc_msg.metadata.update( diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index b99ae9adb..28b3e1395 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -16,6 +16,7 @@ from qtpy.QtCore import QObject from qtpy.QtCore import Signal as pyqtSignal +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload from bec_widgets.utils.serialization import register_serializer_extension logger = bec_logger.logger @@ -26,31 +27,6 @@ from bec_widgets.utils.rpc_server import RPCServer -def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - - -def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: - if start is None: - return None - try: - return max(0.0, stop - float(start)) - except (TypeError, ValueError): - return None - - -def _format_elapsed(elapsed: float | None) -> str: - if elapsed is None: - return "unknown" - return f"{elapsed:.3f}" - - def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: if not isinstance(msg_content, dict) or not isinstance(metadata, dict): return @@ -64,17 +40,17 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: sent_at = metadata.get("sent_at") deadline = metadata.get("deadline") timeout = metadata.get("timeout") - dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at) + dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at) stale_on_dispatch = deadline is not None and dispatch_received_at > deadline target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") - args_log = _format_rpc_payload(parameter.get("args", [])) - kwargs_log = _format_rpc_payload(parameter.get("kwargs", {})) + args_log = format_rpc_payload(parameter.get("args", [])) + kwargs_log = format_rpc_payload(parameter.get("kwargs", {})) logger.info( "GUI RPC dispatcher received request before Qt callback emit " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " - f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}" ) if stale_on_dispatch: @@ -82,7 +58,7 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: "GUI RPC dispatcher received request after client timeout deadline " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " - f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " f"args={args_log} kwargs={kwargs_log}" ) diff --git a/bec_widgets/utils/rpc_logging.py b/bec_widgets/utils/rpc_logging.py new file mode 100644 index 000000000..90a21f4fc --- /dev/null +++ b/bec_widgets/utils/rpc_logging.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Any + + +def format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + +def elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + +def format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 8a91f4046..041e1f82f 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -5,7 +5,7 @@ import traceback import types from contextlib import contextmanager -from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar +from typing import TYPE_CHECKING, Callable, Literal, TypeVar from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints @@ -19,6 +19,7 @@ from bec_widgets.utils.bec_dispatcher import BECDispatcher from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.utils.error_popups import ErrorPopupUtility +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload from bec_widgets.utils.rpc_register import RPCRegister from bec_widgets.utils.screen_utils import apply_window_geometry from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea @@ -120,20 +121,20 @@ def on_rpc_update(self, msg: dict, metadata: dict): parameter = msg.get("parameter", {}) args = parameter.get("args", []) kwargs = parameter.get("kwargs", {}) - args_log = self._format_rpc_payload(args) - kwargs_log = self._format_rpc_payload(kwargs) + args_log = format_rpc_payload(args) + kwargs_log = format_rpc_payload(kwargs) target_gui_id = parameter.get("gui_id") sent_at = metadata.get("sent_at") deadline = metadata.get("deadline") timeout = metadata.get("timeout") received_at = time.time() - receive_latency = self._elapsed_seconds(sent_at, received_at) + receive_latency = elapsed_seconds(sent_at, received_at) stale_on_receive = deadline is not None and received_at > deadline logger.info( "GUI RPC server received request " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"receive_latency_s={format_elapsed(receive_latency)} " f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}" ) if stale_on_receive: @@ -141,7 +142,7 @@ def on_rpc_update(self, msg: dict, metadata: dict): "GUI RPC server received request after client timeout deadline " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"receive_latency_s={format_elapsed(receive_latency)} " f"args={args_log} kwargs={kwargs_log}" ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") @@ -201,31 +202,6 @@ def send_response(self, request_id: str, accepted: bool, msg: dict): expire=60, ) - @staticmethod - def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: - if start is None: - return None - try: - return max(0.0, stop - float(start)) - except (TypeError, ValueError): - return None - - @staticmethod - def _format_elapsed(elapsed: float | None) -> str: - if elapsed is None: - return "unknown" - return f"{elapsed:.3f}" - - @staticmethod - def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - def get_object_from_config(self, config: dict): gui_id = config.get("gui_id") obj = self.rpc_register.get_rpc_by_id(gui_id) From 20c5e43f3edd6dddee9c72751ee726ee385bef68 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 15:50:25 +0200 Subject: [PATCH 05/12] fix(companion_app): disable logging of bec_lib.scan_items on widget side --- bec_widgets/applications/companion_app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bec_widgets/applications/companion_app.py b/bec_widgets/applications/companion_app.py index 05944463e..f229b6774 100644 --- a/bec_widgets/applications/companion_app.py +++ b/bec_widgets/applications/companion_app.py @@ -74,6 +74,7 @@ def start(self): bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR bec_logger._update_sinks() + bec_logger.disabled_modules = ["bec_lib.scan_items"] with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore self._run() From d07c212719f10255a4f4997d239c0d0e135e1153 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 16:51:58 +0200 Subject: [PATCH 06/12] fix(rpc_server): log warning if rpc call is repeated --- bec_widgets/utils/rpc_server.py | 9 ++++++- tests/unit_tests/test_rpc_server.py | 38 +++++++++++++++++++---------- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 041e1f82f..0163e1415 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -357,7 +357,14 @@ def serialize_result_and_send(self, request_id: str, res: object): res = self.serialize_object(res) except RegistryNotReadyError: try: - self._rpc_singleshot_repeats[request_id] += retry_delay + repeat = self._rpc_singleshot_repeats[request_id] + repeat += retry_delay + logger.warning( + "GUI RPC result serialization delayed; retrying " + f"request_id={request_id} retry_delay_ms={retry_delay} " + f"accumulated_delay_ms={repeat.accumulated_delay} " + f"max_delay_ms={repeat.max_delay}" + ) QTimer.singleShot( retry_delay, lambda: self.serialize_result_and_send(request_id, res) ) diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 33f99ba30..03cfd8c17 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -92,22 +92,34 @@ def serialize_side_effect(obj): # Third call succeeds return {"gui_id": dummy.gui_id, "success": True} + warning_mock = MagicMock() + # Patch serialize_object to control when it raises RegistryNotReadyError with patch.object(rpc_server, "serialize_object", side_effect=serialize_side_effect): with patch.object(rpc_server, "send_response") as mock_send_response: - # Start the serialization process - rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() - rpc_server.serialize_result_and_send(request_id, dummy) - - # Verify that serialize_object was called 3 times - qtbot.waitUntil(lambda: call_count >= 3, timeout=5000) - - # Verify that send_response was called with success - mock_send_response.assert_called_once() - args = mock_send_response.call_args[0] - assert args[0] == request_id - assert args[1] is True # accepted=True - assert "result" in args[2] + with patch.object(rpc_server_module.logger, "warning", warning_mock): + # Start the serialization process + rpc_server._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() + rpc_server.serialize_result_and_send(request_id, dummy) + + # Verify that serialize_object was called 3 times + qtbot.waitUntil(lambda: call_count >= 3, timeout=5000) + + # Verify that send_response was called with success + mock_send_response.assert_called_once() + args = mock_send_response.call_args[0] + assert args[0] == request_id + assert args[1] is True # accepted=True + assert "result" in args[2] + + assert warning_mock.call_count == 2 + warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list) + assert "result serialization delayed; retrying" in warning_logs + assert "request_id=test_request_123" in warning_logs + assert "retry_delay_ms=100" in warning_logs + assert "accumulated_delay_ms=100" in warning_logs + assert "accumulated_delay_ms=200" in warning_logs + assert "max_delay_ms=2000" in warning_logs def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_widget): From e78d8cb3d539641970c6db7bf534b6a0d120eae8 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 17:22:07 +0200 Subject: [PATCH 07/12] fix(client_utils): increase default rpc timeout to 60s --- bec_widgets/cli/client_utils.py | 2 +- tests/unit_tests/test_client_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 21276773a..551e41b85 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -222,7 +222,7 @@ def __init__(self, **kwargs) -> None: self._ipython_registry: dict[str, RPCReference] = {} self.available_widgets = AvailableWidgetsNamespace() register_serializer_extension() - self._rpc_timeout = 5 + self._rpc_timeout = 60 #################### #### Client API #### diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index 30f0a5daa..3c489b8a2 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -262,7 +262,7 @@ def test_client_utils_delete_falls_back_to_direct_close(): def test_client_utils_gui_client_set_rpc_timeout(): gui = BECGuiClient() - assert gui._rpc_timeout == 5 + assert gui._rpc_timeout == 60 gui.set_rpc_timeout(10) assert gui._rpc_timeout == 10 From a80a705d0195a9e1cea220bf036661224818c38a Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Tue, 26 May 2026 09:59:09 +0200 Subject: [PATCH 08/12] fix: change prints into proper logs --- .../widgets/control/buttons/button_abort/button_abort.py | 5 ++++- bec_widgets/widgets/plots/image/image_base.py | 4 ++-- bec_widgets/widgets/plots/waveform/waveform.py | 2 +- .../widgets/progress/scan_progressbar/scan_progressbar.py | 1 - 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/bec_widgets/widgets/control/buttons/button_abort/button_abort.py b/bec_widgets/widgets/control/buttons/button_abort/button_abort.py index 4a9585dc2..c17675020 100644 --- a/bec_widgets/widgets/control/buttons/button_abort/button_abort.py +++ b/bec_widgets/widgets/control/buttons/button_abort/button_abort.py @@ -1,3 +1,4 @@ +from bec_lib.logger import bec_logger from bec_qthemes import material_icon from qtpy.QtCore import Qt from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget @@ -5,6 +6,8 @@ from bec_widgets.utils.bec_widget import BECWidget from bec_widgets.utils.error_popups import SafeSlot +logger = bec_logger.logger + class AbortButton(BECWidget, QWidget): """A button that abort the scan.""" @@ -55,7 +58,7 @@ def abort_scan( scan_id(str|None): The scan id to abort. If None, the current scan will be aborted. """ if self.scan_id is not None: - print(f"Aborting scan with scan_id: {self.scan_id}") + logger.info(f"Aborting scan with scan_id: {self.scan_id}") self.queue.request_scan_abortion(scan_id=self.scan_id) else: self.queue.request_scan_abortion() diff --git a/bec_widgets/widgets/plots/image/image_base.py b/bec_widgets/widgets/plots/image/image_base.py index 57bbf95c5..db8e783e4 100644 --- a/bec_widgets/widgets/plots/image/image_base.py +++ b/bec_widgets/widgets/plots/image/image_base.py @@ -460,7 +460,7 @@ def enable_colorbar( self._color_bar = None def disable_autorange(): - print("Disabling autorange") + logger.info("Disabling autorange") self.setProperty("autorange", False) if style == "simple": @@ -928,7 +928,7 @@ def _set_autorange(self, enabled: bool, sync: bool = True): # if sync: self._sync_colorbar_levels() self._sync_autorange_switch() - print(f"Autorange set to {enabled}") + logger.info(f"Autorange set to {enabled}") @SafeProperty(str) def autorange_mode(self) -> str: diff --git a/bec_widgets/widgets/plots/waveform/waveform.py b/bec_widgets/widgets/plots/waveform/waveform.py index 467a6138a..35a2873ba 100644 --- a/bec_widgets/widgets/plots/waveform/waveform.py +++ b/bec_widgets/widgets/plots/waveform/waveform.py @@ -2449,7 +2449,7 @@ def _check_dataset_size_and_confirm(self, dataset_obj, device_entry: str) -> boo first_key = next(iter(info)) mem_bytes = info[first_key]["value"]["mem_size"] size_mb = mem_bytes / (1024 * 1024) - print(f"Dataset size: {size_mb:.1f} MB") + logger.info(f"Dataset size: {size_mb:.1f} MB") except Exception as exc: # noqa: BLE001 logger.error(f"Unable to evaluate dataset size: {exc}") return True diff --git a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py index 392fecb81..c10f7b3b1 100644 --- a/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py +++ b/bec_widgets/widgets/progress/scan_progressbar/scan_progressbar.py @@ -155,7 +155,6 @@ def __init__( self._progress_device = None self.task = None self.scan_number = None - self.progress_started.connect(lambda: print("Scan progress started")) def connect_to_queue(self): """ From a37eee0a2c303551c144d5123a851600151199e1 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Tue, 26 May 2026 10:00:37 +0200 Subject: [PATCH 09/12] fix(logging): removed args/kwargs from logging messages --- bec_widgets/cli/rpc/rpc_base.py | 11 +++-------- bec_widgets/utils/bec_dispatcher.py | 9 +++------ bec_widgets/utils/rpc_logging.py | 12 ------------ bec_widgets/utils/rpc_server.py | 18 ++++++------------ tests/unit_tests/test_bec_dispatcher.py | 4 ---- tests/unit_tests/test_rpc_base.py | 4 ---- tests/unit_tests/test_rpc_server.py | 6 ------ 7 files changed, 12 insertions(+), 52 deletions(-) diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 15091c8be..a5061ae48 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -13,8 +13,6 @@ from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from -from bec_widgets.utils.rpc_logging import format_rpc_payload - if TYPE_CHECKING: # pragma: no cover from bec_lib import messages from bec_lib.connector import MessageObject @@ -267,8 +265,6 @@ def _run_rpc( ) target_gui_id = gui_id or self._gui_id - args_log = format_rpc_payload(args) - kwargs_log = format_rpc_payload(kwargs) sent_at = time.time() deadline = sent_at + timeout if timeout is not None else None rpc_msg.metadata.update( @@ -287,8 +283,7 @@ def _run_rpc( "Sending GUI RPC request " f"request_id={request_id} method={method} receiver={receiver} " f"target_gui_id={target_gui_id} object_name={self.object_name} " - f"wait_for_response={wait_for_rpc_response} timeout={timeout} " - f"args={args_log} kwargs={kwargs_log}" + f"wait_for_response={wait_for_rpc_response} timeout={timeout}" ) self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) @@ -300,7 +295,7 @@ def _run_rpc( "GUI RPC response timeout " f"request_id={request_id} method={method} receiver={receiver} " f"target_gui_id={target_gui_id} object_name={self.object_name} " - f"timeout={timeout} args={args_log} kwargs={kwargs_log}" + f"timeout={timeout}" ) raise RPCResponseTimeoutError(request_id, timeout) finally: @@ -317,7 +312,7 @@ def _run_rpc( "Received GUI RPC response " f"request_id={request_id} method={method} receiver={receiver} " f"target_gui_id={target_gui_id} object_name={self.object_name} " - f"accepted={self._rpc_response.accepted} args={args_log} kwargs={kwargs_log}" + f"accepted={self._rpc_response.accepted}" ) if not self._rpc_response.accepted: raise ValueError(self._rpc_response.message["error"]) diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index 28b3e1395..6f1453155 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -16,7 +16,7 @@ from qtpy.QtCore import QObject from qtpy.QtCore import Signal as pyqtSignal -from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed from bec_widgets.utils.serialization import register_serializer_extension logger = bec_logger.logger @@ -43,23 +43,20 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at) stale_on_dispatch = deadline is not None and dispatch_received_at > deadline target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") - args_log = format_rpc_payload(parameter.get("args", [])) - kwargs_log = format_rpc_payload(parameter.get("kwargs", {})) logger.info( "GUI RPC dispatcher received request before Qt callback emit " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " - f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}" + f"stale_on_dispatch={stale_on_dispatch}" ) if stale_on_dispatch: logger.warning( "GUI RPC dispatcher received request after client timeout deadline " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " - f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " - f"args={args_log} kwargs={kwargs_log}" + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)}" ) diff --git a/bec_widgets/utils/rpc_logging.py b/bec_widgets/utils/rpc_logging.py index 90a21f4fc..9ef28b2db 100644 --- a/bec_widgets/utils/rpc_logging.py +++ b/bec_widgets/utils/rpc_logging.py @@ -1,17 +1,5 @@ from __future__ import annotations -from typing import Any - - -def format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - def elapsed_seconds(start: float | int | None, stop: float) -> float | None: if start is None: diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 0163e1415..af27e4ca9 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -19,7 +19,7 @@ from bec_widgets.utils.bec_dispatcher import BECDispatcher from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.utils.error_popups import ErrorPopupUtility -from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed from bec_widgets.utils.rpc_register import RPCRegister from bec_widgets.utils.screen_utils import apply_window_geometry from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea @@ -121,8 +121,6 @@ def on_rpc_update(self, msg: dict, metadata: dict): parameter = msg.get("parameter", {}) args = parameter.get("args", []) kwargs = parameter.get("kwargs", {}) - args_log = format_rpc_payload(args) - kwargs_log = format_rpc_payload(kwargs) target_gui_id = parameter.get("gui_id") sent_at = metadata.get("sent_at") deadline = metadata.get("deadline") @@ -135,15 +133,14 @@ def on_rpc_update(self, msg: dict, metadata: dict): f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " f"receive_latency_s={format_elapsed(receive_latency)} " - f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}" + f"stale_on_receive={stale_on_receive}" ) if stale_on_receive: logger.warning( "GUI RPC server received request after client timeout deadline " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"receive_latency_s={format_elapsed(receive_latency)} " - f"args={args_log} kwargs={kwargs_log}" + f"receive_latency_s={format_elapsed(receive_latency)}" ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") execution_start = time.perf_counter() @@ -160,8 +157,7 @@ def on_rpc_update(self, msg: dict, metadata: dict): logger.error( "GUI RPC server execution failed " f"request_id={request_id} method={method} gui_id={self.gui_id} " - f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " - f"args={args_log} kwargs={kwargs_log}\n" + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f}\n" f"{content}" ) self.send_response(request_id, False, {"error": content}) @@ -172,16 +168,14 @@ def on_rpc_update(self, msg: dict, metadata: dict): "GUI RPC server executed request " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " - f"response_after_client_deadline={response_stale} " - f"args={args_log} kwargs={kwargs_log}" + f"response_after_client_deadline={response_stale}" ) if response_stale: logger.warning( "GUI RPC server response is late for client timeout " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"execution_duration_s={execution_duration:.3f} " - f"args={args_log} kwargs={kwargs_log}" + f"execution_duration_s={execution_duration:.3f}" ) logger.debug(f"RPC instruction executed successfully: {res}") self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() diff --git a/tests/unit_tests/test_bec_dispatcher.py b/tests/unit_tests/test_bec_dispatcher.py index 48bbd8ec5..900fc3c26 100644 --- a/tests/unit_tests/test_bec_dispatcher.py +++ b/tests/unit_tests/test_bec_dispatcher.py @@ -255,14 +255,10 @@ def callback(_msg, _metadata): assert "object_name=progressbar" in info_message assert "timeout=0.1" in info_message assert "stale_on_dispatch=True" in info_message - assert "args=[1]" in info_message - assert "kwargs={'source': 'test'}" in info_message warning_mock.assert_called_once() warning_message = warning_mock.call_args.args[0] assert "received request after client timeout deadline" in warning_message assert "request_id=dispatcher-request" in warning_message - assert "args=[1]" in warning_message - assert "kwargs={'source': 'test'}" in warning_message finally: connector.shutdown() diff --git a/tests/unit_tests/test_rpc_base.py b/tests/unit_tests/test_rpc_base.py index 7fc44cf4e..ac6686542 100644 --- a/tests/unit_tests/test_rpc_base.py +++ b/tests/unit_tests/test_rpc_base.py @@ -76,8 +76,6 @@ def test_run_rpc_logs_response_timeout(monkeypatch): assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"] assert info_mock.call_count == 1 info_message = info_mock.call_args.args[0] - assert "args=(42,)" in info_message - assert "kwargs={'precision': 2}" in info_message error_mock.assert_called_once() error_message = error_mock.call_args.args[0] assert "GUI RPC response timeout" in error_message @@ -85,5 +83,3 @@ def test_run_rpc_logs_response_timeout(monkeypatch): assert "target_gui_id=progress_widget" in error_message assert "object_name=progressbar" in error_message assert "timeout=0" in error_message - assert "args=(42,)" in error_message - assert "kwargs={'precision': 2}" in error_message diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 03cfd8c17..6ef015316 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -198,15 +198,9 @@ def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch): assert "target_gui_id=ring" in received_log assert "timeout=0.1" in received_log assert "stale_on_receive=True" in received_log - assert "args=[1]" in received_log - assert "kwargs={'source': 'test'}" in received_log assert "response_after_client_deadline=True" in executed_log - assert "args=[1]" in executed_log - assert "kwargs={'source': 'test'}" in executed_log assert "received request after client timeout deadline" in warning_logs assert "response is late for client timeout" in warning_logs - assert "args=[1]" in warning_logs - assert "kwargs={'source': 'test'}" in warning_logs def test_run_rpc_delegates_to_rpc_content_class(rpc_server): From 593fd3cdd71480c02ec17349d5f2845f51b8ff29 Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Thu, 28 May 2026 14:26:01 +0200 Subject: [PATCH 10/12] fix(rpc): more robust shutdown section with PID logging --- bec_widgets/applications/companion_app.py | 84 +++++++++--- bec_widgets/cli/client_utils.py | 155 +++++++++++++++++++++- bec_widgets/utils/rpc_server.py | 20 ++- tests/unit_tests/test_client_utils.py | 80 +++++++++++ tests/unit_tests/test_rpc_server.py | 47 +++++++ 5 files changed, 359 insertions(+), 27 deletions(-) diff --git a/bec_widgets/applications/companion_app.py b/bec_widgets/applications/companion_app.py index f229b6774..43524194a 100644 --- a/bec_widgets/applications/companion_app.py +++ b/bec_widgets/applications/companion_app.py @@ -5,6 +5,7 @@ import os import signal import sys +import traceback from contextlib import redirect_stderr, redirect_stdout import darkdetect @@ -63,6 +64,7 @@ def __init__(self, args): self.app: QApplication | None = None self.launcher_window: LaunchWindow | None = None self.dispatcher: BECDispatcher | None = None + self._shutdown_started = False def start(self): """ @@ -123,17 +125,8 @@ def _run(self): self.app.aboutToQuit.connect(self.shutdown) self.app.setQuitOnLastWindowClosed(True) - def sigint_handler(*args): - # display message, for people to let it terminate gracefully - print("Caught SIGINT, exiting") - # Widgets should be all closed. - with RPCRegister.delayed_broadcast(): - for widget in QApplication.instance().topLevelWidgets(): # type: ignore - widget.close() - self.shutdown() - - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGINT, self.request_shutdown) + signal.signal(signal.SIGTERM, self.request_shutdown) sys.exit(self.app.exec()) @@ -150,16 +143,67 @@ def setup_bec_icon(self): ) self.app.setWindowIcon(icon) + def request_shutdown(self, signum=None, _frame=None): + """ + Request Qt application shutdown from an RPC call or OS signal. + + Cleanup itself is handled by ``shutdown()``, which is connected to + ``QApplication.aboutToQuit``. Calling it directly here would run BEC/RPC + teardown before Qt has processed the widget close events. + """ + signal_name = signal.Signals(signum).name if signum is not None else "shutdown" + pid = os.getpid() + if self.app is None: + logger.info(f"Caught {signal_name}, shutting down GUI server pid={pid} without app") + self.shutdown() + return + + widgets = [ + f"{widget.__class__.__name__}(objectName={widget.objectName()!r})" + for widget in self.app.topLevelWidgets() + ] + logger.info( + f"Caught {signal_name}, requesting GUI server shutdown pid={pid} " + f"top_level_widgets={widgets}" + ) + with RPCRegister.delayed_broadcast(): + for widget in self.app.topLevelWidgets(): + widget.close() + self.app.quit() + + @staticmethod + def _run_shutdown_step(step: str, callback): + try: + callback() + except Exception as exc: + logger.error( + f"GUIServer shutdown step failed pid={os.getpid()} step={step}: {exc}\n" + f"{traceback.format_exc()}" + ) + def shutdown(self): - logger.info("Shutdown GUIServer", repr(self)) - if self.launcher_window and shiboken6.isValid(self.launcher_window): - self.launcher_window.close() - self.launcher_window.deleteLater() - if pylsp_server.is_running(): - pylsp_server.stop() - if self.dispatcher: - self.dispatcher.stop_cli_server() - self.dispatcher.disconnect_all() + if self._shutdown_started: + return + self._shutdown_started = True + logger.info(f"Shutdown GUIServer pid={os.getpid()} {repr(self)}") + + def close_launcher_window(): + if self.launcher_window and shiboken6.isValid(self.launcher_window): + self.launcher_window.close() + self.launcher_window.deleteLater() + + def stop_pylsp_server(): + if pylsp_server.is_running(): + pylsp_server.stop() + + def stop_dispatcher(): + if self.dispatcher: + self.dispatcher.stop_cli_server() + self.dispatcher.disconnect_all() + + self._run_shutdown_step("close_launcher_window", close_launcher_window) + self._run_shutdown_step("stop_pylsp_server", stop_pylsp_server) + self._run_shutdown_step("stop_dispatcher", stop_dispatcher) def main(): diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 551e41b85..574438058 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -5,6 +5,7 @@ import json import os import select +import signal import subprocess import threading import time @@ -33,6 +34,9 @@ logger = bec_logger.logger IGNORE_WIDGETS = ["LaunchWindow"] +PROCESS_TERMINATION_TIMEOUT = 10 +PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 +GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 RegistryState: TypeAlias = dict[ Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"], @@ -75,6 +79,123 @@ def _get_output(process, logger) -> None: logger.error(f"Error reading process output: {str(e)}") +def _process_group_id(process) -> int | None: + pid = getattr(process, "pid", None) + if os.name != "posix" or not isinstance(pid, int): + return None + try: + return os.getpgid(pid) + except ProcessLookupError: + return None + + +def _process_details(process) -> str: + args = getattr(process, "args", None) + if isinstance(args, list): + command = " ".join(str(arg) for arg in args) + else: + command = str(args) + return ( + f"pid={getattr(process, 'pid', None)} pgid={_process_group_id(process)} command={command}" + ) + + +def _process_group_snapshot(process) -> str: + pgid = _process_group_id(process) + if pgid is None: + return "Process group snapshot unavailable: process group no longer exists" + try: + result = subprocess.run( + ["ps", "-o", "pid,ppid,pgid,stat,command", "-g", str(pgid)], + check=False, + capture_output=True, + text=True, + timeout=2, + ) + except Exception as exc: + return f"Process group snapshot unavailable: {exc}" + output = result.stdout.strip() + if not output: + return f"Process group snapshot empty for pgid={pgid}" + return output + + +def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATION_TIMEOUT) -> None: + if process.poll() is not None: + return + + process_details = _process_details(process) + try: + pgid = _process_group_id(process) + if pgid is not None: + logger.info(f"Terminating GUI process group {process_details}") + os.killpg(pgid, signal.SIGTERM) + else: + logger.info(f"Terminating GUI process {process_details}") + process.terminate() + except ProcessLookupError: + process.wait(timeout=timeout) + return + except Exception as exc: + logger.warning( + f"Failed to terminate GUI process group: {exc}; terminating process only. " + f"{process_details}" + ) + process.terminate() + + try: + process.wait(timeout=timeout) + return + except subprocess.TimeoutExpired: + logger.warning( + f"GUI process did not stop within {timeout}s; killing it. " + f"{process_details}\n{_process_group_snapshot(process)}" + ) + + try: + pgid = _process_group_id(process) + if pgid is not None: + os.killpg(pgid, signal.SIGKILL) + else: + process.kill() + except ProcessLookupError as e: + logger.error(f"Failed to kill GUI process group: {e}") + process.wait(timeout=timeout) + return + process.wait(timeout=timeout) + + +def _wait_for_process_exit(process, timeout: float) -> bool: + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + return False + return True + + +def _join_process_output_thread(process, thread: threading.Thread | None, logger) -> None: + if thread is None: + return + thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) + if not thread.is_alive(): + return + + for stream in (process.stdout, process.stderr): + if stream is None: + continue + try: + stream.close() + except OSError as e: + logger.error(f"Failed to close stream {str(e)}") + pass + thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) + if thread.is_alive(): + logger.warning( + "GUI process output reader thread did not stop after process shutdown. " + f"{_process_details(process)}" + ) + + def _start_plot_process( gui_id: str, gui_class_id: str, @@ -465,11 +586,13 @@ def kill_server(self) -> None: if self._process: logger.success("Stopping GUI...") - self._process.terminate() - if self._process_output_processing_thread: - self._process_output_processing_thread.join() - self._process.wait() + if not self._request_server_shutdown(): + _terminate_plot_process(self._process, logger) + _join_process_output_thread( + self._process, self._process_output_processing_thread, logger + ) self._process = None + self._process_output_processing_thread = None # Unregister the registry state self._client.connector.unregister( @@ -488,6 +611,30 @@ def close(self): #### Private methods #### ######################### + def _request_server_shutdown(self) -> bool: + if self._process is None or self._process.poll() is not None: + return True + process_details = _process_details(self._process) + logger.info(f"Requesting graceful GUI shutdown {process_details}") + try: + self.launcher._run_rpc( # pylint: disable=protected-access + "system.shutdown", wait_for_rpc_response=False + ) + except Exception as exc: + logger.warning( + f"Could not request graceful GUI shutdown via RPC: {exc}. " f"{process_details}" + ) + return False + if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT): + logger.info(f"GUI server exited after graceful shutdown {process_details}") + return True + logger.warning( + "GUI server did not exit after graceful shutdown request; " + f"falling back to process termination. {process_details}\n" + f"{_process_group_snapshot(self._process)}" + ) + return False + def _check_if_server_is_alive(self): """Checks if the process is alive""" if self._process is None: diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index af27e4ca9..d3b9cf4b4 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -12,7 +12,7 @@ from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import from qtpy.QtCore import Qt, QTimer -from qtpy.QtWidgets import QWidget +from qtpy.QtWidgets import QApplication, QWidget from redis.exceptions import RedisError from bec_widgets.utils.bec_connector import BECConnector @@ -290,10 +290,23 @@ def _resolve_rpc_target(self, obj, method: str) -> tuple[object, object]: def run_system_rpc(self, method: str, args: list, kwargs: dict): if method == "system.launch_dock_area": return self._launch_dock_area(*args, **kwargs) + if method == "system.shutdown": + return self._shutdown_gui_server() if method == "system.list_capabilities": - return {"system.launch_dock_area": True} + return {"system.launch_dock_area": True, "system.shutdown": True} raise ValueError(f"Unknown system RPC method: {method}") + @staticmethod + def _shutdown_gui_server() -> None: + app = QApplication.instance() + if app is None: + return + gui_server = getattr(app, "gui_server", None) + if gui_server is not None and hasattr(gui_server, "request_shutdown"): + QTimer.singleShot(0, gui_server.request_shutdown) + return + QTimer.singleShot(0, app.quit) + @staticmethod def _launch_dock_area( name: str | None = None, @@ -468,8 +481,9 @@ def _serialize_bec_connector(self, connector: BECConnector, wait=False) -> dict: container_proxy = parent.gui_id else: container_proxy = None - except Exception: + except Exception as e: container_proxy = None + logger.error(f"Error while serializing RPC result: {e}") if wait and not self.rpc_register.object_is_registered(connector): raise RegistryNotReadyError(f"Connector {connector} not registered yet") diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index 3c489b8a2..efce65bdf 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -1,3 +1,5 @@ +import signal +import subprocess from contextlib import contextmanager from unittest import mock @@ -266,3 +268,81 @@ def test_client_utils_gui_client_set_rpc_timeout(): gui.set_rpc_timeout(10) assert gui._rpc_timeout == 10 + + +def test_client_utils_kill_server_waits_for_process_before_joining_output_thread(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + gui._process = mock.MagicMock(pid=123, stdout=None, stderr=None) + gui._process.poll.return_value = None + order = [] + gui._process.wait.side_effect = lambda timeout: order.append("wait") + gui._process_output_processing_thread = mock.MagicMock() + gui._process_output_processing_thread.join.side_effect = lambda timeout: order.append("join") + gui._process_output_processing_thread.is_alive.return_value = False + + with ( + mock.patch.object(gui, "_request_server_shutdown", return_value=False), + mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123), + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + ): + gui.kill_server() + + killpg.assert_called_once_with(123, signal.SIGTERM) + assert order == ["wait", "join"] + assert gui._process is None + assert gui._process_output_processing_thread is None + + +def test_client_utils_kill_server_requests_graceful_shutdown_before_signal(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + process = mock.MagicMock(stdout=None, stderr=None) + process.poll.return_value = None + gui._process = process + gui._process_output_processing_thread = mock.MagicMock() + gui._process_output_processing_thread.is_alive.return_value = False + launcher = mock.MagicMock() + + with ( + mock.patch.object( + BECGuiClient, "launcher", new_callable=mock.PropertyMock + ) as launcher_prop, + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + ): + launcher_prop.return_value = launcher + gui.kill_server() + + launcher._run_rpc.assert_called_once_with("system.shutdown", wait_for_rpc_response=False) + process.wait.assert_called_once_with(timeout=5) + killpg.assert_not_called() + assert gui._process is None + assert gui._process_output_processing_thread is None + + +def test_client_utils_kill_server_kills_process_group_after_timeout(): + gui = BECGuiClient() + gui._client = mock.MagicMock() + process = mock.MagicMock(pid=123, stdout=None, stderr=None, args=["bec-gui-server"]) + process.poll.return_value = None + process.wait.side_effect = [subprocess.TimeoutExpired(cmd="bec-gui-server", timeout=10), None] + gui._process = process + + with ( + mock.patch.object(gui, "_request_server_shutdown", return_value=False), + mock.patch("bec_widgets.cli.client_utils.os.getpgid", return_value=123), + mock.patch("bec_widgets.cli.client_utils.os.killpg") as killpg, + mock.patch("bec_widgets.cli.client_utils.subprocess.run") as run, + ): + run.return_value.stdout = "PID PPID PGID STAT COMMAND\n123 1 123 S bec-gui-server" + gui.kill_server() + + assert killpg.call_args_list == [mock.call(123, signal.SIGTERM), mock.call(123, signal.SIGKILL)] + assert process.wait.call_args_list == [mock.call(timeout=10), mock.call(timeout=10)] + run.assert_called_once_with( + ["ps", "-o", "pid,ppid,pgid,stat,command", "-g", "123"], + check=False, + capture_output=True, + text=True, + timeout=2, + ) diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 6ef015316..8b71d8f71 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -5,6 +5,7 @@ from bec_lib.service_config import ServiceConfig from qtpy.QtWidgets import QWidget +from bec_widgets.applications import companion_app as companion_app_module from bec_widgets.applications.companion_app import GUIServer from bec_widgets.utils import rpc_server as rpc_server_module from bec_widgets.utils.bec_connector import BECConnector @@ -59,6 +60,52 @@ def test_gui_server_get_service_config(gui_server): assert gui_server._get_service_config().config == ServiceConfig().config +def test_gui_server_signal_shutdown_closes_widgets_and_quits_app(gui_server): + widget = MagicMock() + gui_server.app = MagicMock() + gui_server.app.topLevelWidgets.return_value = [widget] + + gui_server.request_shutdown() + + widget.close.assert_called_once() + gui_server.app.quit.assert_called_once() + + +def test_gui_server_shutdown_is_idempotent(gui_server): + gui_server.launcher_window = MagicMock() + gui_server.dispatcher = MagicMock() + + with ( + patch.object(companion_app_module.shiboken6, "isValid", return_value=True), + patch.object(companion_app_module.pylsp_server, "is_running", return_value=False), + ): + gui_server.shutdown() + gui_server.shutdown() + + gui_server.launcher_window.close.assert_called_once() + gui_server.launcher_window.deleteLater.assert_called_once() + gui_server.dispatcher.stop_cli_server.assert_called_once() + gui_server.dispatcher.disconnect_all.assert_called_once() + + +def test_rpc_server_system_capabilities_include_shutdown(rpc_server): + assert rpc_server.run_system_rpc("system.list_capabilities", [], {}) == { + "system.launch_dock_area": True, + "system.shutdown": True, + } + + +def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qapp): + gui_server = MagicMock() + qapp.gui_server = gui_server + + rpc_server.run_system_rpc("system.shutdown", [], {}) + qapp.processEvents() + + gui_server.request_shutdown.assert_called_once() + del qapp.gui_server + + def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server): """ Test that a singleshot RPC method raises an error when called multiple times. From 674048d810c0feff67fa4e9526e2beb4feb7229e Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Thu, 28 May 2026 14:35:28 +0200 Subject: [PATCH 11/12] fix(client_utils): stop output reader thread on shutdown --- bec_widgets/cli/client_utils.py | 22 +++++++++++++++++----- tests/end-2-end/test_rpc_widgets_e2e.py | 2 +- tests/unit_tests/test_client_utils.py | 25 ++++++++++++++++++++++++- 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 574438058..ec05ba9fd 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -36,7 +36,9 @@ IGNORE_WIDGETS = ["LaunchWindow"] PROCESS_TERMINATION_TIMEOUT = 10 PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 +PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2 GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 +OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event" RegistryState: TypeAlias = dict[ Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"], @@ -57,14 +59,16 @@ def _filter_output(output: str) -> str: return output -def _get_output(process, logger) -> None: +def _get_output(process, logger, stop_event: threading.Event | None = None) -> None: log_func = {process.stdout: logger.debug, process.stderr: logger.info} stream_buffer = {process.stdout: [], process.stderr: []} try: os.set_blocking(process.stdout.fileno(), False) os.set_blocking(process.stderr.fileno(), False) - while process.poll() is None: - readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1) + while process.poll() is None and not (stop_event and stop_event.is_set()): + readylist, _, _ = select.select( + [process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT + ) for stream in (process.stdout, process.stderr): buf = stream_buffer[stream] if stream in readylist: @@ -180,6 +184,9 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger if not thread.is_alive(): return + if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None): + stop_event.set() + for stream in (process.stdout, process.stderr): if stream is None: continue @@ -187,7 +194,6 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger stream.close() except OSError as e: logger.error(f"Failed to close stream {str(e)}") - pass thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) if thread.is_alive(): logger.warning( @@ -247,8 +253,14 @@ def _start_plot_process( if logger is None: process_output_processing_thread = None else: + process_output_stop_event = threading.Event() process_output_processing_thread = threading.Thread( - target=_get_output, args=(process, logger) + target=_get_output, args=(process, logger, process_output_stop_event) + ) + setattr( + process_output_processing_thread, + OUTPUT_READER_STOP_EVENT_ATTR, + process_output_stop_event, ) process_output_processing_thread.start() return process, process_output_processing_thread diff --git a/tests/end-2-end/test_rpc_widgets_e2e.py b/tests/end-2-end/test_rpc_widgets_e2e.py index 19e8109fc..5652701b7 100644 --- a/tests/end-2-end/test_rpc_widgets_e2e.py +++ b/tests/end-2-end/test_rpc_widgets_e2e.py @@ -69,7 +69,7 @@ def create_widget( return widget -@pytest.mark.timeout(100) +@pytest.mark.timeout(20) def test_available_widgets(qtbot, connected_client_gui_obj): """This test checks that all widgets that are available via gui.available_widgets can be created and removed.""" gui = connected_client_gui_obj diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index efce65bdf..2afdcf5b9 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -6,7 +6,12 @@ import pytest from bec_widgets.cli.client import BECDockArea -from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process +from bec_widgets.cli.client_utils import ( + OUTPUT_READER_STOP_EVENT_ATTR, + BECGuiClient, + _join_process_output_thread, + _start_plot_process, +) from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout @@ -346,3 +351,21 @@ def test_client_utils_kill_server_kills_process_group_after_timeout(): text=True, timeout=2, ) + + +def test_join_process_output_thread_signals_reader_before_closing_streams(): + process = mock.MagicMock(pid=123, args=["bec-gui-server"]) + process.stdout = mock.MagicMock() + process.stderr = mock.MagicMock() + thread = mock.MagicMock() + stop_event = mock.MagicMock() + setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event) + thread.is_alive.side_effect = [True, False] + logger = mock.MagicMock() + + _join_process_output_thread(process, thread, logger) + + assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)] + stop_event.set.assert_called_once_with() + process.stdout.close.assert_called_once_with() + process.stderr.close.assert_called_once_with() From 64d9f25c92271746a086c9118c237153a8dbfffd Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 29 May 2026 12:09:22 +0200 Subject: [PATCH 12/12] fix(rpc): client/server rpc handshake for shutdown --- bec_widgets/cli/client_utils.py | 33 +++++++++++++++------------ bec_widgets/utils/rpc_server.py | 28 +++++++++++++++++++++++ tests/unit_tests/test_client_utils.py | 5 +++- tests/unit_tests/test_rpc_server.py | 16 +++++++++++++ 4 files changed, 67 insertions(+), 15 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index ec05ba9fd..f32f73a6a 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -37,6 +37,7 @@ PROCESS_TERMINATION_TIMEOUT = 10 PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2 +GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3 GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event" @@ -141,19 +142,18 @@ def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATIO process.wait(timeout=timeout) return except Exception as exc: - logger.warning( - f"Failed to terminate GUI process group: {exc}; terminating process only. " - f"{process_details}" - ) + logger.warning("Failed to terminate GUI process group; terminating process only.") + logger.info(f"GUI process termination failure details: {exc}. {process_details}") process.terminate() try: process.wait(timeout=timeout) return except subprocess.TimeoutExpired: - logger.warning( - f"GUI process did not stop within {timeout}s; killing it. " - f"{process_details}\n{_process_group_snapshot(process)}" + logger.warning(f"GUI process did not stop within {timeout}s; killing it.") + logger.info( + f"GUI process force-kill details: {process_details}\n" + f"{_process_group_snapshot(process)}" ) try: @@ -196,10 +196,8 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger logger.error(f"Failed to close stream {str(e)}") thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) if thread.is_alive(): - logger.warning( - "GUI process output reader thread did not stop after process shutdown. " - f"{_process_details(process)}" - ) + logger.warning("GUI process output reader thread did not stop after process shutdown.") + logger.info(f"GUI process output reader thread details: {_process_details(process)}") def _start_plot_process( @@ -630,19 +628,26 @@ def _request_server_shutdown(self) -> bool: logger.info(f"Requesting graceful GUI shutdown {process_details}") try: self.launcher._run_rpc( # pylint: disable=protected-access - "system.shutdown", wait_for_rpc_response=False + "system.shutdown", + wait_for_rpc_response=True, + timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, ) except Exception as exc: logger.warning( - f"Could not request graceful GUI shutdown via RPC: {exc}. " f"{process_details}" + "Could not confirm graceful GUI shutdown via RPC; " + "falling back to process termination." ) + logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}") return False if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT): logger.info(f"GUI server exited after graceful shutdown {process_details}") return True logger.warning( "GUI server did not exit after graceful shutdown request; " - f"falling back to process termination. {process_details}\n" + "falling back to process termination." + ) + logger.info( + f"Graceful GUI shutdown timeout details: {process_details}\n" f"{_process_group_snapshot(self._process)}" ) return False diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index d3b9cf4b4..a9446c680 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -143,6 +143,34 @@ def on_rpc_update(self, msg: dict, metadata: dict): f"receive_latency_s={format_elapsed(receive_latency)}" ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") + + # Shutdown must acknowledge before teardown starts. The generic RPC path + # below publishes successful responses through QTimer.singleShot(0); + # for system.shutdown that would race with the queued app quit and + # dispatcher shutdown scheduled by _shutdown_gui_server(). + if method == "system.shutdown": + execution_start = time.perf_counter() + try: + self.run_system_rpc(method, args, kwargs) + except Exception: + execution_duration = time.perf_counter() - execution_start + content = traceback.format_exc() + logger.error( + "GUI RPC server shutdown request failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}\n{content}" + ) + self.send_response(request_id, False, {"error": content}) + else: + execution_duration = time.perf_counter() - execution_start + logger.info( + "GUI RPC server acknowledged shutdown request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}" + ) + self.send_response(request_id, True, {"result": None}) + return + execution_start = time.perf_counter() with rpc_exception_hook(functools.partial(self.send_response, request_id, False)): try: diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index 2afdcf5b9..5cf5750f6 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -7,6 +7,7 @@ from bec_widgets.cli.client import BECDockArea from bec_widgets.cli.client_utils import ( + GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, OUTPUT_READER_STOP_EVENT_ATTR, BECGuiClient, _join_process_output_thread, @@ -318,7 +319,9 @@ def test_client_utils_kill_server_requests_graceful_shutdown_before_signal(): launcher_prop.return_value = launcher gui.kill_server() - launcher._run_rpc.assert_called_once_with("system.shutdown", wait_for_rpc_response=False) + launcher._run_rpc.assert_called_once_with( + "system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT + ) process.wait.assert_called_once_with(timeout=5) killpg.assert_not_called() assert gui._process is None diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 8b71d8f71..fa8ebf14d 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -106,6 +106,22 @@ def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qap del qapp.gui_server +def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server): + order = [] + rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown")) + rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response")) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + {"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}}, + {"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2}, + ) + + assert order == ["shutdown", "response"] + rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None}) + rpc_server.serialize_result_and_send.assert_not_called() + + def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server): """ Test that a singleshot RPC method raises an error when called multiple times.