From b92a9f6d63d8b8cf590bfd20734a88a381dd6a9c Mon Sep 17 00:00:00 2001 From: mrveiss Date: Fri, 3 Apr 2026 23:33:03 +0300 Subject: [PATCH 1/2] feat(monitoring): wire HealthCollector state-change events to workflow trigger + notification pipeline (#3404) Co-Authored-By: Claude Sonnet 4.6 --- .../services/notification_service.py | 4 + .../service_health_monitor.yaml | 79 +++++++ .../slm/agent/health_collector.py | 87 ++++++++ .../health_collector_state_change_test.py | 192 ++++++++++++++++++ docs/examples/service_failure_monitoring.py | 138 +++++++++++++ docs/user/guides/workflows.md | 63 ++++++ 6 files changed, 563 insertions(+) create mode 100644 autobot-backend/workflow_templates/service_health_monitor.yaml create mode 100644 autobot-slm-backend/slm/agent/health_collector_state_change_test.py create mode 100644 docs/examples/service_failure_monitoring.py diff --git a/autobot-backend/services/notification_service.py b/autobot-backend/services/notification_service.py index c42b6cdea..cdca6efee 100644 --- a/autobot-backend/services/notification_service.py +++ b/autobot-backend/services/notification_service.py @@ -70,6 +70,7 @@ class NotificationEvent(str, Enum): WORKFLOW_FAILED = "workflow_failed" STEP_FAILED = "step_failed" APPROVAL_NEEDED = "approval_needed" + SERVICE_FAILED = "service_failure" # --------------------------------------------------------------------------- @@ -119,6 +120,9 @@ class NotificationConfig: NotificationEvent.APPROVAL_NEEDED: ( "Workflow '$workflow_id' is waiting for approval at step '$step_name'." ), + NotificationEvent.SERVICE_FAILED: ( + "Service '$service' on '$hostname' transitioned $prev_state -> $new_state. 
$error_context" + ), } diff --git a/autobot-backend/workflow_templates/service_health_monitor.yaml b/autobot-backend/workflow_templates/service_health_monitor.yaml new file mode 100644 index 000000000..6f3950617 --- /dev/null +++ b/autobot-backend/workflow_templates/service_health_monitor.yaml @@ -0,0 +1,79 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +# +# Workflow Template: Service Health Monitor (#3404) +# +# Triggered by Redis pub/sub events emitted by HealthCollector when a +# systemd service changes state. Use this template to create an +# automated workflow that notifies operators the moment a service fails +# or recovers. + +metadata: + id: service_health_monitor + name: "Service Health Monitor" + description: > + Listens for systemd service state-change events published by the SLM + HealthCollector and dispatches a SERVICE_FAILED notification through + the AutoBot notification pipeline. + version: "1.0.0" + category: monitoring + tags: + - systemd + - health + - alerting + +trigger: + type: REDIS_PUBSUB + # Glob pattern — matches any service on any managed node. + # To restrict to a specific service, replace * with the service name, + # e.g. autobot:services:autobot-backend:state_change + channel: "autobot:services:*:state_change" + # Only fire when the service has entered a failure-like state. + # Remove this filter block to react to all transitions (including recovery). + filter: + field: new_state + operator: in + values: + - failed + - crash-loop + +steps: + - id: notify_service_failure + name: "Send service-failure notification" + type: notification + event: service_failure + # Map pub/sub payload fields to notification template variables. + # The SERVICE_FAILED default template uses: service, hostname, + # prev_state, new_state, error_context. 
+ payload_mapping: + service: "{{ trigger.payload.service }}" + hostname: "{{ trigger.payload.hostname }}" + prev_state: "{{ trigger.payload.prev_state }}" + new_state: "{{ trigger.payload.new_state }}" + error_context: "{{ trigger.payload.error_context }}" + # Channel routing — configure at least one channel. + channels: + - in_app + # Uncomment and fill in to enable additional channels: + # email_recipients: + # - ops@example.com + # slack_webhook_url: "{{ env.SLACK_OPS_WEBHOOK }}" + # webhook_url: "{{ env.PAGERDUTY_EVENTS_URL }}" + + - id: log_state_change + name: "Log state change to knowledge base" + type: knowledge_write + depends_on: + - notify_service_failure + payload: + title: "Service state change: {{ trigger.payload.service }}" + body: > + Host {{ trigger.payload.hostname }} reported service + {{ trigger.payload.service }} transitioned from + {{ trigger.payload.prev_state }} to {{ trigger.payload.new_state }}. + Error context: {{ trigger.payload.error_context }} + tags: + - service-health + - "host:{{ trigger.payload.hostname }}" + - "service:{{ trigger.payload.service }}" diff --git a/autobot-slm-backend/slm/agent/health_collector.py b/autobot-slm-backend/slm/agent/health_collector.py index 10cf503b8..50ecb8bdc 100644 --- a/autobot-slm-backend/slm/agent/health_collector.py +++ b/autobot-slm-backend/slm/agent/health_collector.py @@ -5,8 +5,10 @@ Health Collector for SLM Agent Collects system and service health metrics for reporting to admin. +Publishes state-change events to Redis pub/sub (#3404). """ +import json import logging import os import platform @@ -19,6 +21,8 @@ logger = logging.getLogger(__name__) +_STATE_CHANGE_CHANNEL_TEMPLATE = "autobot:services:{service}:state_change" + class HealthCollector: """ @@ -49,6 +53,9 @@ def __init__( self.ports = ports or [] self.hostname = platform.node() self.discover_services = discover_services + # Tracks the last known status per service name for state-change detection. 
# Module-level support for the state-change publishing helpers below
# (mirrors the definitions added by this patch near the top of the module).
_STATE_CHANGE_CHANNEL_TEMPLATE = "autobot:services:{service}:state_change"

logger = logging.getLogger(__name__)

# Explicit export list so the underscore-prefixed helpers are importable
# via ``from ... import *`` (e.g. by the unit tests).
__all__ = [
    "_STATE_CHANGE_CHANNEL_TEMPLATE",
    "_publish_state_change",
    "_detect_and_publish_state_changes",
]


def _publish_state_change(
    self,
    service_name: str,
    prev_state: str,
    new_state: str,
    error_context: str,
) -> None:
    """Publish a service state-change event to Redis pub/sub.

    Channel: ``autobot:services:{service_name}:state_change``
    Payload keys: service, hostname, prev_state, new_state, error_context.

    Any failure — Redis client unavailable, import error, publish error —
    is logged at WARNING level and never propagates: a Redis outage must
    not interrupt health collection (#3404).

    :param service_name: systemd unit name whose state changed.
    :param prev_state: previously recorded status string.
    :param new_state: newly observed status string.
    :param error_context: recent journal error text, may be empty.
    """
    try:
        # Imported lazily so health collection keeps working on nodes
        # where the shared Redis client package is not installed.
        from autobot_shared.redis_client import get_redis_client

        client = get_redis_client(database="main")
        if client is None:
            logger.warning(
                "Redis unavailable — state-change event not published "
                "(service=%s %s->%s)",
                service_name,
                prev_state,
                new_state,
            )
            return
        channel = _STATE_CHANGE_CHANNEL_TEMPLATE.format(service=service_name)
        payload = json.dumps(
            {
                "service": service_name,
                "hostname": self.hostname,
                "prev_state": prev_state,
                "new_state": new_state,
                "error_context": error_context,
            }
        )
        client.publish(channel, payload)
        logger.info(
            "Published state-change event: service=%s %s->%s",
            service_name,
            prev_state,
            new_state,
        )
    except Exception as exc:
        logger.warning(
            "Failed to publish state-change event for %s: %s", service_name, exc
        )


def _detect_and_publish_state_changes(self, services: List[Dict]) -> None:
    """Compare discovered service statuses against the last known state.

    Publishes one pub/sub event per service whose status changed since the
    previous call, then replaces ``self._last_known_status`` with the set
    of currently observed services (#3404).

    Fix over the original implementation: the map is rebuilt from the
    observed list rather than updated in place, so entries for services
    that disappeared from discovery are pruned. This prevents unbounded
    growth and avoids a false transition event when a service is removed
    and later re-added (it is treated as a first observation instead of
    being compared against a stale state). Callers must only pass a
    complete discovery list — truncated lists from a failed
    ``systemctl`` invocation must not reach this method.

    :param services: dicts with at least ``name`` and ``status`` keys;
        ``error_message`` is forwarded as the event's error context.
    """
    observed: Dict[str, str] = {}
    for svc in services:
        name = svc.get("name")
        if name is None:
            # Malformed entry — nothing to key the state on.
            continue
        new_state = svc.get("status", "unknown")
        observed[name] = new_state
        prev_state = self._last_known_status.get(name)
        if prev_state is None or prev_state == new_state:
            # First observation (record only) or no transition — no event.
            continue
        self._publish_state_change(
            service_name=name,
            prev_state=prev_state,
            new_state=new_state,
            error_context=svc.get("error_message", ""),
        )
    # Replace, not update: services no longer discovered are forgotten.
    self._last_known_status = observed
# ---------------------------------------------------------------------------
# _detect_and_publish_state_changes
# ---------------------------------------------------------------------------


class TestDetectAndPublishStateChanges:
    """Transition detection: only genuine state changes may publish."""

    def test_first_observation_does_not_publish(self):
        collector = _make_collector()
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes([_svc("nginx", "running")])
        publish_mock.assert_not_called()

    def test_first_observation_stores_state(self):
        collector = _make_collector()
        collector._detect_and_publish_state_changes([_svc("nginx", "running")])
        assert collector._last_known_status["nginx"] == "running"

    def test_same_state_does_not_publish(self):
        collector = _make_collector()
        collector._last_known_status["nginx"] = "running"
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes([_svc("nginx", "running")])
        publish_mock.assert_not_called()

    def test_state_change_publishes_event(self):
        collector = _make_collector()
        collector._last_known_status["nginx"] = "running"
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes([_svc("nginx", "failed")])
        publish_mock.assert_called_once_with(
            service_name="nginx",
            prev_state="running",
            new_state="failed",
            error_context="",
        )

    def test_error_context_forwarded_on_failure(self):
        collector = _make_collector()
        collector._last_known_status["redis"] = "running"
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes(
                [_svc("redis", "failed", error_message="OOM killed")]
            )
        _, kwargs = publish_mock.call_args
        assert kwargs["error_context"] == "OOM killed"

    def test_updates_last_known_status_on_change(self):
        collector = _make_collector()
        collector._last_known_status["nginx"] = "running"
        collector._detect_and_publish_state_changes([_svc("nginx", "failed")])
        assert collector._last_known_status["nginx"] == "failed"

    def test_service_without_name_is_skipped(self):
        collector = _make_collector()
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes([{"status": "failed"}])
        publish_mock.assert_not_called()

    def test_multiple_services_each_evaluated_independently(self):
        collector = _make_collector()
        collector._last_known_status["nginx"] = "running"
        collector._last_known_status["sshd"] = "running"
        with patch.object(collector, "_publish_state_change") as publish_mock:
            collector._detect_and_publish_state_changes(
                [_svc("nginx", "failed"), _svc("sshd", "running")]
            )
        assert publish_mock.call_count == 1
        assert publish_mock.call_args.kwargs["service_name"] == "nginx"


# ---------------------------------------------------------------------------
# _publish_state_change
# ---------------------------------------------------------------------------


class TestPublishStateChange:
    """Publishing path: channel naming, payload shape, failure isolation."""

    def _redis_stub(self):
        stub = MagicMock()
        stub.publish = MagicMock(return_value=1)
        return stub

    def test_publishes_to_correct_channel(self):
        collector = _make_collector()
        redis_stub = self._redis_stub()
        with patch(
            "slm.agent.health_collector.get_redis_client", return_value=redis_stub
        ):
            collector._publish_state_change("nginx", "running", "failed", "")
        expected_channel = _STATE_CHANGE_CHANNEL_TEMPLATE.format(service="nginx")
        assert redis_stub.publish.call_args[0][0] == expected_channel

    def test_payload_contains_required_fields(self):
        collector = _make_collector()
        collector.hostname = "test-host"
        redis_stub = self._redis_stub()
        seen = {}

        def _record(channel, payload):
            seen["payload"] = json.loads(payload)

        redis_stub.publish.side_effect = _record
        with patch(
            "slm.agent.health_collector.get_redis_client", return_value=redis_stub
        ):
            collector._publish_state_change("nginx", "running", "failed", "segfault")

        decoded = seen["payload"]
        assert decoded["service"] == "nginx"
        assert decoded["hostname"] == "test-host"
        assert decoded["prev_state"] == "running"
        assert decoded["new_state"] == "failed"
        assert decoded["error_context"] == "segfault"

    def test_redis_unavailable_does_not_raise(self):
        collector = _make_collector()
        with patch(
            "slm.agent.health_collector.get_redis_client", return_value=None
        ):
            # Must not propagate any exception.
            collector._publish_state_change("nginx", "running", "failed", "")

    def test_redis_exception_does_not_raise(self):
        collector = _make_collector()
        redis_stub = self._redis_stub()
        redis_stub.publish.side_effect = ConnectionError("redis gone")
        with patch(
            "slm.agent.health_collector.get_redis_client", return_value=redis_stub
        ):
            collector._publish_state_change("nginx", "running", "failed", "")


# ---------------------------------------------------------------------------
# NotificationEvent.SERVICE_FAILED round-trip (import smoke test)
# ---------------------------------------------------------------------------


class TestServiceFailedEvent:
    def test_service_failed_enum_value(self):
        from services.notification_service import NotificationEvent

        assert NotificationEvent.SERVICE_FAILED.value == "service_failure"

    def test_service_failed_template_renders(self):
        from services.notification_service import NotificationEvent, NotificationService

        svc = NotificationService()
        result = svc.render_template(
            NotificationEvent.SERVICE_FAILED.value,
            {
                "service": "nginx",
                "hostname": "node-01",
                "prev_state": "running",
                "new_state": "failed",
                "error_context": "OOM killed",
            },
        )
        assert "nginx" in result
        assert "node-01" in result
        assert "running" in result
        assert "failed" in result
react to systemd service failures in real time. + +The SLM HealthCollector publishes to: + autobot:services:{service_name}:state_change + +each time a monitored service transitions between states (e.g. running -> +failed). This script shows a standalone monitoring loop that: + +1. Subscribes to all service state-change channels using a glob pattern. +2. Parses each message and logs the transition. +3. Sends an in-app notification via NotificationService for failure states. + +Run this script directly to verify the integration end-to-end in a +development environment where Redis is available. +""" + +import asyncio +import json +import logging +import sys + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s — %(message)s", + stream=sys.stdout, +) +logger = logging.getLogger(__name__) + +# States that should trigger an alert. +_ALERT_STATES = frozenset({"failed", "crash-loop"}) + +# Pub/sub pattern that matches every service on every host. +_SUBSCRIPTION_PATTERN = "autobot:services:*:state_change" + + +async def _send_failure_notification(payload: dict) -> None: + """Send an in-app SERVICE_FAILED notification for the given payload.""" + try: + from services.notification_service import ( + NotificationChannel, + NotificationConfig, + NotificationEvent, + NotificationService, + ) + + config = NotificationConfig( + workflow_id=f"svc-monitor:{payload['service']}", + channels={ + NotificationEvent.SERVICE_FAILED.value: [ + NotificationChannel.IN_APP.value, + ] + }, + # Replace with the operator's user ID or fetch from config. 
+ user_id="admin", + ) + svc = NotificationService() + await svc.send( + event=NotificationEvent.SERVICE_FAILED, + workflow_id=config.workflow_id, + payload=payload, + config=config, + ) + logger.info( + "Notification sent for service=%s state=%s", + payload.get("service"), + payload.get("new_state"), + ) + except Exception as exc: + logger.error("Failed to send notification: %s", exc) + + +async def monitor_service_health() -> None: + """Subscribe to HealthCollector state-change events and react to failures.""" + from autobot_shared.redis_client import get_redis_client + + redis = await get_redis_client(async_client=True, database="main") + if redis is None: + logger.error("Could not connect to Redis — aborting monitor loop.") + return + + pubsub = redis.pubsub() + await pubsub.psubscribe(_SUBSCRIPTION_PATTERN) + logger.info("Subscribed to pattern: %s", _SUBSCRIPTION_PATTERN) + + async for message in pubsub.listen(): + if message["type"] != "pmessage": + continue + + raw_data = message.get("data", b"") + try: + payload = json.loads(raw_data) + except (json.JSONDecodeError, TypeError) as exc: + logger.warning("Skipping malformed message on %s: %s", message["channel"], exc) + continue + + service = payload.get("service", "") + hostname = payload.get("hostname", "") + prev_state = payload.get("prev_state", "") + new_state = payload.get("new_state", "") + error_context = payload.get("error_context", "") + + logger.info( + "State change: host=%s service=%s %s -> %s", + hostname, + service, + prev_state, + new_state, + ) + + if new_state in _ALERT_STATES: + logger.warning( + "ALERT: service=%s entered state=%s on host=%s — %s", + service, + new_state, + hostname, + error_context or "(no error context)", + ) + await _send_failure_notification(payload) + + +def main() -> None: + """Entry point for running the monitor loop.""" + try: + asyncio.run(monitor_service_health()) + except KeyboardInterrupt: + logger.info("Monitor stopped by operator.") + + +if __name__ == 
"__main__": + main() diff --git a/docs/user/guides/workflows.md b/docs/user/guides/workflows.md index 425e9a7ce..370a4e663 100644 --- a/docs/user/guides/workflows.md +++ b/docs/user/guides/workflows.md @@ -133,6 +133,69 @@ reaches an approval gate: - Check the Overview page regularly to catch failed workflows early. - Use descriptive names for your workflows so they are easy to find later. +## Monitor a Linux Service + +AutoBot can alert you the moment a systemd service on any managed node +changes state. The SLM HealthCollector polls every node's systemd unit list +and publishes a Redis pub/sub event whenever a service transitions between +states (for example, `running` to `failed`). + +### How it works + +1. The SLM agent running on each node calls `HealthCollector.discover_all_services()` + on every health-check cycle. +2. When a service's state changes from the previous cycle, the agent publishes + to the Redis channel: + + ```text + autobot:services:{service_name}:state_change + ``` + + The message payload includes the service name, the originating hostname, + the previous and new state, and any recent journal error context. + +3. A workflow subscribed to that channel receives the event and can dispatch + notifications, log the incident, or trigger a remediation workflow. + +### Using the built-in template + +A ready-made template is provided at +`autobot-backend/workflow_templates/service_health_monitor.yaml`. + +To use it: + +1. Click **Templates** in the Workflow Automation sidebar. +2. Select **Service Health Monitor**. +3. Click **Use Template**. +4. Optionally restrict the trigger channel to a specific service + (change `autobot:services:*:state_change` to + `autobot:services:my-service:state_change`). +5. Configure at least one notification channel (in-app, email, Slack, or + webhook) in the **Send service-failure notification** step. +6. Save and enable the workflow. 
+ +### Notification event type + +The `SERVICE_FAILED` notification event type is available in +`NotificationEvent.SERVICE_FAILED` (`"service_failure"`). The default +message template is: + +```text +Service '$service' on '$hostname' transitioned $prev_state -> $new_state. $error_context +``` + +You can override this template in the workflow's notification step +configuration using Python `string.Template` syntax. + +### Developer reference + +- See `docs/examples/service_failure_monitoring.py` for a standalone Python + script that subscribes to the state-change channel and sends notifications. +- The `HealthCollector` class lives in + `autobot-slm-backend/slm/agent/health_collector.py`. +- `NotificationEvent.SERVICE_FAILED` and its default template are defined in + `autobot-backend/services/notification_service.py`. + ## Related Guides - [Working with Agents](working-with-agents.md) -- agents power many workflow From dc02d70b54a9339da02e1c51397a597af0218411 Mon Sep 17 00:00:00 2001 From: mrveiss Date: Fri, 3 Apr 2026 23:41:20 +0300 Subject: [PATCH 2/2] fix(monitoring): guard state-change publish against partial service list; fix SERVICE_FAILED enum value - Early-return on TimeoutExpired/FileNotFoundError/Exception before calling _detect_and_publish_state_changes to prevent false transitions on truncated lists - SERVICE_FAILED value corrected from "service_failure" to "service_failed" to match name/value convention (WORKFLOW_FAILED="workflow_failed", STEP_FAILED="step_failed") - Updated test assertion and workflow template event key to match Co-Authored-By: Claude Sonnet 4.6 --- autobot-backend/services/notification_service.py | 2 +- autobot-backend/workflow_templates/service_health_monitor.yaml | 2 +- autobot-slm-backend/slm/agent/health_collector.py | 3 +++ .../slm/agent/health_collector_state_change_test.py | 2 +- docs/user/guides/workflows.md | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git 
a/autobot-backend/services/notification_service.py b/autobot-backend/services/notification_service.py index cdca6efee..9aa478ef7 100644 --- a/autobot-backend/services/notification_service.py +++ b/autobot-backend/services/notification_service.py @@ -70,7 +70,7 @@ class NotificationEvent(str, Enum): WORKFLOW_FAILED = "workflow_failed" STEP_FAILED = "step_failed" APPROVAL_NEEDED = "approval_needed" - SERVICE_FAILED = "service_failure" + SERVICE_FAILED = "service_failed" # --------------------------------------------------------------------------- diff --git a/autobot-backend/workflow_templates/service_health_monitor.yaml b/autobot-backend/workflow_templates/service_health_monitor.yaml index 6f3950617..6a7f5c1f2 100644 --- a/autobot-backend/workflow_templates/service_health_monitor.yaml +++ b/autobot-backend/workflow_templates/service_health_monitor.yaml @@ -42,7 +42,7 @@ steps: - id: notify_service_failure name: "Send service-failure notification" type: notification - event: service_failure + event: service_failed # Map pub/sub payload fields to notification template variables. # The SERVICE_FAILED default template uses: service, hostname, # prev_state, new_state, error_context. 
diff --git a/autobot-slm-backend/slm/agent/health_collector.py b/autobot-slm-backend/slm/agent/health_collector.py index 50ecb8bdc..66e8eaeee 100644 --- a/autobot-slm-backend/slm/agent/health_collector.py +++ b/autobot-slm-backend/slm/agent/health_collector.py @@ -160,10 +160,13 @@ def discover_all_services(self) -> List[Dict]: except subprocess.TimeoutExpired: logger.warning("Timeout discovering services") + return services except FileNotFoundError: logger.warning("systemctl not found - not a systemd system") + return services except Exception as e: logger.warning("Error discovering services: %s", e) + return services self._detect_and_publish_state_changes(services) return services diff --git a/autobot-slm-backend/slm/agent/health_collector_state_change_test.py b/autobot-slm-backend/slm/agent/health_collector_state_change_test.py index 0b7df8fdc..2a15e79f0 100644 --- a/autobot-slm-backend/slm/agent/health_collector_state_change_test.py +++ b/autobot-slm-backend/slm/agent/health_collector_state_change_test.py @@ -170,7 +170,7 @@ class TestServiceFailedEvent: def test_service_failed_enum_value(self): from services.notification_service import NotificationEvent - assert NotificationEvent.SERVICE_FAILED.value == "service_failure" + assert NotificationEvent.SERVICE_FAILED.value == "service_failed" def test_service_failed_template_renders(self): from services.notification_service import NotificationEvent, NotificationService diff --git a/docs/user/guides/workflows.md b/docs/user/guides/workflows.md index 370a4e663..ab2a3e0b9 100644 --- a/docs/user/guides/workflows.md +++ b/docs/user/guides/workflows.md @@ -177,7 +177,7 @@ To use it: ### Notification event type The `SERVICE_FAILED` notification event type is available in -`NotificationEvent.SERVICE_FAILED` (`"service_failure"`). The default +`NotificationEvent.SERVICE_FAILED` (`"service_failed"`). The default message template is: ```text