diff --git a/bec_lib/bec_lib/bl_states.py b/bec_lib/bec_lib/bl_states.py index 21058dc8d..6eb77e632 100644 --- a/bec_lib/bec_lib/bl_states.py +++ b/bec_lib/bec_lib/bl_states.py @@ -13,6 +13,7 @@ from bec_lib.device import DeviceBase, Signal from bec_lib.devicemanager import DeviceManagerBase from bec_lib.endpoints import MessageEndpoints +from bec_lib.messaging_hooks import MessagingEvent from bec_lib.redis_connector import MessageObject, RedisConnector @@ -204,6 +205,9 @@ def _handle_state_exception(self, exc: Exception) -> None: compact_error_message=info, ) self.connector.raise_alarm(severity=Alarms.WARNING, info=error_info) + self.connector.notify( + MessagingEvent.INVALID_STATE, f"Beamline state invalid: {self.config.name}" + ) out = messages.BeamlineStateMessage(name=self.config.name, status="unknown", label=info) self._emit_state(out) diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 930e3bbb0..b63185c51 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1957,6 +1957,38 @@ def available_messaging_services(): message_op=MessageOp.STREAM, ) + @staticmethod + def notification(event_type: str): + """ + Endpoint for transient notification events that SciHub can route to + configured messaging services. + + Args: + event_type (str): Notification event name such as ``new_scan``. + + Returns: + EndpointInfo: Endpoint for notification events. + """ + endpoint = f"{EndpointType.INTERNAL.value}/messaging_services/notification/{event_type}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.NotificationMessage, message_op=MessageOp.SEND + ) + + @staticmethod + def notification_config(): + """ + Endpoint for persisted notification routing configuration. + + Returns: + EndpointInfo: Endpoint for notification routing config. + """ + endpoint = f"{EndpointType.USER.value}/messaging_services/notification_config" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.NotificationConfigMessage, + message_op=MessageOp.SET_PUBLISH, + ) + @staticmethod def message_service_ingest(deployment_name: str): """ diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index 761c26f64..a5962ee84 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1728,6 +1728,39 @@ class MessagingConfig(BaseModel): scilog: MessagingServiceScopeConfig +class NotificationServiceTarget(BaseModel): + service_name: Literal["signal", "teams", "scilog"] + scope: str | list[str] | None = None + + +class NotificationMessage(BECMessage): + """ + Message for notification events that should be routed through configured + messaging services. + + Args: + message (str): Notification body text. + """ + + msg_type: ClassVar[str] = "notification_message" + message: str + + +class NotificationConfigMessage(BECMessage): + """ + Routing configuration for notification events. + + Args: + routes: Mapping of event name to messaging service targets. + """ + + msg_type: ClassVar[str] = "notification_config_message" + routes: dict[ + Literal["new_scan", "scan_completed", "alarm", "invalid_state"], + list[NotificationServiceTarget], + ] = Field(default_factory=dict) + + AvailableMessagingServices = Annotated[ Union[SignalServiceInfo, SciLogServiceInfo, TeamsServiceInfo], Field(discriminator="service_type"), diff --git a/bec_lib/bec_lib/messaging_hooks.py b/bec_lib/bec_lib/messaging_hooks.py new file mode 100644 index 000000000..2747d1fe5 --- /dev/null +++ b/bec_lib/bec_lib/messaging_hooks.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import enum +from typing import TYPE_CHECKING, cast + +from bec_lib import messages +from bec_lib.connector import MessageObject +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.messaging_services import ( + SciLogMessagingService, + SignalMessagingService, + TeamsMessagingService, +) + +if TYPE_CHECKING: + from bec_lib.redis_connector import RedisConnector + +logger = bec_logger.logger + + +class MessagingEvent(str, enum.Enum): + """ + Enumeration of messaging events that can trigger configured hooks. + """ + + SCAN = "new_scan" + SCAN_COMPLETED = "scan_completed" + ALARM = "alarm" + INVALID_STATE = "invalid_state" + + +class MessagingManager: + """ + Manage notification routing from internal events to concrete messaging + services. + """ + + def __init__(self, connector: RedisConnector): + self.connector = connector + self.config: dict[MessagingEvent, list[messages.NotificationServiceTarget]] = {} + self.signal = SignalMessagingService(self.connector) + self.scilog = SciLogMessagingService(self.connector) + self.teams = TeamsMessagingService(self.connector) + self._service_by_name = {"signal": self.signal, "scilog": self.scilog, "teams": self.teams} + + self.connector.register( + patterns=MessageEndpoints.notification("*"), cb=self._handle_notification + ) + self.connector.register( + topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config + ) + + config_msg = self.connector.get(MessageEndpoints.notification_config()) + if config_msg is not None: + self.on_notification_config(config_msg) + + def _handle_notification(self, msg_obj: MessageObject[messages.NotificationMessage], **_kwargs): + prefix = MessageEndpoints.notification("").endpoint + event_type_str = msg_obj.topic.removeprefix(prefix) + try: + event_type = MessagingEvent(event_type_str) + except ValueError: + logger.warning(f"Unknown notification event received on topic {msg_obj.topic}") + return + self.on_notification(event_type, cast(messages.NotificationMessage, msg_obj.value)) + + def _handle_notification_config( + self, msg_obj: MessageObject[messages.NotificationConfigMessage], **_kwargs + ): + self.on_notification_config(cast(messages.NotificationConfigMessage, msg_obj.value)) + + def on_notification( + self, event_type: MessagingEvent, message: messages.NotificationMessage + ) -> None: + routes = self.config.get(event_type, []) + for route in routes: + service = self._service_by_name.get(route.service_name) + if service is None: + logger.warning(f"Unknown messaging service: {route.service_name}") + continue + try: + service.new().add_text(message.message).send(scope=route.scope) + except RuntimeError as exc: + logger.warning( + f"Failed to send notification for {event_type.value} via {route.service_name}: {exc}" + ) + + def on_notification_config(self, message: messages.NotificationConfigMessage) -> None: + config: dict[MessagingEvent, list[messages.NotificationServiceTarget]] = {} + for event_name, targets in message.routes.items(): + config[MessagingEvent(event_name)] = targets + self.config = config + + def shutdown(self) -> None: + self.connector.unregister( + patterns=MessageEndpoints.notification("*"), cb=self._handle_notification + ) + self.connector.unregister( + topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config + ) diff --git a/bec_lib/bec_lib/redis_connector.py b/bec_lib/bec_lib/redis_connector.py index 7de6b24c1..1afa479cb 100644 --- a/bec_lib/bec_lib/redis_connector.py +++ b/bec_lib/bec_lib/redis_connector.py @@ -58,6 +58,7 @@ DynamicMetricMessage, ErrorInfo, ) +from bec_lib.messaging_hooks import MessagingEvent from bec_lib.serialization import MsgpackSerialization logger = bec_logger.logger @@ -655,6 +656,23 @@ def raise_alarm(self, severity: Alarms, info: ErrorInfo, metadata: dict | None = """ alarm_msg = AlarmMessage(severity=severity, info=info, metadata=metadata or {}) self.set_and_publish(MessageEndpoints.alarm(), alarm_msg) + compact_message = info.compact_error_message or info.error_message or info.exception_type + self.notify(MessagingEvent.ALARM, compact_message) + + def notify(self, event: MessagingEvent, message: str, pipe: Pipeline | None = None) -> None: + """ + Publish a notification event for downstream routing by SciHub. + + Args: + event: Notification event type. + message: Notification body text. + pipe: Optional pipeline to enqueue the publish operation into. + """ + self.send( + MessageEndpoints.notification(event.value), + messages.NotificationMessage(message=message), + pipe=pipe, + ) def pipeline(self) -> redis.client.Pipeline: """Create a new pipeline""" diff --git a/bec_lib/bec_lib/tests/utils.py b/bec_lib/bec_lib/tests/utils.py index 3e0a879ec..4995dd32a 100644 --- a/bec_lib/bec_lib/tests/utils.py +++ b/bec_lib/bec_lib/tests/utils.py @@ -561,6 +561,13 @@ def send(self, topic, msg, pipe=None): raise TypeError("Message must be a BECMessage") return self.raw_send(topic, msg, pipe) + def notify(self, event, message: str, pipe=None): + return self.send( + MessageEndpoints.notification(event.value), + messages.NotificationMessage(message=message), + pipe=pipe, + ) + def set_and_publish(self, topic, msg, pipe=None, expire: int = None): if pipe: pipe._pipe_buffer.append(("set_and_publish", (topic.endpoint, msg), {"expire": expire})) diff --git a/bec_lib/tests/test_beamline_states.py b/bec_lib/tests/test_beamline_states.py index e6a936e8e..50b570b55 100644 --- a/bec_lib/tests/test_beamline_states.py +++ b/bec_lib/tests/test_beamline_states.py @@ -144,6 +144,28 @@ def test_update_device_state_publishes_when_state_changes( assert out is not None assert out[0]["data"].status == "valid" + def test_state_exception_publishes_invalid_state_notification( + self, connected_connector, dm_with_devices + ): + state = bl_states.ShutterState( + name="shutter_open", + device="samx", + signal="samx", + redis_connector=connected_connector, + device_manager=dm_with_devices, + ) + + with ( + mock.patch.object(connected_connector, "raise_alarm") as raise_alarm, + mock.patch.object(connected_connector, "notify") as notify, + ): + state._handle_state_exception(RuntimeError("device state unavailable")) + + raise_alarm.assert_called_once() + notify.assert_called_once_with( + bl_states.MessagingEvent.INVALID_STATE, "Beamline state invalid: shutter_open" + ) + class TestConcreteStates: def test_shutter_state_open_and_closed(self, connected_connector, dm_with_devices): diff --git a/bec_lib/tests/test_bec_messages.py b/bec_lib/tests/test_bec_messages.py index 01c59b601..cd781cddc 100644 --- a/bec_lib/tests/test_bec_messages.py +++ b/bec_lib/tests/test_bec_messages.py @@ -134,6 +134,31 @@ def test_ClientInfoMessage_raises(): ) +def test_NotificationMessage(): + msg = messages.NotificationMessage(message="Scan started") + res = MsgpackSerialization.dumps(msg) + res_loaded = MsgpackSerialization.loads(res) + assert res_loaded == msg + + +def test_NotificationConfigMessage(): + msg = messages.NotificationConfigMessage( + routes={ + "new_scan": [ + messages.NotificationServiceTarget(service_name="scilog", scope="logbook") + ], + "alarm": [ + messages.NotificationServiceTarget( + service_name="signal", scope=["+41791234567", "+41797654321"] + ) + ], + } + ) + res = MsgpackSerialization.dumps(msg) + res_loaded = MsgpackSerialization.loads(res) + assert res_loaded == msg + + def test_DeviceRPCMessage(): msg = messages.DeviceRPCMessage( device="samx", return_val=1, out="done", success=True, metadata={"RID": "1234"} diff --git a/bec_lib/tests/test_messaging_hooks.py b/bec_lib/tests/test_messaging_hooks.py new file mode 100644 index 000000000..9c00faeaf --- /dev/null +++ b/bec_lib/tests/test_messaging_hooks.py @@ -0,0 +1,86 @@ +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints, MessageOp +from bec_lib.messaging_hooks import MessagingEvent, MessagingManager + + +def _available_services_message(): + return messages.AvailableMessagingServicesMessage( + config=messages.MessagingConfig( + signal=messages.MessagingServiceScopeConfig(enabled=True, default=None), + teams=messages.MessagingServiceScopeConfig(enabled=False, default=None), + scilog=messages.MessagingServiceScopeConfig(enabled=True, default=None), + ), + deployment_services=[ + messages.SciLogServiceInfo( + id="scilog-default", scope="logbook", enabled=True, logbook_id="lb-1" + ) + ], + session_services=[], + ) + + +def test_notification_endpoints(): + event_endpoint = MessageEndpoints.notification("new_scan") + config_endpoint = MessageEndpoints.notification_config() + + assert event_endpoint.endpoint == "internal/messaging_services/notification/new_scan" + assert event_endpoint.message_type is messages.NotificationMessage + assert event_endpoint.message_op == MessageOp.SEND + + assert config_endpoint.endpoint == "user/messaging_services/notification_config" + assert config_endpoint.message_type is messages.NotificationConfigMessage + assert config_endpoint.message_op == MessageOp.SET_PUBLISH + + +def test_messaging_manager_loads_initial_config(connected_connector): + config_msg = messages.NotificationConfigMessage( + routes={ + "new_scan": [messages.NotificationServiceTarget(service_name="scilog", scope="logbook")] + } + ) + connected_connector.set_and_publish(MessageEndpoints.notification_config(), config_msg) + + manager = MessagingManager(connected_connector) + try: + assert manager.config == { + MessagingEvent.SCAN: [ + messages.NotificationServiceTarget(service_name="scilog", scope="logbook") + ] + } + finally: + manager.shutdown() + + +def test_messaging_manager_routes_notifications_to_message_service_queue(connected_connector): + manager = MessagingManager(connected_connector) + try: + available_services = _available_services_message() + manager.scilog._on_new_scope_change_msg({"data": available_services}) + manager.signal._on_new_scope_change_msg({"data": available_services}) + manager.teams._on_new_scope_change_msg({"data": available_services}) + + manager.on_notification_config( + messages.NotificationConfigMessage( + routes={ + "new_scan": [ + messages.NotificationServiceTarget(service_name="scilog", scope="logbook") + ] + } + ) + ) + + manager.on_notification( + MessagingEvent.SCAN, messages.NotificationMessage(message="Scan started") + ) + + out = connected_connector.xread(MessageEndpoints.message_service_queue(), from_start=True) + assert len(out) == 1 + sent_message = out[0]["data"] + assert sent_message.service_name == "scilog" + assert sent_message.scope == "logbook" + assert isinstance(sent_message.message[0], messages.MessagingServiceTextContent) + assert sent_message.message[0].content == "Scan started" + assert isinstance(sent_message.message[1], messages.MessagingServiceTagsContent) + assert sent_message.message[1].tags == ["bec"] + finally: + manager.shutdown() diff --git a/bec_lib/tests/test_redis_connector.py b/bec_lib/tests/test_redis_connector.py index 06d0fd039..45666a4d6 100644 --- a/bec_lib/tests/test_redis_connector.py +++ b/bec_lib/tests/test_redis_connector.py @@ -15,6 +15,7 @@ ClientInfoMessage, ProcedureExecutionMessage, ) +from bec_lib.messaging_hooks import MessagingEvent from bec_lib.redis_connector import ( IncompatibleMessageForEndpoint, IncompatibleRedisOperation, @@ -71,7 +72,10 @@ def test_redis_connector_send_client_info(connector): ], ) def test_redis_connector_raise_alarm(connector, severity, alarm_type, msg, compact_msg, metadata): - with mock.patch.object(connector, "set_and_publish", return_value=None): + with ( + mock.patch.object(connector, "set_and_publish", return_value=None), + mock.patch.object(connector, "notify", return_value=None), + ): info = messages.ErrorInfo( error_message=msg, compact_error_message=compact_msg, exception_type=alarm_type ) @@ -80,6 +84,7 @@ def test_redis_connector_raise_alarm(connector, severity, alarm_type, msg, compa connector.set_and_publish.assert_called_once_with( MessageEndpoints.alarm(), AlarmMessage(severity=severity, info=info, metadata=metadata) ) + connector.notify.assert_called_once_with(MessagingEvent.ALARM, compact_msg) @pytest.mark.parametrize( diff --git a/bec_server/bec_server/scan_server/generator_scan_worker.py b/bec_server/bec_server/scan_server/generator_scan_worker.py index 117a3f269..45b382d5e 100644 --- a/bec_server/bec_server/scan_server/generator_scan_worker.py +++ b/bec_server/bec_server/scan_server/generator_scan_worker.py @@ -11,6 +11,7 @@ from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import compile_file_components from bec_lib.logger import bec_logger +from bec_lib.messaging_hooks import MessagingEvent from .errors import DeviceInstructionError, ScanAbortion, UserScanInterruption from .scan_queue import InstructionQueueItem, InstructionQueueStatus, RequestBlock @@ -341,6 +342,14 @@ def _send_scan_status( self.worker.device_manager.connector.set_and_publish( MessageEndpoints.scan_status(), msg, pipe=pipe ) + if status == "open": + self.worker.device_manager.connector.notify( + MessagingEvent.SCAN, f"Scan started: {self.current_scan_id}", pipe=pipe + ) + elif status in {"closed", "user_completed"}: + self.worker.device_manager.connector.notify( + MessagingEvent.SCAN_COMPLETED, f"Scan completed: {self.current_scan_id}", pipe=pipe + ) pipe.execute() def update_instr_with_scan_report(self, instr: messages.DeviceInstructionMessage): diff --git a/bec_server/bec_server/scan_server/scans/scan_actions.py b/bec_server/bec_server/scan_server/scans/scan_actions.py index d8d16e398..1d6b7fb01 100644 --- a/bec_server/bec_server/scan_server/scans/scan_actions.py +++ b/bec_server/bec_server/scan_server/scans/scan_actions.py @@ -14,6 +14,7 @@ from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import compile_file_components from bec_lib.logger import bec_logger +from bec_lib.messaging_hooks import MessagingEvent from bec_server.scan_server.scan_stubs import ScanStubStatus if TYPE_CHECKING: @@ -1062,6 +1063,16 @@ def _send_scan_status( MessageEndpoints.public_scan_info(scan.scan_info.scan_id), msg, pipe=pipe, expire=expire ) self._connector.set_and_publish(MessageEndpoints.scan_status(), msg, pipe=pipe) + if status == "open": + self._connector.notify( + MessagingEvent.SCAN, f"Scan started: {scan.scan_info.scan_id}", pipe=pipe + ) + elif status in {"closed", "user_completed"}: + self._connector.notify( + MessagingEvent.SCAN_COMPLETED, + f"Scan completed: {scan.scan_info.scan_id}", + pipe=pipe, + ) pipe.execute() def _build_scan_status_message( diff --git a/bec_server/bec_server/scihub/scihub.py b/bec_server/bec_server/scihub/scihub.py index a7af08447..903c9786d 100644 --- a/bec_server/bec_server/scihub/scihub.py +++ b/bec_server/bec_server/scihub/scihub.py @@ -4,6 +4,7 @@ from bec_lib import messages from bec_lib.bec_service import BECService +from bec_lib.messaging_hooks import MessagingManager from bec_lib.service_config import ServiceConfig from bec_server.scihub.atlas import AtlasConnector from bec_server.scihub.service_handler.service_handler import ServiceHandler @@ -18,8 +19,10 @@ def __init__(self, config: ServiceConfig, connector_cls: type[RedisConnector]) - self.config = config self.atlas_connector = None self.service_handler = None + self.messaging_manager = None self._start_atlas_connector() self._start_service_handler() + self._start_messaging_manager() self.status = messages.BECStatus.RUNNING def _start_atlas_connector(self): @@ -31,7 +34,12 @@ def _start_service_handler(self): self.service_handler = ServiceHandler(self.connector) self.service_handler.start() + def _start_messaging_manager(self): + self.messaging_manager = MessagingManager(self.connector) + def shutdown(self): super().shutdown() + if self.messaging_manager: + self.messaging_manager.shutdown() if self.atlas_connector: self.atlas_connector.shutdown() diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_scan_actions.py b/bec_server/tests/tests_scan_server/scans_v4/test_scan_actions.py index 8aa4e50db..86431c8e8 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_scan_actions.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_scan_actions.py @@ -8,6 +8,7 @@ from bec_lib import messages from bec_lib.device import ReadoutPriority from bec_lib.endpoints import MessageEndpoints +from bec_lib.messaging_hooks import MessagingEvent from bec_lib.tests.fixtures import dm_with_devices # noqa: F401 from bec_lib.tests.utils import ConnectorMock from bec_server.scan_server.instruction_handler import InstructionHandler @@ -527,6 +528,7 @@ def test_send_scan_status_publishes_message(action_context): ctx.connector.pipeline = mock.MagicMock(return_value=pipe) ctx.connector.set = mock.MagicMock() ctx.connector.set_and_publish = mock.MagicMock() + ctx.connector.notify = mock.MagicMock() status_msg = messages.ScanStatusMessage(scan_id="scan-id-test", status="closed", info={}) ctx.actions._build_scan_status_message = mock.MagicMock(return_value=status_msg) @@ -539,9 +541,29 @@ def test_send_scan_status_publishes_message(action_context): ctx.connector.set_and_publish.assert_called_once_with( MessageEndpoints.scan_status(), status_msg, pipe=pipe ) + ctx.connector.notify.assert_called_once_with( + MessagingEvent.SCAN_COMPLETED, "Scan completed: scan-id-test", pipe=pipe + ) pipe.execute.assert_called_once_with() +def test_send_scan_status_publishes_new_scan_notification(action_context): + ctx = action_context() + pipe = mock.MagicMock() + ctx.connector.pipeline = mock.MagicMock(return_value=pipe) + ctx.connector.set = mock.MagicMock() + ctx.connector.set_and_publish = mock.MagicMock() + ctx.connector.notify = mock.MagicMock() + status_msg = messages.ScanStatusMessage(scan_id="scan-id-test", status="open", info={}) + ctx.actions._build_scan_status_message = mock.MagicMock(return_value=status_msg) + + ctx.actions._send_scan_status("open") + + ctx.connector.notify.assert_called_once_with( + MessagingEvent.SCAN, "Scan started: scan-id-test", pipe=pipe + ) + + def test_get_file_base_path_uses_account_and_templates(action_context): ctx = action_context() ctx.device_manager.parent = _TestParent("/tmp/data") diff --git a/bec_server/tests/tests_scan_server/test_generator_scan_worker.py b/bec_server/tests/tests_scan_server/test_generator_scan_worker.py index bd183c96d..a2a522f60 100644 --- a/bec_server/tests/tests_scan_server/test_generator_scan_worker.py +++ b/bec_server/tests/tests_scan_server/test_generator_scan_worker.py @@ -413,6 +413,22 @@ def test_send_scan_status(generator_worker_mock, status, expire): ] assert len(scan_info_msgs) == 1 assert scan_info_msgs[0]["expire"] == expire + notification_msgs = [ + msg + for msg in worker.worker.device_manager.connector.message_sent + if msg["queue"] + == MessageEndpoints.notification( + "new_scan" if status == "open" else "scan_completed" + ).endpoint + ] + if status == "open": + assert len(notification_msgs) == 1 + assert notification_msgs[0]["msg"].message == f"Scan started: {worker.current_scan_id}" + elif status == "closed": + assert len(notification_msgs) == 1 + assert notification_msgs[0]["msg"].message == f"Scan completed: {worker.current_scan_id}" + else: + assert notification_msgs == [] @pytest.mark.parametrize("abortion", [False, True]) diff --git a/bec_server/tests/tests_scihub/test_scihub_cli_launch.py b/bec_server/tests/tests_scihub/test_scihub_cli_launch.py index c8ca81ebf..cd0af333b 100644 --- a/bec_server/tests/tests_scihub/test_scihub_cli_launch.py +++ b/bec_server/tests/tests_scihub/test_scihub_cli_launch.py @@ -1,6 +1,9 @@ from unittest import mock +from bec_lib.service_config import ServiceConfig +from bec_lib.tests.utils import ConnectorMock from bec_server.scihub.cli.launch import main +from bec_server.scihub.scihub import SciHub def test_main(): @@ -27,3 +30,28 @@ def test_main_shutdown(): mock_scihub.assert_called_once() mock_event.assert_called_once() mock_scihub.return_value.shutdown.assert_called_once() + + +def test_scihub_starts_and_stops_messaging_manager(): + config = ServiceConfig( + redis={"host": "dummy", "port": 6379}, + service_config={ + "file_writer": {"plugin": "default_NeXus_format", "base_path": "./"}, + "log_writer": {"base_path": "./"}, + }, + ) + + with ( + mock.patch.object(SciHub, "_start_metrics_emitter"), + mock.patch.object(SciHub, "wait_for_service"), + mock.patch("bec_server.scihub.scihub.AtlasConnector"), + mock.patch("bec_server.scihub.scihub.ServiceHandler"), + mock.patch("bec_server.scihub.scihub.MessagingManager") as mock_messaging_manager, + ): + scihub = SciHub(config, ConnectorMock) + try: + mock_messaging_manager.assert_called_once_with(scihub.connector) + finally: + scihub.shutdown() + + mock_messaging_manager.return_value.shutdown.assert_called_once()