Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions keep/providers/nagios_provider/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .nagios_provider import NagiosProvider as NagiosProvider
104 changes: 104 additions & 0 deletions keep/providers/nagios_provider/nagios_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
Nagios Provider is a class that provides a way to receive alerts from Nagios using webhooks.
"""
import dataclasses
import pydantic
from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig, ProviderScope


@pydantic.dataclasses.dataclass
class NagiosProviderAuthConfig:
"""
Authentication configuration for Nagios.
"""
host_url: pydantic.AnyHttpUrl = dataclasses.field(
metadata={
"required": True,
"description": "Nagios Host URL",
"hint": "e.g. https://nagios.example.com",
"sensitive": False,
}
)
api_key: str = dataclasses.field(
metadata={
"required": False,
"description": "API Key for Nagios (optional)",
"sensitive": True,
}
)


class NagiosProvider(BaseProvider):
"""
Get alerts from Nagios into Keep via webhooks.
"""
webhook_description = ""
webhook_template = ""
webhook_markdown = """
To send alerts from Nagios to Keep:

1. Install the notification script from Keep docs
2. Set webhook URL: {keep_webhook_api_url}
3. Add header "X-API-KEY" with your Keep API key
4. Configure Nagios notification commands
"""

PROVIDER_DISPLAY_NAME = "Nagios"
PROVIDER_TAGS = ["alert"]
PROVIDER_CATEGORY = ["Monitoring"]
WEBHOOK_INSTALLATION_REQUIRED = True
PROVIDER_ICON = "nagios-icon.png"

PROVIDER_SCOPES = [
ProviderScope(name="read_alerts", description="Read alerts from Nagios"),
]

STATUS_MAP = {
"OK": AlertStatus.RESOLVED,
"WARNING": AlertStatus.FIRING,
"CRITICAL": AlertStatus.FIRING,
"UNKNOWN": AlertStatus.FIRING,
"UP": AlertStatus.RESOLVED,
"DOWN": AlertStatus.FIRING,
}

SEVERITY_MAP = {
"OK": AlertSeverity.INFO,
"WARNING": AlertSeverity.WARNING,
"CRITICAL": AlertSeverity.CRITICAL,
"UNKNOWN": AlertSeverity.INFO,
"UP": AlertSeverity.INFO,
"DOWN": AlertSeverity.CRITICAL,
}

def __init__(self, context_manager: ContextManager, provider_id: str, config: ProviderConfig):
super().__init__(context_manager, provider_id, config)

def validate_config(self):
self.authentication_config = NagiosProviderAuthConfig(**self.config.authentication)

def validate_scopes(self):
return {"read_alerts": True}

@staticmethod
def _format_alert(event: dict, provider_instance: "BaseProvider" = None) -> AlertDto:
"""Format Nagios webhook payload into Keep alert format."""
host = event.get("host", "")
service = event.get("service", "")
state = event.get("state", "UNKNOWN")
output = event.get("output", "")

return AlertDto(
id=f"{host}:{service}" if service else host,
name=service or host,
status=NagiosProvider.STATUS_MAP.get(state, AlertStatus.FIRING),
severity=NagiosProvider.SEVERITY_MAP.get(state, AlertSeverity.INFO),
description=output,
source=["nagios"],
hostname=host,
service_name=service,
state=state,
)
1 change: 1 addition & 0 deletions keep/providers/solarwinds_provider/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .solarwinds_provider import SolarwindsProvider as SolarwindsProvider
111 changes: 111 additions & 0 deletions keep/providers/solarwinds_provider/solarwinds_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
Solarwinds Provider is a class that provides a way to receive alerts from Solarwinds using webhooks.
"""
import dataclasses
import pydantic
from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig, ProviderScope


@pydantic.dataclasses.dataclass
class SolarwindsProviderAuthConfig:
"""
Authentication configuration for Solarwinds.
"""
host_url: pydantic.AnyHttpUrl = dataclasses.field(
metadata={
"required": True,
"description": "Solarwinds Orion Host URL",
"hint": "e.g. https://solarwinds.example.com",
"sensitive": False,
}
)
api_user: str = dataclasses.field(
metadata={
"required": True,
"description": "Solarwinds API User",
"sensitive": False,
}
)
api_password: str = dataclasses.field(
metadata={
"required": True,
"description": "Solarwinds API Password",
"sensitive": True,
}
)


class SolarwindsProvider(BaseProvider):
"""
Get alerts from Solarwinds Orion into Keep via webhooks.
"""
webhook_description = ""
webhook_template = ""
webhook_markdown = """
To send alerts from Solarwinds Orion to Keep:

1. Configure an alert action in Solarwinds Orion
2. Set webhook URL: {keep_webhook_api_url}
3. Add header "X-API-KEY" with your Keep API key
4. Configure alert triggers to send notifications
"""

PROVIDER_DISPLAY_NAME = "Solarwinds"
PROVIDER_TAGS = ["alert"]
PROVIDER_CATEGORY = ["Monitoring"]
WEBHOOK_INSTALLATION_REQUIRED = True
PROVIDER_ICON = "solarwinds-icon.png"

PROVIDER_SCOPES = [
ProviderScope(name="read_alerts", description="Read alerts from Solarwinds"),
]

