Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 265 additions & 0 deletions src/ares/core/factories/red_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@
from ares.core.templates import get_template_loader
from ares.integrations.mitre import MITREAttackClient
from ares.tools.red.network import (
ACLExploitTools,
BloodHoundTools,
CertipyTools,
CoercionTools,
CrackingTools,
CredentialHarvestingTools,
CVEExploitTools,
DelegationTools,
GoldenTicketTools,
LateralMovementTools,
MSSQLTools,
NetworkEnumerationTools,
RedTeamReportingTools,
SharePilferingTools,
TrustAttackTools,
)

# Load system instructions from template
Expand All @@ -42,12 +48,22 @@
_last_step_number: int | None = None
_DEBOUNCE_WINDOW = 0.1 # 100ms debounce window

# Periodic priority check state
_discovered_vulnerabilities: dict[str, dict] = {} # vuln_type -> {details, discovered_at_step}
_exploited_vulnerabilities: set[str] = set() # vuln_types that have been exploited
_last_priority_check_step: int = 0
_PRIORITY_CHECK_INTERVAL = 10 # Check every N steps


def reset_event_tracking():
"""Reset event tracking state for a new agent run."""
global _last_event_times, _last_step_number
global _discovered_vulnerabilities, _exploited_vulnerabilities, _last_priority_check_step
_last_event_times = {}
_last_step_number = None
_discovered_vulnerabilities = {}
_exploited_vulnerabilities = set()
_last_priority_check_step = 0


def _should_log_event(event_type: str) -> bool:
Expand Down Expand Up @@ -162,6 +178,225 @@ async def log_tool_result(event: ToolEnd):
)


async def vulnerability_discovery_hook(event: ToolEnd):
"""
Redirect agent to exploit when vulnerabilities are discovered.

This hook monitors tool results for vulnerability indicators and
injects feedback to force immediate exploitation.
"""
if not hasattr(event, "result") or not event.result:
return None

result = str(event.result)
tool_name = event.tool_call.name if hasattr(event, "tool_call") and event.tool_call else ""

redirects = []

# ADCS Vulnerabilities
esc1_indicators = "recommended_actions" in result or "ACTIONABLE" in result
if "ESC1" in result and (esc1_indicators or "exploitable" in result.lower()):
redirects.append(
"🚨 ESC1 ADCS VULNERABILITY FOUND!\n"
"→ IMMEDIATELY run certipy_req_esc1 with the CA name and template from above\n"
"→ Then run certipy_auth to get Administrator NTLM hash\n"
"→ DO NOT proceed to other tasks until ESC1 is exploited!"
)

if any(esc in result for esc in ["ESC2", "ESC3", "ESC4", "ESC6"]):
redirects.append(
"⚠️ ADCS vulnerability found (ESC2/3/4/6)!\n"
"→ Investigate this path - may require relay attack or additional enumeration"
)

# ACL Abuse Paths
has_acl_indicator = any(
acl in result.lower() for acl in ["genericall", "genericwrite", "writedacl"]
)
is_acl_discovery_tool = tool_name in ["run_bloodhound", "enumerate_users"]
if has_acl_indicator and is_acl_discovery_tool:
redirects.append(
"🎯 ACL ABUSE PATH FOUND!\n"
"→ Use pywhisker to add shadow credentials on the vulnerable account\n"
"→ OR use bloodyad_set_password to reset their password\n"
"→ OR use bloodyad_add_group_member to add yourself to privileged groups\n"
"→ DO NOT summarize until ACL path is exploited!"
)

# Delegation
is_delegation_context = "delegation" in result.lower() or tool_name == "find_delegation"
if "unconstrained" in result.lower() and is_delegation_context:
redirects.append(
"🔗 UNCONSTRAINED DELEGATION FOUND!\n"
"→ Use petitpotam or coercer to force DC authentication to this machine\n"
"→ Capture TGT and perform DCSync\n"
"→ This is a CRITICAL path to Domain Admin!"
)

# MSSQL
is_mssql_context = "mssql" in tool_name.lower() or "sql" in result.lower()
if "impersonate" in result.lower() and is_mssql_context:
redirects.append(
"💾 MSSQL IMPERSONATION POSSIBLE!\n"
"→ Use mssql_xp_cmdshell with impersonate='sa' to get command execution\n"
"→ Execute credential harvesting commands"
)

