From c17cd2b04393a7dbf1f4d739d0f549049f347f8f Mon Sep 17 00:00:00 2001 From: mrveiss Date: Fri, 3 Apr 2026 23:36:23 +0300 Subject: [PATCH 1/2] feat(slm): add remote shell execution API + distributed_shell workflow step type for fleet parallelism (#3406) Co-Authored-By: Claude Sonnet 4.6 --- autobot-backend/orchestration/dag_executor.py | 169 ++++++++++++ autobot-slm-backend/api/__init__.py | 2 + autobot-slm-backend/api/nodes_execution.py | 258 ++++++++++++++++++ autobot-slm-backend/main.py | 2 + docs/examples/parallel_fleet_workflow.py | 149 ++++++++++ docs/user/guides/workflows.md | 107 +++++++- 6 files changed, 673 insertions(+), 14 deletions(-) create mode 100644 autobot-slm-backend/api/nodes_execution.py create mode 100644 docs/examples/parallel_fleet_workflow.py diff --git a/autobot-backend/orchestration/dag_executor.py b/autobot-backend/orchestration/dag_executor.py index 50d90c277..e25241747 100644 --- a/autobot-backend/orchestration/dag_executor.py +++ b/autobot-backend/orchestration/dag_executor.py @@ -23,6 +23,9 @@ import asyncio import logging +import os +import ssl +import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Coroutine, Dict, List, Optional, Set @@ -40,6 +43,7 @@ class NodeType(str, Enum): STEP = "step" CONDITION = "condition" PARALLEL = "parallel" + DISTRIBUTED_SHELL = "distributed_shell" # Issue #3406: fleet fan-out @classmethod def _missing_(cls, value: object) -> "NodeType": @@ -464,6 +468,16 @@ async def _execute_node( if node.node_type == NodeType.CONDITION: return await self._execute_condition_node(node, dag, ctx) + # Issue #3406: distributed_shell has its own fan-out executor + if node.node_type == NodeType.DISTRIBUTED_SHELL: + try: + result = await execute_distributed_shell(node, ctx) + except Exception as exc: + logger.error("distributed_shell node %s raised: %s", node.node_id, exc) + result = {"success": False, "error": str(exc), "node_id": node.node_id} + 
ctx.step_results[node.node_id] = result + return [e.target for e in dag.successors(node.node_id)] + # Regular (STEP / PARALLEL / unknown-as-STEP) node try: result = await self._execute_step(node, ctx) @@ -553,6 +567,161 @@ def _get_next_node_ids( return [e.target for e in dag.successors(node.node_id)] +# --------------------------------------------------------------------------- +# Issue #3406: Distributed shell fan-out +# --------------------------------------------------------------------------- + + +def _build_slm_ssl_context() -> ssl.SSLContext: + """Create SSL context for SLM HTTP calls (mirrors slm_client pattern).""" + ctx = ssl.create_default_context() + if os.environ.get("AUTOBOT_SKIP_TLS_VERIFY", "").lower() == "true": + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +async def _execute_on_node( + slm_url: str, + auth_token: str, + node_id: str, + script: str, + language: str, + timeout: int, +) -> Dict[str, Any]: + """POST /nodes/{node_id}/execute on the SLM backend. + + Returns a per-node result dict compatible with DAGExecutionContext. 
+ """ + import aiohttp # lazy import — not available in all environments + + url = f"{slm_url}/api/nodes/{node_id}/execute" + payload = {"command": script, "language": language, "timeout": timeout} + headers = {"Authorization": f"Bearer {auth_token}"} + ssl_ctx = _build_slm_ssl_context() + connector = aiohttp.TCPConnector(ssl=ssl_ctx) + + try: + async with aiohttp.ClientSession( + headers=headers, connector=connector + ) as session: + async with session.post( + url, json=payload, timeout=aiohttp.ClientTimeout(total=timeout + 30) + ) as resp: + if resp.status == 200: + data = await resp.json() + return { + "node_id": node_id, + "exit_code": data.get("exit_code", -1), + "stdout": data.get("stdout", ""), + "stderr": data.get("stderr", ""), + "duration_ms": data.get("duration_ms", 0), + "success": data.get("exit_code", -1) == 0, + } + body = await resp.text() + return { + "node_id": node_id, + "exit_code": -1, + "stdout": "", + "stderr": f"HTTP {resp.status}: {body[:500]}", + "duration_ms": 0, + "success": False, + } + except Exception as exc: + logger.error("distributed_shell: node %s raised %s", node_id, exc) + return { + "node_id": node_id, + "exit_code": -1, + "stdout": "", + "stderr": str(exc), + "duration_ms": 0, + "success": False, + } + + +async def execute_distributed_shell(node: DAGNode, ctx: DAGExecutionContext) -> Dict[str, Any]: + """Fan out *node.data* shell script to all listed fleet nodes in parallel. + + Expected ``node.data`` schema:: + + { + "nodes": ["node-id-1", "node-id-2", ...], + "script": "echo hello", + "language": "bash", # optional, default "bash" + "timeout": 300, # optional, default 300 + } + + Returns a result dict whose ``success`` is True only when all nodes + return exit_code 0. Per-node details are in ``node_results``. 
+ """ + slm_url = os.environ.get("SLM_URL", "").rstrip("/") + auth_token = os.environ.get("SLM_AUTH_TOKEN", "") + if not slm_url: + return { + "success": False, + "error": "SLM_URL not configured for distributed_shell", + "node_results": [], + "node_id": node.node_id, + } + + target_nodes: List[str] = node.data.get("nodes", []) + script: str = node.data.get("script", "") + language: str = node.data.get("language", "bash") + timeout: int = int(node.data.get("timeout", 300)) + + if not target_nodes: + return { + "success": False, + "error": "distributed_shell: 'nodes' list is empty", + "node_results": [], + "node_id": node.node_id, + } + if not script: + return { + "success": False, + "error": "distributed_shell: 'script' is required", + "node_results": [], + "node_id": node.node_id, + } + + logger.info( + "distributed_shell %s: fanning out to %d node(s): %s", + node.node_id, + len(target_nodes), + target_nodes, + ) + t0 = time.monotonic() + + results: List[Dict[str, Any]] = await asyncio.gather( + *( + _execute_on_node(slm_url, auth_token, nid, script, language, timeout) + for nid in target_nodes + ) + ) + + total_ms = int((time.monotonic() - t0) * 1000) + all_ok = all(r.get("success", False) for r in results) + failed = [r["node_id"] for r in results if not r.get("success", False)] + + if not all_ok: + logger.warning( + "distributed_shell %s: %d/%d node(s) failed: %s", + node.node_id, + len(failed), + len(target_nodes), + failed, + ) + + return { + "success": all_ok, + "node_id": node.node_id, + "node_results": results, + "total_duration_ms": total_ms, + "failed_nodes": failed, + "error": None if all_ok else f"Nodes failed: {failed}", + } + + # --------------------------------------------------------------------------- # Convenience helpers used by WorkflowExecutor integration # --------------------------------------------------------------------------- diff --git a/autobot-slm-backend/api/__init__.py b/autobot-slm-backend/api/__init__.py index 999002aa8..7b584d35b 
100644 --- a/autobot-slm-backend/api/__init__.py +++ b/autobot-slm-backend/api/__init__.py @@ -29,6 +29,7 @@ from .mfa import router as mfa_router from .monitoring import router as monitoring_router from .nodes import router as nodes_router +from .nodes_execution import router as nodes_execution_router from .npu import router as npu_router from .orchestration import router as orchestration_router from .secrets import router as secrets_router @@ -52,6 +53,7 @@ "api_keys_router", "auth_router", "nodes_router", + "nodes_execution_router", "deployments_router", "settings_router", "errors_router", diff --git a/autobot-slm-backend/api/nodes_execution.py b/autobot-slm-backend/api/nodes_execution.py new file mode 100644 index 000000000..2db51feff --- /dev/null +++ b/autobot-slm-backend/api/nodes_execution.py @@ -0,0 +1,258 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Node Remote Execution API + +Issue #3406: Adds POST /nodes/{node_id}/execute — a guarded endpoint that +runs a shell script on the target node. Commands are validated against an +injection-pattern denylist and an optional allowlist before execution. + +Security model +-------------- +- Shell injection patterns (backtick, process substitution, null-byte, etc.) + are always rejected. +- An opt-in ALLOWED_COMMANDS_PATTERN env var restricts commands to an + additional regex if set. +- The node must be ONLINE before a job is accepted. +- All executions are audit-logged via the standard node event system. 
+""" + +import asyncio +import logging +import os +import re +import time +import uuid + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel, Field +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from models.database import EventSeverity, EventType, Node, NodeEvent, NodeStatus +from services.auth import get_current_user +from services.database import get_db + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/nodes", tags=["nodes-execution"]) + +# --------------------------------------------------------------------------- +# Security: static injection-pattern denylist +# --------------------------------------------------------------------------- + +# Patterns that are unconditionally rejected regardless of allowlist. +_INJECTION_PATTERNS: list[re.Pattern] = [ + re.compile(r"`"), # backtick command substitution + re.compile(r"\$\("), # $(…) command substitution + re.compile(r"<\("), # process substitution <(…) + re.compile(r">\("), # process substitution >(…) + re.compile(r"\x00"), # null byte + re.compile(r";\s*rm\s"), # destructive rm chaining + re.compile(r"\|\s*bash"), # pipe-to-bash + re.compile(r"\|\s*sh\b"), # pipe-to-sh + re.compile(r"curl\s.*\|\s*(bash|sh)"), # curl-pipe-execute + re.compile(r"wget\s.*-O\s*-"), # wget stdout pipe +] + +# Optional: set ALLOWED_COMMANDS_PATTERN to a regex; commands not matching +# are rejected. Empty / unset means no additional restriction. 
+_ALLOWED_RE_SRC = os.getenv("ALLOWED_COMMANDS_PATTERN", "")
+_ALLOWED_RE: re.Pattern | None = (
+    re.compile(_ALLOWED_RE_SRC) if _ALLOWED_RE_SRC else None
+)
+
+
+def _validate_command(script: str) -> None:
+    """Raise HTTPException 400 if *script* contains forbidden patterns."""
+    for pattern in _INJECTION_PATTERNS:
+        if pattern.search(script):
+            logger.warning("Command rejected — injection pattern: %s", pattern.pattern)
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Command rejected: forbidden pattern detected",
+            )
+    if _ALLOWED_RE and not _ALLOWED_RE.search(script):
+        logger.warning("Command rejected — not in allowlist: %.80s", script)
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Command rejected: does not match configured allowlist",
+        )
+
+
+# ---------------------------------------------------------------------------
+# Request / response schemas
+# ---------------------------------------------------------------------------
+
+
+class NodeExecuteRequest(BaseModel):
+    """Body for POST /nodes/{node_id}/execute."""
+
+    command: str = Field(
+        ...,
+        description="Shell command or script body to execute on the node.",
+        min_length=1,
+        max_length=32_768,
+    )
+    language: str = Field(
+        default="bash",
+        description="Interpreter: 'bash' or 'sh'.",
+        pattern=r"^(bash|sh)$",
+    )
+    timeout: int = Field(
+        default=300,
+        ge=1,
+        le=3600,
+        description="Maximum execution time in seconds (1–3600).",
+    )
+
+
+class NodeExecuteResponse(BaseModel):
+    """Result of a remote execution job."""
+
+    node_id: str
+    job_id: str
+    exit_code: int
+    stdout: str
+    stderr: str
+    duration_ms: int
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+async def _require_online_node(node_id: str, db: AsyncSession) -> Node:
+    """Fetch node and verify it is ONLINE; raise 404/409 otherwise."""
+    result = await 
db.execute(select(Node).where(Node.node_id == node_id)) + node = result.scalar_one_or_none() + if node is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Node {node_id!r} not found", + ) + if node.status != NodeStatus.ONLINE.value: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Node {node_id!r} is not ONLINE (status: {node.status})", + ) + return node + + +async def _audit_execute_event( + db: AsyncSession, + node_id: str, + job_id: str, + exit_code: int, + duration_ms: int, + severity: EventSeverity, +) -> None: + """Persist an audit NodeEvent for the remote-execute job.""" + event = NodeEvent( + event_id=str(uuid.uuid4())[:16], + node_id=node_id, + event_type=EventType.MANUAL_ACTION.value, + severity=severity.value, + message=f"Remote execution job {job_id}: exit_code={exit_code}", + details={ + "job_id": job_id, + "exit_code": exit_code, + "duration_ms": duration_ms, + }, + ) + db.add(event) + await db.commit() + + +async def _run_locally( + script: str, language: str, timeout: int +) -> tuple[int, str, str]: + """Execute *script* in a subprocess; return (exit_code, stdout, stderr).""" + interpreter = "/bin/bash" if language == "bash" else "/bin/sh" + proc = await asyncio.create_subprocess_exec( + interpreter, + "-c", + script, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + raw_out, raw_err = await asyncio.wait_for( + proc.communicate(), timeout=float(timeout) + ) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + return 124, "", f"Execution timed out after {timeout}s" + + return ( + proc.returncode if proc.returncode is not None else 1, + raw_out.decode("utf-8", errors="replace"), + raw_err.decode("utf-8", errors="replace"), + ) + + +# --------------------------------------------------------------------------- +# Route +# --------------------------------------------------------------------------- + + +@router.post( + "/{node_id}/execute", + 
response_model=NodeExecuteResponse, + summary="Execute a shell command on a fleet node", +) +async def execute_on_node( + node_id: str, + body: NodeExecuteRequest, + db: AsyncSession = Depends(get_db), + _user=Depends(get_current_user), +) -> NodeExecuteResponse: + """Run *body.command* on the node identified by *node_id*. + + The node must be ONLINE. Commands are validated against the injection + denylist before execution. The result is audit-logged as a NodeEvent. + + Currently executes locally (this host is the manager node). Future + iterations will fan out via the SLM agent Redis queue when the target + node is remote. + """ + _validate_command(body.command) + await _require_online_node(node_id, db) + + job_id = str(uuid.uuid4())[:16] + logger.info( + "Remote execute: node=%s job=%s language=%s timeout=%s", + node_id, + job_id, + body.language, + body.timeout, + ) + + t0 = time.monotonic() + exit_code, stdout, stderr = await _run_locally( + body.command, body.language, body.timeout + ) + duration_ms = int((time.monotonic() - t0) * 1000) + + severity = EventSeverity.INFO if exit_code == 0 else EventSeverity.WARNING + await _audit_execute_event(db, node_id, job_id, exit_code, duration_ms, severity) + + logger.info( + "Remote execute done: node=%s job=%s exit=%d dur=%dms", + node_id, + job_id, + exit_code, + duration_ms, + ) + + return NodeExecuteResponse( + node_id=node_id, + job_id=job_id, + exit_code=exit_code, + stdout=stdout, + stderr=stderr, + duration_ms=duration_ms, + ) diff --git a/autobot-slm-backend/main.py b/autobot-slm-backend/main.py index 9fe7a77e3..a6e974991 100644 --- a/autobot-slm-backend/main.py +++ b/autobot-slm-backend/main.py @@ -41,6 +41,7 @@ node_config_router, node_tls_router, node_vnc_router, + nodes_execution_router, nodes_router, npu_router, orchestration_router, @@ -390,6 +391,7 @@ async def _seed_default_agents(): app.include_router(agents_router, prefix="/api") app.include_router(auth_router, prefix="/api") 
 app.include_router(nodes_router, prefix="/api")
+app.include_router(nodes_execution_router, prefix="/api")  # Issue #3406
 app.include_router(services_router, prefix="/api")
 app.include_router(fleet_services_router, prefix="/api")
 app.include_router(deployments_router, prefix="/api")
diff --git a/docs/examples/parallel_fleet_workflow.py b/docs/examples/parallel_fleet_workflow.py
new file mode 100644
index 000000000..f9ba10eb9
--- /dev/null
+++ b/docs/examples/parallel_fleet_workflow.py
@@ -0,0 +1,149 @@
+# AutoBot - AI-Powered Automation Platform
+# Copyright (c) 2025 mrveiss
+# Author: mrveiss
+"""
+Parallel Fleet Workflow Example
+
+Issue #3406: Demonstrates building a workflow definition that uses
+``distributed_shell`` steps to run shell scripts across multiple fleet
+nodes in parallel via the DAG executor.
+
+Running this example
+--------------------
+Set the required environment variables then execute the script directly:
+
+    export SLM_URL=https://<slm-host>
+    export SLM_AUTH_TOKEN=<jwt-token>
+    python docs/examples/parallel_fleet_workflow.py
+
+The script builds an in-process WorkflowDAG and calls DAGExecutor
+directly — it does not require a running AutoBot backend server.
+""" + +import asyncio +import json +import logging +import os +import sys + +# --------------------------------------------------------------------------- +# Path bootstrap — allows running from the repo root without install +# --------------------------------------------------------------------------- +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) +_BACKEND_DIR = os.path.join(_REPO_ROOT, "autobot-backend") +if _BACKEND_DIR not in sys.path: + sys.path.insert(0, _BACKEND_DIR) + +from orchestration.dag_executor import ( # noqa: E402 + DAGExecutionContext, + DAGExecutor, + DAGNode, + NodeType, + WorkflowDAG, + execute_distributed_shell, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", +) +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Workflow definition +# --------------------------------------------------------------------------- + +# Three fleet nodes to target (replace with real node IDs from your SLM). 
+TARGET_NODES = [ + os.getenv("DEMO_NODE_1", "node-001"), + os.getenv("DEMO_NODE_2", "node-002"), + os.getenv("DEMO_NODE_3", "node-003"), +] + +WORKFLOW_NODES = [ + { + "id": "collect-facts", + "type": "distributed_shell", + "data": { + "nodes": TARGET_NODES, + "script": "hostname && uname -r && df -h /", + "language": "bash", + "timeout": 60, + }, + }, + { + "id": "check-services", + "type": "distributed_shell", + "data": { + "nodes": TARGET_NODES, + "script": "systemctl is-active autobot-agent || true", + "language": "bash", + "timeout": 30, + }, + }, + { + "id": "report", + "type": "distributed_shell", + "data": { + "nodes": TARGET_NODES, + "script": "echo 'Fleet health check complete on $(hostname)'", + "language": "bash", + "timeout": 15, + }, + }, +] + +WORKFLOW_EDGES = [ + {"source": "collect-facts", "target": "check-services"}, + {"source": "check-services", "target": "report"}, +] + + +# --------------------------------------------------------------------------- +# Step executor callback (required by DAGExecutor for non-distributed steps) +# --------------------------------------------------------------------------- + + +async def _noop_step(node: DAGNode, ctx: DAGExecutionContext): + """Fallback for any STEP nodes — not used in this example.""" + return {"success": True, "node_id": node.node_id} + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def run_workflow() -> None: + dag = WorkflowDAG(WORKFLOW_NODES, WORKFLOW_EDGES) + + cycle = dag.detect_cycle() + if cycle: + logger.error("DAG has a cycle: %s", " -> ".join(cycle)) + return + + executor = DAGExecutor(step_executor_callback=_noop_step) + logger.info("Starting parallel fleet workflow (3 steps, %d nodes each)", len(TARGET_NODES)) + ctx = await executor.execute(dag, workflow_id="fleet-health-check-demo") + + logger.info("Workflow finished: status=%s", ctx.status) + + for 
step_id, result in ctx.step_results.items(): + print(f"\n--- Step: {step_id} ---") + print(f" success: {result.get('success')}") + print(f" total_duration_ms: {result.get('total_duration_ms')}") + for node_result in result.get("node_results", []): + print( + f" [{node_result['node_id']}] exit={node_result['exit_code']} " + f"stdout={node_result['stdout'][:120]!r}" + ) + if result.get("failed_nodes"): + print(f" FAILED nodes: {result['failed_nodes']}") + + if ctx.error: + logger.error("Workflow error: %s", ctx.error) + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(run_workflow()) diff --git a/docs/user/guides/workflows.md b/docs/user/guides/workflows.md index ab2a3e0b9..62895256c 100644 --- a/docs/user/guides/workflows.md +++ b/docs/user/guides/workflows.md @@ -59,14 +59,15 @@ Click **Workflow Automation** in the navigation bar, or navigate to Common step types include: -| Step Type | Description | -|-----------|-------------| -| AI Task | Send a prompt to an AI agent and use its response | -| Web Request | Fetch data from a URL | -| Knowledge Query | Search or update the knowledge base | -| Notification | Send an alert via email, webhook, or in-app message | -| Condition | Branch the workflow based on a yes/no check | -| Delay | Wait for a specified time before continuing | +| Step Type | Description | +|-------------------|----------------------------------------------------------------| +| AI Task | Send a prompt to an AI agent and use its response | +| Web Request | Fetch data from a URL | +| Knowledge Query | Search or update the knowledge base | +| Notification | Send an alert via email, webhook, or in-app message | +| Condition | Branch the workflow based on a yes/no check | +| Delay | Wait for a specified time before continuing | +| Distributed Shell | Run a shell script across multiple fleet nodes simultaneously | ## Running a Workflow @@ -89,12 +90,12 @@ To run a workflow on a recurring schedule: The **Overview** page shows all workflows and 
their current status: -| Status | Meaning | -|--------|---------| -| Idle | Not currently running | -| Running | Executing steps right now | -| Completed | Finished successfully | -| Failed | One or more steps encountered an error | +| Status | Meaning | +|-----------|----------------------------------------| +| Idle | Not currently running | +| Running | Executing steps right now | +| Completed | Finished successfully | +| Failed | One or more steps encountered an error | Click a workflow to see detailed logs for each step, including input, output, and any error messages. @@ -196,6 +197,84 @@ configuration using Python `string.Template` syntax. - `NotificationEvent.SERVICE_FAILED` and its default template are defined in `autobot-backend/services/notification_service.py`. +## Parallel Fleet Execution + +The `distributed_shell` step type lets a single workflow node run a shell +script on multiple fleet machines at the same time. All target nodes execute +the script concurrently via `asyncio.gather`; the step succeeds only when +every node returns exit code 0. + +### How it works + +1. The DAG executor calls `POST /api/nodes/{node_id}/execute` on the SLM + backend for each node in the `nodes` list simultaneously. +2. Each call is validated server-side against an injection-pattern denylist + before the script is executed. +3. Per-node results (exit code, stdout, stderr, duration) are collected and + stored in the step output. +4. If any node fails the whole step is marked failed and the per-node details + show which nodes returned a non-zero exit code. 
+ +### Step configuration + +```json +{ + "id": "my-fleet-step", + "type": "distributed_shell", + "data": { + "nodes": ["node-001", "node-002", "node-003"], + "script": "hostname && systemctl is-active autobot-agent", + "language": "bash", + "timeout": 120 + } +} +``` + +| Field | Type | Default | Description | +|------------|--------------------|----------|------------------------------------------| +| `nodes` | list of node IDs | required | Fleet nodes to target | +| `script` | string | required | Shell script body | +| `language` | `"bash"` or `"sh"` | `"bash"` | Interpreter | +| `timeout` | integer (seconds) | 300 | Per-node execution timeout (1–3600) | + +### Step output shape + +```json +{ + "success": true, + "node_id": "my-fleet-step", + "total_duration_ms": 843, + "failed_nodes": [], + "node_results": [ + { + "node_id": "node-001", + "exit_code": 0, + "stdout": "node-001\nactive\n", + "stderr": "", + "duration_ms": 312, + "success": true + } + ] +} +``` + +### Required environment variables + +| Variable | Purpose | +| --- | --- | +| `SLM_URL` | Base URL of the SLM backend (e.g. `https://slm.example.com`) | +| `SLM_AUTH_TOKEN` | JWT token used to authenticate with the SLM execute endpoint | + +### Security + +Commands are validated before execution by a static denylist of shell +injection patterns (backtick substitution, `$(...)`, pipe-to-bash, and +others). An optional `ALLOWED_COMMANDS_PATTERN` environment variable on +the SLM host provides an additional regex allowlist. + +For a complete runnable example see +[`docs/examples/parallel_fleet_workflow.py`](../../examples/parallel_fleet_workflow.py). 
+ ## Related Guides - [Working with Agents](working-with-agents.md) -- agents power many workflow From 4f26bfd395c64061d43d133a653300210093df82 Mon Sep 17 00:00:00 2001 From: mrveiss Date: Fri, 3 Apr 2026 23:47:53 +0300 Subject: [PATCH 2/2] fix(slm): route execute_on_node through SSH for remote nodes; fix denylist-blocked example - Replace _run_locally with _run_script (local) and _run_via_ssh (remote) - _is_local_ip() detects manager-host IPs; remote nodes use SSH via SLM_SSH_KEY + node.ssh_user/ssh_port (same pattern as code_distributor) - Remove misleading "future iterations" TODO comment - Example: change $(hostname) to $HOSTNAME to avoid denylist rejection Co-Authored-By: Claude Sonnet 4.6 --- autobot-slm-backend/api/nodes_execution.py | 84 +++++++++++++++++----- docs/examples/parallel_fleet_workflow.py | 2 +- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/autobot-slm-backend/api/nodes_execution.py b/autobot-slm-backend/api/nodes_execution.py index 2db51feff..b00171fa4 100644 --- a/autobot-slm-backend/api/nodes_execution.py +++ b/autobot-slm-backend/api/nodes_execution.py @@ -22,8 +22,10 @@ import logging import os import re +import socket import time import uuid +from pathlib import Path from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field @@ -166,27 +168,69 @@ async def _audit_execute_event( await db.commit() -async def _run_locally( +_SSH_KEY_PATH = os.environ.get("SLM_SSH_KEY", "/home/autobot/.ssh/autobot_key") # noqa: ssot-path + +_LOCAL_ADDRESSES = {"127.0.0.1", "::1", "localhost"} +try: + _LOCAL_ADDRESSES.add(socket.gethostbyname(socket.gethostname())) +except OSError: + pass + + +def _is_local_ip(ip: str) -> bool: + """Return True if *ip* resolves to this host.""" + return ip in _LOCAL_ADDRESSES + + +async def _run_script( script: str, language: str, timeout: int ) -> tuple[int, str, str]: - """Execute *script* in a subprocess; return (exit_code, stdout, stderr).""" + """Execute *script* 
locally via subprocess; return (exit_code, stdout, stderr).""" interpreter = "/bin/bash" if language == "bash" else "/bin/sh" proc = await asyncio.create_subprocess_exec( - interpreter, - "-c", - script, + interpreter, "-c", script, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: - raw_out, raw_err = await asyncio.wait_for( - proc.communicate(), timeout=float(timeout) - ) + raw_out, raw_err = await asyncio.wait_for(proc.communicate(), timeout=float(timeout)) except asyncio.TimeoutError: proc.kill() await proc.communicate() return 124, "", f"Execution timed out after {timeout}s" + return ( + proc.returncode if proc.returncode is not None else 1, + raw_out.decode("utf-8", errors="replace"), + raw_err.decode("utf-8", errors="replace"), + ) + +async def _run_via_ssh( + ip: str, ssh_user: str, ssh_port: int, script: str, language: str, timeout: int +) -> tuple[int, str, str]: + """Execute *script* on *ip* via SSH; return (exit_code, stdout, stderr).""" + interpreter = "bash" if language == "bash" else "sh" + cmd = [ + "ssh", "-p", str(ssh_port), + "-o", "StrictHostKeyChecking=no", + "-o", "BatchMode=yes", + "-o", f"ConnectTimeout={min(timeout, 30)}", + ] + if Path(_SSH_KEY_PATH).exists(): + cmd.extend(["-i", _SSH_KEY_PATH]) + cmd.append(f"{ssh_user}@{ip}") + cmd.extend([interpreter, "-c", script]) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + raw_out, raw_err = await asyncio.wait_for(proc.communicate(), timeout=float(timeout)) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + return 124, "", f"SSH execution timed out after {timeout}s" return ( proc.returncode if proc.returncode is not None else 1, raw_out.decode("utf-8", errors="replace"), @@ -215,26 +259,34 @@ async def execute_on_node( The node must be ONLINE. Commands are validated against the injection denylist before execution. The result is audit-logged as a NodeEvent. 
- Currently executes locally (this host is the manager node). Future - iterations will fan out via the SLM agent Redis queue when the target - node is remote. + Local nodes (manager host) execute via subprocess; remote nodes execute + via SSH using the SLM key (SLM_SSH_KEY env var, default + /home/autobot/.ssh/autobot_key) with the node's ssh_user and ssh_port. """ _validate_command(body.command) - await _require_online_node(node_id, db) + node = await _require_online_node(node_id, db) job_id = str(uuid.uuid4())[:16] logger.info( - "Remote execute: node=%s job=%s language=%s timeout=%s", + "Execute: node=%s ip=%s job=%s language=%s timeout=%s", node_id, + node.ip_address, job_id, body.language, body.timeout, ) t0 = time.monotonic() - exit_code, stdout, stderr = await _run_locally( - body.command, body.language, body.timeout - ) + if _is_local_ip(node.ip_address or ""): + exit_code, stdout, stderr = await _run_script( + body.command, body.language, body.timeout + ) + else: + ssh_user = node.ssh_user or "autobot" + ssh_port = int(node.ssh_port or 22) + exit_code, stdout, stderr = await _run_via_ssh( + node.ip_address, ssh_user, ssh_port, body.command, body.language, body.timeout + ) duration_ms = int((time.monotonic() - t0) * 1000) severity = EventSeverity.INFO if exit_code == 0 else EventSeverity.WARNING diff --git a/docs/examples/parallel_fleet_workflow.py b/docs/examples/parallel_fleet_workflow.py index f9ba10eb9..40636bc67 100644 --- a/docs/examples/parallel_fleet_workflow.py +++ b/docs/examples/parallel_fleet_workflow.py @@ -86,7 +86,7 @@ "type": "distributed_shell", "data": { "nodes": TARGET_NODES, - "script": "echo 'Fleet health check complete on $(hostname)'", + "script": "echo \"Fleet health check complete on $HOSTNAME\"", "language": "bash", "timeout": 15, },