Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions autobot-backend/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class NotificationEvent(str, Enum):
WORKFLOW_FAILED = "workflow_failed"
STEP_FAILED = "step_failed"
APPROVAL_NEEDED = "approval_needed"
SERVICE_FAILED = "service_failed"


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -119,6 +120,9 @@ class NotificationConfig:
NotificationEvent.APPROVAL_NEEDED: (
"Workflow '$workflow_id' is waiting for approval at step '$step_name'."
),
NotificationEvent.SERVICE_FAILED: (
"Service '$service' on '$hostname' transitioned $prev_state -> $new_state. $error_context"
),
}


Expand Down
79 changes: 79 additions & 0 deletions autobot-backend/workflow_templates/service_health_monitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# AutoBot - AI-Powered Automation Platform
# Copyright (c) 2025 mrveiss
# Author: mrveiss
#
# Workflow Template: Service Health Monitor (#3404)
#
# Triggered by Redis pub/sub events emitted by HealthCollector when a
# systemd service changes state. Use this template to create an
# automated workflow that notifies operators the moment a service fails
# or recovers.

metadata:
  id: service_health_monitor
  name: "Service Health Monitor"
  description: >
    Listens for systemd service state-change events published by the SLM
    HealthCollector and dispatches a SERVICE_FAILED notification through
    the AutoBot notification pipeline.
  version: "1.0.0"
  category: monitoring
  tags:
    - systemd
    - health
    - alerting

trigger:
  type: REDIS_PUBSUB
  # Glob pattern — matches any service on any managed node.
  # To restrict to a specific service, replace * with the service name,
  # e.g. autobot:services:autobot-backend:state_change
  channel: "autobot:services:*:state_change"
  # Only fire when the service has entered a failure-like state.
  # Remove this filter block to react to all transitions (including recovery).
  filter:
    field: new_state
    operator: in
    values:
      - failed
      - crash-loop

steps:
  - id: notify_service_failure
    name: "Send service-failure notification"
    type: notification
    event: service_failed
    # Map pub/sub payload fields to notification template variables.
    # The SERVICE_FAILED default template uses: service, hostname,
    # prev_state, new_state, error_context.
    payload_mapping:
      service: "{{ trigger.payload.service }}"
      hostname: "{{ trigger.payload.hostname }}"
      prev_state: "{{ trigger.payload.prev_state }}"
      new_state: "{{ trigger.payload.new_state }}"
      error_context: "{{ trigger.payload.error_context }}"
    # Channel routing — configure at least one channel.
    channels:
      - in_app
    # Uncomment and fill in to enable additional channels:
    # email_recipients:
    #   - ops@example.com
    # slack_webhook_url: "{{ env.SLACK_OPS_WEBHOOK }}"
    # webhook_url: "{{ env.PAGERDUTY_EVENTS_URL }}"

  - id: log_state_change
    name: "Log state change to knowledge base"
    type: knowledge_write
    depends_on:
      - notify_service_failure
    payload:
      title: "Service state change: {{ trigger.payload.service }}"
      body: >
        Host {{ trigger.payload.hostname }} reported service
        {{ trigger.payload.service }} transitioned from
        {{ trigger.payload.prev_state }} to {{ trigger.payload.new_state }}.
        Error context: {{ trigger.payload.error_context }}
      tags:
        - service-health
        - "host:{{ trigger.payload.hostname }}"
        - "service:{{ trigger.payload.service }}"
90 changes: 90 additions & 0 deletions autobot-slm-backend/slm/agent/health_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
Health Collector for SLM Agent

Collects system and service health metrics for reporting to admin.
Publishes state-change events to Redis pub/sub (#3404).
"""

import json
import logging
import os
import platform
Expand All @@ -19,6 +21,8 @@

logger = logging.getLogger(__name__)

_STATE_CHANGE_CHANNEL_TEMPLATE = "autobot:services:{service}:state_change"


class HealthCollector:
"""
Expand Down Expand Up @@ -49,6 +53,9 @@ def __init__(
self.ports = ports or []
self.hostname = platform.node()
self.discover_services = discover_services
# Tracks the last known status per service name for state-change detection.
# Populated on first collect(); events are only published on transitions.
self._last_known_status: Dict[str, str] = {}

def collect(self) -> Dict:
"""Collect all health metrics."""
Expand Down Expand Up @@ -153,11 +160,15 @@ def discover_all_services(self) -> List[Dict]:

except subprocess.TimeoutExpired:
logger.warning("Timeout discovering services")
return services
except FileNotFoundError:
logger.warning("systemctl not found - not a systemd system")
return services
except Exception as e:
logger.warning("Error discovering services: %s", e)
return services

self._detect_and_publish_state_changes(services)
return services

def _run_systemctl_list_units(self) -> Optional[str]:
Expand Down Expand Up @@ -287,6 +298,85 @@ def _get_error_context(self, service_name: str, lines: int = 5) -> str:
logger.debug("Could not get error context for %s: %s", service_name, e)
return ""

def _publish_state_change(
    self,
    service_name: str,
    prev_state: str,
    new_state: str,
    error_context: str,
) -> None:
    """Emit a Redis pub/sub event describing a service state transition.

    The event is published to
    ``autobot:services:{service_name}:state_change`` with a JSON payload
    carrying: service, hostname, prev_state, new_state, error_context.

    Any failure (including an unreachable Redis) is logged at WARNING
    level and swallowed — a Redis outage must never interrupt health
    collection (#3404).
    """
    try:
        # Imported lazily so a missing/broken shared package is handled
        # by the catch-all below instead of failing at module import.
        from autobot_shared.redis_client import get_redis_client

        redis_conn = get_redis_client(database="main")
        if redis_conn is None:
            logger.warning(
                "Redis unavailable — state-change event not published "
                "(service=%s %s->%s)",
                service_name,
                prev_state,
                new_state,
            )
            return

        event = {
            "service": service_name,
            "hostname": self.hostname,
            "prev_state": prev_state,
            "new_state": new_state,
            "error_context": error_context,
        }
        redis_conn.publish(
            _STATE_CHANGE_CHANNEL_TEMPLATE.format(service=service_name),
            json.dumps(event),
        )
        logger.info(
            "Published state-change event: service=%s %s->%s",
            service_name,
            prev_state,
            new_state,
        )
    except Exception as exc:
        logger.warning(
            "Failed to publish state-change event for %s: %s", service_name, exc
        )

def _detect_and_publish_state_changes(
self, services: List[Dict]
) -> None:
"""Compare discovered service statuses against last known state.

Publishes a Redis pub/sub event for each service whose status has
changed since the previous call. Updates ``_last_known_status`` so
only real transitions trigger events (#3404).
"""
for svc in services:
name = svc.get("name")
new_state = svc.get("status", "unknown")
if name is None:
continue
prev_state = self._last_known_status.get(name)
if prev_state is None:
# First observation — record state but do not emit an event.
self._last_known_status[name] = new_state
continue
if prev_state == new_state:
continue
self._last_known_status[name] = new_state
self._publish_state_change(
service_name=name,
prev_state=prev_state,
new_state=new_state,
error_context=svc.get("error_message", ""),
)

def is_healthy(self, thresholds: Optional[Dict] = None) -> bool:
"""Quick health check against thresholds."""
defaults = {
Expand Down
Loading
Loading