# Krbtgt hash
if "krbtgt" in result.lower() and (":::" in result or "hash" in result.lower()):
redirects.append(
"👑 KRBTGT HASH FOUND!\n"
"→ IMMEDIATELY use generate_golden_ticket to forge Enterprise Admin ticket\n"
"→ Then use secretsdump on ALL domain controllers\n"
"→ Use raise_child if there are parent domains"
)

# Admin hash
has_admin_indicator = "administrator" in result.lower() and (
":::" in result or "ntlm" in result.lower()
)
has_hash_pattern = "aad3b435" in result or result.count(":") >= 3
if has_admin_indicator and has_hash_pattern:
redirects.append(
"🔑 ADMINISTRATOR HASH FOUND!\n"
"→ Use domain_admin_checker with this hash on ALL targets\n"
"→ Use secretsdump with this hash on ALL targets\n"
"→ DO NOT summarize - credential pivoting required!"
)

if redirects:
logger.warning("[!] Vulnerability discovery hook triggered - injecting exploit guidance")
header = "\n\n" + "=" * 50 + "\n⚡ IMMEDIATE ACTION REQUIRED ⚡\n" + "=" * 50 + "\n"
return header + "\n\n".join(redirects)

return None


def _track_discovery(vuln_id: str, vuln_type: str, tool: str, step: int) -> None:
"""Helper to track a discovered vulnerability."""
if vuln_id not in _discovered_vulnerabilities:
_discovered_vulnerabilities[vuln_id] = {"type": vuln_type, "tool": tool, "step": step}
logger.info(f"[*] Tracking: {vuln_type} discovered")


def _track_exploitation(tool_name: str) -> None:
"""Helper to track exploitation attempts."""
exploitation_tools = {
"certipy_req_esc1": "esc1_adcs",
"certipy_auth": "esc1_adcs",
"pywhisker": "acl_abuse",
"bloodyad_set_password": "acl_abuse", # pragma: allowlist secret
"bloodyad_add_group_member": "acl_abuse",
"petitpotam": "unconstrained_delegation",
"coercer": "unconstrained_delegation",
"mssql_xp_cmdshell": "mssql_impersonation",
"golden_ticket": "krbtgt_hash",
"generate_golden_ticket": "krbtgt_hash",
}
if tool_name in exploitation_tools:
vuln_type = exploitation_tools[tool_name]
if vuln_type not in _exploited_vulnerabilities:
_exploited_vulnerabilities.add(vuln_type)
logger.info(f"[+] Exploitation attempted: {vuln_type}")


async def track_vulnerability_discoveries(event: ToolEnd):
"""
Track discovered vulnerabilities and exploitation attempts.

This hook monitors tool results to maintain state of what's been
discovered vs exploited for the periodic priority check.
"""
if not hasattr(event, "result") or not event.result:
return

result = str(event.result)
tool_name = event.tool_call.name if hasattr(event, "tool_call") and event.tool_call else ""
step = _last_step_number or 0
result_lower = result.lower()

# Track ESC1 ADCS vulnerability
is_esc1 = "ESC1" in result and ("exploitable" in result_lower or "vulnerable" in result_lower)
if is_esc1:
_track_discovery("esc1_adcs", "ADCS ESC1", "certipy_req_esc1 → certipy_auth", step)

# Track ACL abuse paths
has_acl = any(acl in result_lower for acl in ["genericall", "genericwrite", "writedacl"])
is_acl_tool = tool_name in ["run_bloodhound", "enumerate_users"]
if has_acl and is_acl_tool:
_track_discovery(
"acl_abuse",
"ACL Abuse Path",
"pywhisker / bloodyad_set_password / bloodyad_add_group_member",
step,
)

# Track unconstrained delegation
if "unconstrained" in result_lower and "delegation" in result_lower:
_track_discovery(
"unconstrained_delegation", "Unconstrained Delegation", "petitpotam / coercer", step
)

# Track MSSQL impersonation
is_mssql = "mssql" in tool_name.lower() or "sql" in result_lower
if "impersonate" in result_lower and is_mssql:
_track_discovery(
"mssql_impersonation", "MSSQL Impersonation", "mssql_xp_cmdshell with impersonate", step
)

