Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions autobot-backend/orchestration/dag_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import asyncio
import logging
import os
import ssl
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Coroutine, Dict, List, Optional, Set
Expand All @@ -40,6 +43,7 @@ class NodeType(str, Enum):
STEP = "step"
CONDITION = "condition"
PARALLEL = "parallel"
DISTRIBUTED_SHELL = "distributed_shell" # Issue #3406: fleet fan-out

@classmethod
def _missing_(cls, value: object) -> "NodeType":
Expand Down Expand Up @@ -464,6 +468,16 @@ async def _execute_node(
if node.node_type == NodeType.CONDITION:
return await self._execute_condition_node(node, dag, ctx)

# Issue #3406: distributed_shell has its own fan-out executor
if node.node_type == NodeType.DISTRIBUTED_SHELL:
try:
result = await execute_distributed_shell(node, ctx)
except Exception as exc:
logger.error("distributed_shell node %s raised: %s", node.node_id, exc)
result = {"success": False, "error": str(exc), "node_id": node.node_id}
ctx.step_results[node.node_id] = result
return [e.target for e in dag.successors(node.node_id)]

# Regular (STEP / PARALLEL / unknown-as-STEP) node
try:
result = await self._execute_step(node, ctx)
Expand Down Expand Up @@ -553,6 +567,161 @@ def _get_next_node_ids(
return [e.target for e in dag.successors(node.node_id)]


# ---------------------------------------------------------------------------
# Issue #3406: Distributed shell fan-out
# ---------------------------------------------------------------------------


def _build_slm_ssl_context() -> ssl.SSLContext:
"""Create SSL context for SLM HTTP calls (mirrors slm_client pattern)."""
ctx = ssl.create_default_context()
if os.environ.get("AUTOBOT_SKIP_TLS_VERIFY", "").lower() == "true":
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx


async def _execute_on_node(
    slm_url: str,
    auth_token: str,
    node_id: str,
    script: str,
    language: str,
    timeout: int,
) -> Dict[str, Any]:
    """POST /nodes/{node_id}/execute on the SLM backend.

    Parameters:
        slm_url: Base URL of the SLM backend (no trailing slash).
        auth_token: Bearer token placed in the ``Authorization`` header.
        node_id: Fleet node identifier to execute on.
        script: Shell script forwarded as the ``command`` payload field.
        language: Script language hint forwarded to the SLM endpoint.
        timeout: Remote command timeout in seconds.

    Returns a per-node result dict compatible with DAGExecutionContext.
    Never raises: transport errors and non-200 responses are folded into
    a failure result, so callers may ``gather()`` without
    ``return_exceptions=True``.
    """
    import aiohttp  # lazy import — not available in all environments

    def _failure(stderr: str) -> Dict[str, Any]:
        # All failure paths share the same result shape; only stderr differs.
        return {
            "node_id": node_id,
            "exit_code": -1,
            "stdout": "",
            "stderr": stderr,
            "duration_ms": 0,
            "success": False,
        }

    url = f"{slm_url}/api/nodes/{node_id}/execute"
    payload = {"command": script, "language": language, "timeout": timeout}
    headers = {"Authorization": f"Bearer {auth_token}"}
    ssl_ctx = _build_slm_ssl_context()
    connector = aiohttp.TCPConnector(ssl=ssl_ctx)

    try:
        async with aiohttp.ClientSession(
            headers=headers, connector=connector
        ) as session:
            async with session.post(
                url,
                json=payload,
                # Allow 30s of transport overhead beyond the remote command
                # timeout before abandoning the HTTP call itself.
                timeout=aiohttp.ClientTimeout(total=timeout + 30),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    exit_code = data.get("exit_code", -1)
                    return {
                        "node_id": node_id,
                        "exit_code": exit_code,
                        "stdout": data.get("stdout", ""),
                        "stderr": data.get("stderr", ""),
                        "duration_ms": data.get("duration_ms", 0),
                        "success": exit_code == 0,
                    }
                body = await resp.text()
                # Truncate so oversized error pages don't bloat results.
                return _failure(f"HTTP {resp.status}: {body[:500]}")
    except Exception as exc:  # broad by design: fold any transport error into result
        logger.error("distributed_shell: node %s raised %s", node_id, exc)
        return _failure(str(exc))


async def execute_distributed_shell(node: DAGNode, ctx: DAGExecutionContext) -> Dict[str, Any]:
    """Run the shell script in *node.data* on every listed fleet node concurrently.

    Expected ``node.data`` schema::

        {
            "nodes": ["node-id-1", "node-id-2", ...],
            "script": "echo hello",
            "language": "bash",   # optional, default "bash"
            "timeout": 300,       # optional, default 300
        }

    The returned dict's ``success`` is True only when every node exits
    with code 0; per-node details are collected under ``node_results``.
    """

    def _reject(message: str) -> Dict[str, Any]:
        # Shared shape for configuration / validation failures.
        return {
            "success": False,
            "error": message,
            "node_results": [],
            "node_id": node.node_id,
        }

    base_url = os.environ.get("SLM_URL", "").rstrip("/")
    token = os.environ.get("SLM_AUTH_TOKEN", "")
    if not base_url:
        return _reject("SLM_URL not configured for distributed_shell")

    data = node.data
    targets: List[str] = data.get("nodes", [])
    script: str = data.get("script", "")
    language: str = data.get("language", "bash")
    timeout: int = int(data.get("timeout", 300))

    if not targets:
        return _reject("distributed_shell: 'nodes' list is empty")
    if not script:
        return _reject("distributed_shell: 'script' is required")

    logger.info(
        "distributed_shell %s: fanning out to %d node(s): %s",
        node.node_id,
        len(targets),
        targets,
    )
    started = time.monotonic()

    # One coroutine per target node; _execute_on_node never raises, so a
    # plain gather collects every per-node result dict.
    per_node: List[Dict[str, Any]] = await asyncio.gather(
        *[
            _execute_on_node(base_url, token, target, script, language, timeout)
            for target in targets
        ]
    )

    elapsed_ms = int((time.monotonic() - started) * 1000)
    failed = [r["node_id"] for r in per_node if not r.get("success", False)]
    all_ok = not failed

    if failed:
        logger.warning(
            "distributed_shell %s: %d/%d node(s) failed: %s",
            node.node_id,
            len(failed),
            len(targets),
            failed,
        )

    return {
        "success": all_ok,
        "node_id": node.node_id,
        "node_results": per_node,
        "total_duration_ms": elapsed_ms,
        "failed_nodes": failed,
        "error": None if all_ok else f"Nodes failed: {failed}",
    }


# ---------------------------------------------------------------------------
# Convenience helpers used by WorkflowExecutor integration
# ---------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions autobot-slm-backend/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .mfa import router as mfa_router
from .monitoring import router as monitoring_router
from .nodes import router as nodes_router
from .nodes_execution import router as nodes_execution_router
from .npu import router as npu_router
from .orchestration import router as orchestration_router
from .secrets import router as secrets_router
Expand All @@ -52,6 +53,7 @@
"api_keys_router",
"auth_router",
"nodes_router",
"nodes_execution_router",
"deployments_router",
"settings_router",
"errors_router",
Expand Down
Loading
Loading