# Solarwinds states Mapping to Keep alert states
STATUS_MAP = {
"Up": AlertStatus.RESOLVED,
"Down": AlertStatus.FIRING,
"Warning": AlertStatus.FIRING,
"Critical": AlertStatus.FIRING,
"Unmanaged": AlertStatus.RESOLVED,
"Unplugged": AlertStatus.FIRING,
}

SEVERITY_MAP = {
"Up": AlertSeverity.INFO,
"Down": AlertSeverity.CRITICAL,
"Warning": AlertSeverity.WARNING,
"Critical": AlertSeverity.CRITICAL,
"Unmanaged": AlertSeverity.INFO,
"Unplugged": AlertSeverity.WARNING,
}

def __init__(self, context_manager: ContextManager, provider_id: str, config: ProviderConfig):
super().__init__(context_manager, provider_id, config)

def validate_config(self):
self.authentication_config = SolarwindsProviderAuthConfig(**self.config.authentication)

def validate_scopes(self):
return {"read_alerts": True}

@staticmethod
def _format_alert(event: dict, provider_instance: "BaseProvider" = None) -> AlertDto:
"""Format Solarwinds webhook payload into Keep alert format."""
node_name = event.get("NodeName", "")
alert_name = event.get("AlertName", "")
status = event.get("Status", "Unknown")
message = event.get("Message", "")

return AlertDto(
id=f"{node_name}:{alert_name}",
name=alert_name or node_name,
status=SolarwindsProvider.STATUS_MAP.get(status, AlertStatus.FIRING),
severity=SolarwindsProvider.SEVERITY_MAP.get(status, AlertSeverity.INFO),
description=message,
source=["solarwinds"],
hostname=node_name,
status_text=status,
)
135 changes: 135 additions & 0 deletions tests/test_solarwinds_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Tests for SolarWinds Provider.

Tests the SolarWinds Orion webhook provider for receiving alerts.
"""
import pytest
from unittest.mock import MagicMock, patch

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.solarwinds_provider.solarwinds_provider import SolarwindsProvider
from keep.providers.models.provider_config import ProviderConfig


class TestSolarwindsProvider:
"""Test suite for SolarWinds Provider."""

@pytest.fixture
def solarwinds_config(self):
return ProviderConfig(
description="Test SolarWinds",
authentication={
"webhook_url": "https://solarwinds.example.com/webhook",
"api_key": "test-api-key",
},
)

@pytest.fixture
def solarwinds_provider(self, solarwinds_config):
cm = ContextManager(tenant_id="test", workflow_id="test")
return SolarwindsProvider(cm, "test-solarwinds", solarwinds_config)

def test_format_alert_up(self):
"""Test formatting a SolarWinds UP alert."""
event = {
"node": "server1",
"alert": "Node Status",
"status": "Up",
"message": "Node is responding",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.status == AlertStatus.RESOLVED
assert alert.severity == AlertSeverity.INFO
assert alert.name == "Node Status"

def test_format_alert_down(self):
"""Test formatting a SolarWinds DOWN alert."""
event = {
"node": "server1",
"alert": "Node Status",
"status": "Down",
"message": "Node is not responding",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.status == AlertStatus.FIRING
assert alert.severity == AlertSeverity.CRITICAL

def test_format_alert_warning(self):
"""Test formatting a SolarWinds WARNING alert."""
event = {
"node": "server1",
"alert": "CPU Usage",
"status": "Warning",
"message": "CPU usage above 80%",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.status == AlertStatus.FIRING
assert alert.severity == AlertSeverity.WARNING

def test_format_alert_critical(self):
"""Test formatting a SolarWinds CRITICAL alert."""
event = {
"node": "server2",
"alert": "Memory",
"status": "Critical",
"message": "Memory usage critical",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.status == AlertStatus.FIRING
assert alert.severity == AlertSeverity.CRITICAL

def test_format_alert_unknown(self):
"""Test formatting a SolarWinds UNKNOWN status."""
event = {
"node": "server1",
"alert": "Service Check",
"status": "Unknown",
"message": "Unable to determine status",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.status == AlertStatus.FIRING

def test_format_alert_with_node_only(self):
"""Test formatting alert with only node (no alert name)."""
event = {
"node": "router1",
"status": "Down",
"message": "Router unreachable",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.name == "router1"
assert alert.hostname == "router1"

def test_format_alert_with_timestamp(self):
"""Test formatting alert with timestamp."""
event = {
"node": "server1",
"alert": "Disk Space",
"status": "Warning",
"message": "Disk 90% full",
"timestamp": "2024-01-15T10:30:00Z",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.lastReceived is not None

def test_format_alert_with_url(self):
"""Test formatting alert with URL."""
event = {
"node": "server1",
"alert": "CPU Load",
"status": "Critical",
"message": "High CPU load",
"url": "https://solarwinds.example.com/Orion/NetPerfMon/NodeDetails.aspx?NetObject=N:1",
}
alert = SolarwindsProvider._format_alert(event)
assert alert.url is not None

def test_validate_scopes_success(self, solarwinds_provider):
"""Test validate_scopes returns True for valid config."""
scopes = solarwinds_provider.validate_scopes()
assert scopes["webhook"] is True

def test_validate_config(self, solarwinds_provider):
"""Test config validation."""
assert solarwinds_provider.authentication_config.webhook_url == "https://solarwinds.example.com/webhook"
assert solarwinds_provider.authentication_config.api_key == "test-api-key"
Loading