Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Ares - Autonomous Security Operations Agent

<!-- BEGIN_AUTO_BADGES -->
<div align="center">

[![Pre-Commit](https://github.com/dreadnode/python-template/actions/workflows/pre-commit.yaml/badge.svg)](https://github.com/dreadnode/python-template/actions/workflows/pre-commit.yaml)
[![Renovate](https://github.com/dreadnode/python-template/actions/workflows/renovate.yaml/badge.svg)](https://github.com/dreadnode/python-template/actions/workflows/renovate.yaml)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

</div>
<!-- END_AUTO_BADGES -->

[![Tests](https://github.com/dreadnode/ares/actions/workflows/tests.yaml/badge.svg)](https://github.com/dreadnode/ares/actions/workflows/tests.yaml)
[![Coverage](https://raw.githubusercontent.com/dreadnode/ares/main/.github/badges/coverage.svg)](https://github.com/dreadnode/ares/actions/workflows/coverage-badge.yaml)
[![Pre-Commit](https://github.com/dreadnode/ares/actions/workflows/pre-commit.yaml/badge.svg)](https://github.com/dreadnode/ares/actions/workflows/pre-commit.yaml)
Expand Down
144 changes: 88 additions & 56 deletions src/ares/core/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,63 @@
"""Data models for Ares SOC Investigation Agent."""
"""Data models for Ares SOC Investigation Agent.

This module provides structured data models for SOC investigations and red team operations,
built on rigging's Model class for automatic serialization and LLM output parsing.

Example usage for LLM output parsing:
>>> from ares.core.models import Evidence, parse, parse_set
>>> # Parse a single Evidence from LLM response text
>>> evidence, _ = parse(llm_response, Evidence)
>>> # Parse multiple Evidence items
>>> items = [e for e, _ in parse_set(llm_response, Evidence)]

"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, IntEnum
from typing import Any

from pydantic import Field, computed_field
from rigging import Model
from rigging.model import element, wrapped

# Re-export rigging parsing utilities for convenient access
from rigging.parsing import (
parse,
parse_many,
parse_set,
try_parse,
try_parse_many,
try_parse_set,
)

__all__ = [
"Credential",
"Evidence",
"Hash",
"Host",
"InvestigationStage",
"InvestigationState",
"InvestigativeQuestion",
"Model",
"PyramidLevel",
"QuestionSource",
"QuestionState",
"RedTeamState",
"Share",
"Target",
"TimelineEvent",
"User",
"parse",
"parse_many",
"parse_set",
"try_parse",
"try_parse_many",
"try_parse_set",
]


class PyramidLevel(IntEnum):
"""Levels of the Pyramid of Pain.
Expand Down Expand Up @@ -57,8 +110,7 @@ class InvestigationStage(Enum):
SYNTHESIS = "synthesis" # Generate report


@dataclass
class Evidence:
class Evidence(Model):
"""A piece of evidence discovered during investigation.

Attributes:
Expand All @@ -82,30 +134,18 @@ class Evidence:
source: str
timestamp: datetime | None
pyramid_level: PyramidLevel
mitre_techniques: list[str] = field(default_factory=list)
mitre_techniques: list[str] = wrapped("mitre-techniques", element(tag="technique", default=[]))
confidence: float = 0.5
metadata: dict[str, Any] = field(default_factory=dict)
metadata: dict[str, str] = Field(default_factory=dict)
source_query_id: str | None = None
validated: bool = False

def to_dict(self) -> dict:
return {
"id": self.id,
"type": self.type,
"value": self.value,
"source": self.source,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"pyramid_level": self.pyramid_level.value,
"mitre_techniques": self.mitre_techniques,
"confidence": self.confidence,
"metadata": self.metadata,
"source_query_id": self.source_query_id,
"validated": self.validated,
}
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for storage (backward compatible)."""
return self.model_dump(mode="json")


@dataclass
class TimelineEvent:
class TimelineEvent(Model):
"""An event in the investigation timeline.

Attributes:
Expand All @@ -121,25 +161,17 @@ class TimelineEvent:
id: str
timestamp: datetime
description: str
evidence_ids: list[str] = field(default_factory=list)
mitre_techniques: list[str] = field(default_factory=list)
evidence_ids: list[str] = wrapped("evidence-ids", element(tag="evidence-id", default=[]))
mitre_techniques: list[str] = wrapped("mitre-techniques", element(tag="technique", default=[]))
confidence: float = 0.5
source: str = "investigation"

def to_dict(self) -> dict:
return {
"id": self.id,
"timestamp": self.timestamp.isoformat(),
"description": self.description,
"evidence_ids": self.evidence_ids,
"mitre_techniques": self.mitre_techniques,
"confidence": self.confidence,
"source": self.source,
}
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for storage (backward compatible)."""
return self.model_dump(mode="json")


@dataclass
class InvestigativeQuestion:
class InvestigativeQuestion(Model):
"""A question that drives the investigation forward.

Generated by the MITRE Navigator and Pyramid Climber engines.
Expand Down Expand Up @@ -185,19 +217,23 @@ class InvestigativeQuestion:
urgency_score: float = 0.0

state: QuestionState = QuestionState.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
answered_at: datetime | None = None

generated_from_evidence_ids: list[str] = field(default_factory=list)
generated_from_evidence_ids: list[str] = wrapped(
"generated-from-evidence-ids", element(tag="evidence-id", default=[])
)
generated_from_question_id: str | None = None

answer_evidence_ids: list[str] = field(default_factory=list)
answer_evidence_ids: list[str] = wrapped(
"answer-evidence-ids", element(tag="evidence-id", default=[])
)
answer_summary: str | None = None

@computed_field # type: ignore[prop-decorator]
@property
def priority_score(self) -> float:
"""
Composite priority score.
"""Composite priority score.

Weights:
- Pyramid elevation: 3x (we want TTPs, not IOCs)
Expand All @@ -212,14 +248,16 @@ def priority_score(self) -> float:
+ (self.urgency_score * 1.0)
)

def can_parallelize_with(self, other: "InvestigativeQuestion") -> bool:
def can_parallelize_with(self, other: InvestigativeQuestion) -> bool:
"""Check if this question can run in parallel with another."""
# Questions in a reasoning chain should be sequential
if self.generated_from_question_id == other.id:
return False
return other.generated_from_question_id != self.id

def to_dict(self) -> dict:
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for storage (backward compatible)."""
# Use custom format to match original API
return {
"id": self.id,
"question": self.text,
Expand Down Expand Up @@ -341,28 +379,25 @@ def to_summary(self) -> dict:


# Red Team Models
@dataclass
class Target:
class Target(Model):
"""Primary target information."""

ip: str
hostname: str = ""
domain: str = ""


@dataclass
class Host:
class Host(Model):
"""Discovered host information."""

ip: str
hostname: str = ""
os: str = ""
roles: list[str] = field(default_factory=list)
services: list[str] = field(default_factory=list)
roles: list[str] = wrapped("roles", element(tag="role", default=[]))
services: list[str] = wrapped("services", element(tag="service", default=[]))


@dataclass
class User:
class User(Model):
"""Discovered user account."""

username: str
Expand All @@ -371,8 +406,7 @@ class User:
is_admin: bool = False


@dataclass
class Credential:
class Credential(Model):
"""Discovered credential."""

username: str
Expand All @@ -382,8 +416,7 @@ class Credential:
is_admin: bool = False


@dataclass
class Hash:
class Hash(Model):
"""Discovered password hash."""

username: str
Expand All @@ -393,8 +426,7 @@ class Hash:
cracked_password: str = ""


@dataclass
class Share:
class Share(Model):
"""Discovered SMB share."""

host: str
Expand Down
Loading