# Track krbtgt hash
if "krbtgt" in result_lower and (":::" in result or "hash" in result_lower):
_track_discovery("krbtgt_hash", "Krbtgt Hash", "golden_ticket → secretsdump", step)

# Track exploitation attempts
_track_exploitation(tool_name)


async def periodic_priority_check(event: StepStart):
"""
Periodically remind the agent of unexploited discoveries.

Fires every N steps to check if there are discovered vulnerabilities
that haven't been exploited yet, and injects a reminder.
"""
global _last_priority_check_step

step_num = getattr(event, "step_number", None)
if step_num is None:
return None

# Only check every N steps
if step_num - _last_priority_check_step < _PRIORITY_CHECK_INTERVAL:
return None

_last_priority_check_step = step_num

# Find unexploited vulnerabilities
unexploited = []
for vuln_id, vuln_info in _discovered_vulnerabilities.items():
if vuln_id not in _exploited_vulnerabilities:
steps_ago = step_num - vuln_info["step"]
unexploited.append(
f" - {vuln_info['type']} (found {steps_ago} steps ago)\n"
f" → Exploit with: {vuln_info['tool']}"
)

if not unexploited:
return None

logger.warning(f"[!] Periodic check: {len(unexploited)} unexploited vulnerabilities")

return (
"\n\n" + "=" * 50 + "\n"
"🔔 PRIORITY CHECK: UNEXPLOITED DISCOVERIES 🔔\n" + "=" * 50 + "\n\n"
"You have discovered vulnerabilities that remain UNEXPLOITED:\n\n"
+ "\n".join(unexploited)
+ "\n\n"
"⚠️ DO NOT summarize or complete until these are exploited!\n"
"⚠️ Discovery without exploitation is FAILURE.\n" + "=" * 50
)


@dn.tool
def complete_operation(summary: str) -> str:
"""
Expand Down Expand Up @@ -242,6 +477,25 @@ def create_redteam_agent(
reporting_tools = RedTeamReportingTools()
reporting_tools.set_state(state)

# New exploitation toolsets (CAP-838)
coercion_tools = CoercionTools()
coercion_tools.set_state(state)

mssql_tools = MSSQLTools()
mssql_tools.set_state(state)

acl_tools = ACLExploitTools()
acl_tools.set_state(state)

cve_tools = CVEExploitTools()
cve_tools.set_state(state)

trust_tools = TrustAttackTools()
trust_tools.set_state(state)

lateral_tools = LateralMovementTools()
lateral_tools.set_state(state)

tools: list = [
network_tools,
credential_tools,
Expand All @@ -251,6 +505,14 @@ def create_redteam_agent(
bloodhound_tools,
certipy_tools,
delegation_tools,
# New exploitation tools (CAP-838)
coercion_tools,
mssql_tools,
acl_tools,
cve_tools,
trust_tools,
lateral_tools,
# Reporting and completion
reporting_tools,
complete_operation,
]
Expand All @@ -270,6 +532,9 @@ def create_redteam_agent(
log_agent_end,
log_tool_usage,
log_tool_result,
vulnerability_discovery_hook, # Force exploitation when vulns found
track_vulnerability_discoveries, # Track discovered vs exploited vulns
periodic_priority_check, # Remind agent of unexploited discoveries
unstall_hook,
],
stop_conditions=[
Expand Down
18 changes: 18 additions & 0 deletions src/ares/tools/red/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
"""Red team penetration testing tools."""

from ares.tools.red.network import (
ACLExploitTools,
BloodHoundTools,
CertipyTools,
CoercionTools,
CrackingTools,
CredentialHarvestingTools,
CVEExploitTools,
DelegationTools,
GoldenTicketTools,
LateralMovementTools,
MSSQLTools,
NetworkEnumerationTools,
RedTeamReportingTools,
SharePilferingTools,
TrustAttackTools,
)

__all__ = [
"ACLExploitTools",
"BloodHoundTools",
"CVEExploitTools",
"CertipyTools",
"CoercionTools",
"CrackingTools",
"CredentialHarvestingTools",
"DelegationTools",
"GoldenTicketTools",
"LateralMovementTools",
"MSSQLTools",
"NetworkEnumerationTools",
"RedTeamReportingTools",
"SharePilferingTools",
"TrustAttackTools",
]
Loading