From 531524b049f82e155a6bf8e4e57034b26be1a100 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 23 Apr 2026 15:08:50 +0000 Subject: [PATCH] feat: add agent-server execution mode for open source deployments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for a dual execution backend: 1. Cloud sandbox mode (existing, default): Per-run sandbox provisioning via Cloud API — unchanged behavior. 2. Agent-server mode (new): Connects directly to a persistent agent-server via AUTOMATION_AGENT_SERVER_URL config. No sandbox creation, polling, or cleanup. Aimed at open source / self-hosted deployments. Both modes share the same agent-server HTTP APIs for file upload and bash execution — only the connection setup differs. Changes: - config.py: Add agent_server_url, agent_server_api_key settings and is_agent_server_mode property - execution.py: Introduce AgentConnection abstraction and _connect_cloud_sandbox / _connect_agent_server helpers. Refactor dispatch_automation() and run_automation() to branch on mode. - dispatcher.py: Pass agent-server config through to dispatch call - watchdog.py: Pass agent-server config to verification, skip sandbox cleanup in agent-server mode - utils/sandbox.py: Update verify_run_status() to accept agent-server URL directly (bypasses sandbox discovery in agent-server mode) - app.py: Log which execution backend is active on startup Refs: #62 Co-authored-by: openhands --- automation/app.py | 7 ++ automation/config.py | 12 ++ automation/dispatcher.py | 4 +- automation/execution.py | 242 +++++++++++++++++++++++------------- automation/utils/sandbox.py | 52 +++++--- automation/watchdog.py | 8 +- 6 files changed, 216 insertions(+), 109 deletions(-) diff --git a/automation/app.py b/automation/app.py index 21fecc4..3634276 100644 --- a/automation/app.py +++ b/automation/app.py @@ -48,6 +48,13 @@ async def lifespan(app: FastAPI): logging.getLogger(noisy_logger).setLevel(logging.WARNING) logger.info("Starting OpenHands Automations Service") + if settings.is_agent_server_mode: + logger.info( + "Execution backend: agent-server mode (%s)", + settings.agent_server_url, + ) + else: + logger.info("Execution backend: Cloud sandbox mode") # Create shared httpx client for auth (stored in app.state for DI) app.state.http_client = create_http_client() diff --git a/automation/config.py b/automation/config.py index 13e87dc..46a2ce7 100644 --- a/automation/config.py +++ b/automation/config.py @@ -63,8 +63,20 @@ class Settings(BaseSettings): # Used by the OpenHands server when forwarding GitHub events webhook_secret: str = "" + # --- Execution backend --- + # When set, the automation engine connects directly to this agent-server + # instead of provisioning Cloud sandboxes per run. + # This enables open source / self-hosted deployments. + agent_server_url: str | None = None + agent_server_api_key: str | None = None + model_config = {"env_prefix": "AUTOMATION_"} + @property + def is_agent_server_mode(self) -> bool: + """True when configured to use a persistent agent-server.""" + return self.agent_server_url is not None + @property def base_path(self) -> str: """Route prefix derived from base_url path component + /api/automation. diff --git a/automation/dispatcher.py b/automation/dispatcher.py index 5b6e25f..a17d6bb 100644 --- a/automation/dispatcher.py +++ b/automation/dispatcher.py @@ -191,7 +191,7 @@ def log_extra(sandbox_id: str | None = None) -> dict[str, Any]: else: effective_timeout = MAX_RUN_DURATION_SECONDS - # 5. Dispatch to sandbox (fire-and-forget) + # 5. Dispatch (fire-and-forget) result = await dispatch_automation( api_url=settings.openhands_api_base_url, api_key=api_key, @@ -201,6 +201,8 @@ def log_extra(sandbox_id: str | None = None) -> dict[str, Any]: timeout=effective_timeout, callback_url=callback_url, run_id=run_id, + agent_server_url=settings.agent_server_url, + agent_server_api_key=settings.agent_server_api_key, ) sandbox_extra = log_extra(sandbox_id=result.sandbox_id) diff --git a/automation/execution.py b/automation/execution.py index bad398f..ea5546c 100644 --- a/automation/execution.py +++ b/automation/execution.py @@ -1,7 +1,17 @@ -"""Sandbox execution for automation runs. +"""Execution backends for automation runs. -One function does the whole job: spin up a sandbox, upload a tarball, -extract it, run setup, run the entrypoint, tear down. +Supports two execution modes (selected by config): + +1. **Cloud sandbox mode** (default): Provisions a fresh Cloud sandbox per run + via the OpenHands Cloud API, discovers the agent-server inside, uploads + the tarball, runs the entrypoint, and cleans up after. + +2. **Agent-server mode**: Connects directly to a pre-existing persistent + agent-server (e.g., running in a k8s cluster). No sandbox provisioning, + no polling, no cleanup. Aimed at open-source / self-hosted deployments. + +Both modes share the same agent-server HTTP calls for file upload and bash +execution — only the *connection setup* differs. """ import asyncio @@ -163,6 +173,46 @@ async def _create_and_wait( raise TimeoutError(f"Sandbox {sandbox_id} not ready after {ready_timeout}s") +# -- Agent-server mode helpers ------------------------------------------------ + + +@dataclass(frozen=True) +class AgentConnection: + """Connection details for an agent-server, regardless of how it was obtained.""" + + agent_url: str + session_key: str + sandbox_id: str | None = None # Only set in Cloud sandbox mode + + +async def _connect_cloud_sandbox( + client: httpx.AsyncClient, + api_url: str, + api_key: str, +) -> AgentConnection: + """Cloud mode: create a sandbox and return connection details.""" + sandbox_id, session_key, agent_url = await _create_and_wait( + client, api_url, api_key + ) + return AgentConnection( + agent_url=agent_url, + session_key=session_key, + sandbox_id=sandbox_id, + ) + + +def _connect_agent_server( + agent_server_url: str, + agent_server_api_key: str | None, +) -> AgentConnection: + """Agent-server mode: return connection details from config.""" + return AgentConnection( + agent_url=agent_server_url.rstrip("/"), + session_key=agent_server_api_key or "", + sandbox_id=None, + ) + + async def _upload( client: httpx.AsyncClient, agent_url: str, @@ -315,27 +365,28 @@ async def dispatch_automation( timeout: int = MAX_RUN_DURATION_SECONDS, callback_url: str | None = None, run_id: str | None = None, + *, + agent_server_url: str | None = None, + agent_server_api_key: str | None = None, ) -> DispatchResult: - """Dispatch an automation to a sandbox (fire-and-forget). - - 1. Create sandbox and wait until RUNNING. - 2. Get tarball into sandbox (upload bytes OR download from URL). - 3. Extract it, run ``setup.sh`` (if present), then start *entrypoint*. - 4. Return immediately without waiting for the entrypoint to complete. - - The SDK inside the sandbox will POST to callback_url when finished. - The caller should store sandbox_id to verify status later if needed. - - *tarball_source*: Either raw bytes (uploaded to sandbox) or a URL string - (downloaded directly inside sandbox via curl). URLs avoid downloading - untrusted/large files on the automation service. - - *env_vars* are exported before the entrypoint runs. The sandbox - identity env vars (``SANDBOX_ID``, ``SESSION_API_KEY``) are - **always** injected so the SDK's ``local_agent_server_mode`` works. - If *callback_url* / *run_id* are set they are injected as - ``AUTOMATION_CALLBACK_URL`` / ``AUTOMATION_RUN_ID`` so the SDK's - ``OpenHandsCloudWorkspace`` can POST completion status on exit. + """Dispatch an automation (fire-and-forget). + + **Cloud sandbox mode** (default, ``agent_server_url`` is ``None``): + Creates a fresh sandbox, uploads the tarball, starts the entrypoint, + and returns immediately. The SDK inside the sandbox POSTs to + *callback_url* when finished. + + **Agent-server mode** (``agent_server_url`` is set): + Connects directly to the configured agent-server — no sandbox + provisioning or cleanup. + + *tarball_source*: Either raw bytes (uploaded) or a URL string + (downloaded directly inside the runtime via curl). + + *env_vars* are exported before the entrypoint runs. In Cloud mode + the sandbox identity env vars (``SANDBOX_ID``, ``SESSION_API_KEY``) + are injected automatically. In agent-server mode + ``AGENT_SERVER_URL`` is injected instead. """ env_vars = dict(env_vars) if env_vars else {} if callback_url: @@ -343,48 +394,66 @@ async def dispatch_automation( if run_id: env_vars["AUTOMATION_RUN_ID"] = run_id api_url = api_url.rstrip("/") + + is_agent_server_mode = agent_server_url is not None sandbox_id: str | None = None - # Helper for consistent structured logging with run_id/sandbox_id def log_extra() -> dict[str, Any]: return _log_extra(run_id=run_id, sandbox_id=sandbox_id) - logger.info("Dispatching automation to sandbox", extra=log_extra()) + logger.info( + "Dispatching automation (%s mode)", + "agent-server" if is_agent_server_mode else "cloud-sandbox", + extra=log_extra(), + ) async with httpx.AsyncClient(timeout=60.0) as client: + # --- Obtain agent connection (mode-dependent) --- try: - sandbox_id, session_key, agent_url = await _create_and_wait( - client, api_url, api_key - ) + if is_agent_server_mode: + conn = _connect_agent_server(agent_server_url, agent_server_api_key) + else: + conn = await _connect_cloud_sandbox(client, api_url, api_key) + sandbox_id = conn.sandbox_id logger.info( - "Sandbox ready: %s at %s", sandbox_id, agent_url, extra=log_extra() + "Agent-server ready at %s (sandbox_id=%s)", + conn.agent_url, + sandbox_id, + extra=log_extra(), ) except Exception as e: - # If sandbox creation started but failed to reach RUNNING, - # still attempt cleanup. - logger.exception("Sandbox creation failed", extra=log_extra()) + logger.exception("Connection setup failed", extra=log_extra()) if sandbox_id: await delete_sandbox(client, api_url, api_key, sandbox_id) return DispatchResult(success=False, sandbox_id=sandbox_id, error=str(e)) + # --- Upload tarball + start entrypoint (shared logic) --- try: - # Always inject sandbox identity so the SDK can call - # get_llm() / get_secrets() inside the sandbox. - env_vars.setdefault("SANDBOX_ID", sandbox_id) - env_vars.setdefault("SESSION_API_KEY", session_key) + # Inject identity env vars + if is_agent_server_mode: + env_vars.setdefault("AGENT_SERVER_URL", conn.agent_url) + else: + env_vars.setdefault("SANDBOX_ID", conn.sandbox_id or "") + env_vars.setdefault("SESSION_API_KEY", conn.session_key) - # Get tarball into sandbox: upload bytes or download from URL + # Get tarball into the runtime if isinstance(tarball_source, bytes): - logger.info("Uploading tarball to sandbox", extra=log_extra()) + logger.info("Uploading tarball", extra=log_extra()) await _upload( - client, agent_url, session_key, tarball_source, TARBALL_PATH + client, + conn.agent_url, + conn.session_key, + tarball_source, + TARBALL_PATH, ) else: - logger.info( - "Downloading tarball in sandbox from URL", extra=log_extra() - ) + logger.info("Downloading tarball in runtime from URL", extra=log_extra()) await _download_in_sandbox( - client, agent_url, session_key, tarball_source, TARBALL_PATH + client, + conn.agent_url, + conn.session_key, + tarball_source, + TARBALL_PATH, ) exports = "" @@ -402,7 +471,7 @@ def log_extra() -> dict[str, Any]: logger.info("Starting entrypoint: %s", entrypoint, extra=log_extra()) command_id = await _start_bash( - client, agent_url, session_key, cmd, timeout=timeout + client, conn.agent_url, conn.session_key, cmd, timeout=timeout ) logger.info( "Entrypoint started (command_id=%s), disconnecting", @@ -413,7 +482,6 @@ def log_extra() -> dict[str, Any]: return DispatchResult(success=True, sandbox_id=sandbox_id) except PermanentDispatchError: - # Clean up sandbox before re-raising so dispatcher can disable automation if sandbox_id: try: await delete_sandbox(client, api_url, api_key, sandbox_id) @@ -422,7 +490,6 @@ def log_extra() -> dict[str, Any]: raise except Exception as e: logger.exception("Automation dispatch failed", extra=log_extra()) - # Delete sandbox on dispatch failure to avoid orphaned sandboxes if sandbox_id: await delete_sandbox(client, api_url, api_key, sandbox_id) return DispatchResult(success=False, sandbox_id=sandbox_id, error=str(e)) @@ -450,28 +517,21 @@ async def run_automation( callback_url: str | None = None, run_id: str | None = None, keep_sandbox: bool = False, + *, + agent_server_url: str | None = None, + agent_server_api_key: str | None = None, ) -> AutomationResult: - """Execute an automation end-to-end in a fresh sandbox (blocking). + """Execute an automation end-to-end (blocking). Use this for testing or when you need to wait for the result immediately. For production async execution, use dispatch_automation() instead. - 1. Create sandbox and wait until RUNNING. - 2. Get tarball into sandbox (upload bytes OR download from URL). - 3. Extract it, run ``setup.sh`` (if present), then run *entrypoint*. - 4. Wait for completion and return the result. - 5. Delete the sandbox (unless *keep_sandbox* is True). - - *tarball_source*: Either raw bytes (uploaded to sandbox) or a URL string - (downloaded directly inside sandbox via curl). URLs avoid downloading - untrusted/large files on the automation service. - - *env_vars* are exported before the entrypoint runs. The sandbox - identity env vars (``SANDBOX_ID``, ``SESSION_API_KEY``) are - **always** injected so the SDK's ``local_agent_server_mode`` works. - If *callback_url* / *run_id* are set they are injected as - ``AUTOMATION_CALLBACK_URL`` / ``AUTOMATION_RUN_ID`` so the SDK's - ``OpenHandsCloudWorkspace`` can POST completion status on exit. + Supports both Cloud sandbox mode and agent-server mode — see + ``dispatch_automation()`` for details on the two modes. + + In Cloud mode the sandbox is deleted after execution unless + *keep_sandbox* is True. In agent-server mode there is nothing to + clean up. """ env_vars = dict(env_vars) if env_vars else {} if callback_url: @@ -479,48 +539,60 @@ async def run_automation( if run_id: env_vars["AUTOMATION_RUN_ID"] = run_id api_url = api_url.rstrip("/") + + is_agent_server_mode = agent_server_url is not None sandbox_id: str | None = None - # Helper for consistent structured logging with run_id/sandbox_id def log_extra() -> dict[str, Any]: return _log_extra(run_id=run_id, sandbox_id=sandbox_id) logger.info("Starting automation execution", extra=log_extra()) async with httpx.AsyncClient(timeout=60.0) as client: + # --- Obtain agent connection --- try: - sandbox_id, session_key, agent_url = await _create_and_wait( - client, api_url, api_key - ) + if is_agent_server_mode: + conn = _connect_agent_server(agent_server_url, agent_server_api_key) + else: + conn = await _connect_cloud_sandbox(client, api_url, api_key) + sandbox_id = conn.sandbox_id logger.info( - "Sandbox ready: %s at %s", sandbox_id, agent_url, extra=log_extra() + "Agent-server ready at %s (sandbox_id=%s)", + conn.agent_url, + sandbox_id, + extra=log_extra(), ) except Exception as e: - # If sandbox creation started but failed to reach RUNNING, - # still attempt cleanup. - logger.exception("Sandbox creation failed", extra=log_extra()) + logger.exception("Connection setup failed", extra=log_extra()) if sandbox_id: await delete_sandbox(client, api_url, api_key, sandbox_id) return AutomationResult(success=False, sandbox_id=sandbox_id, error=str(e)) + # --- Execute (shared logic) --- try: - # Always inject sandbox identity so the SDK can call - # get_llm() / get_secrets() inside the sandbox. - env_vars.setdefault("SANDBOX_ID", sandbox_id) - env_vars.setdefault("SESSION_API_KEY", session_key) + if is_agent_server_mode: + env_vars.setdefault("AGENT_SERVER_URL", conn.agent_url) + else: + env_vars.setdefault("SANDBOX_ID", conn.sandbox_id or "") + env_vars.setdefault("SESSION_API_KEY", conn.session_key) - # Get tarball into sandbox: upload bytes or download from URL if isinstance(tarball_source, bytes): - logger.info("Uploading tarball to sandbox", extra=log_extra()) + logger.info("Uploading tarball", extra=log_extra()) await _upload( - client, agent_url, session_key, tarball_source, TARBALL_PATH + client, + conn.agent_url, + conn.session_key, + tarball_source, + TARBALL_PATH, ) else: - logger.info( - "Downloading tarball in sandbox from URL", extra=log_extra() - ) + logger.info("Downloading tarball in runtime from URL", extra=log_extra()) await _download_in_sandbox( - client, agent_url, session_key, tarball_source, TARBALL_PATH + client, + conn.agent_url, + conn.session_key, + tarball_source, + TARBALL_PATH, ) exports = "" @@ -538,13 +610,12 @@ def log_extra() -> dict[str, Any]: logger.info("Executing entrypoint: %s", entrypoint, extra=log_extra()) exit_code, stdout, stderr = await _bash( - client, agent_url, session_key, cmd, timeout=timeout + client, conn.agent_url, conn.session_key, cmd, timeout=timeout ) success = exit_code == 0 error_msg = None if not success: - # Include both stderr and stdout tail - some errors go to stdout error_parts = [f"exit_code={exit_code}"] if stderr: error_parts.append(f"stderr: {stderr[-1000:]}") @@ -570,7 +641,8 @@ def log_extra() -> dict[str, Any]: logger.exception("Automation execution failed", extra=log_extra()) return AutomationResult(success=False, sandbox_id=sandbox_id, error=str(e)) finally: - if not keep_sandbox: + # Only clean up in Cloud sandbox mode + if sandbox_id and not keep_sandbox: logger.info("Deleting sandbox", extra=log_extra()) await delete_sandbox(client, api_url, api_key, sandbox_id) diff --git a/automation/utils/sandbox.py b/automation/utils/sandbox.py index 78ba737..73a37fc 100644 --- a/automation/utils/sandbox.py +++ b/automation/utils/sandbox.py @@ -198,39 +198,52 @@ async def verify_run_status( sandbox_id: str, keep_alive: bool = False, run_id: str | None = None, + agent_server_url: str | None = None, + agent_server_api_key: str | None = None, ) -> VerificationResult: - """Verify an automation run's status by querying its sandbox. + """Verify an automation run's status by querying its runtime. - Connects to the sandbox, queries the last bash command's exit code, + **Cloud sandbox mode** (``agent_server_url`` is ``None``): + Discovers the agent-server inside the sandbox, queries bash history, and optionally deletes the sandbox. + **Agent-server mode** (``agent_server_url`` is set): + Queries the configured agent-server directly. No sandbox cleanup. + Args: api_url: OpenHands API URL api_key: API key for authentication - sandbox_id: The sandbox to query + sandbox_id: The sandbox to query (Cloud mode) keep_alive: If True, don't delete the sandbox after verification run_id: Optional run ID for logging + agent_server_url: Direct agent-server URL (agent-server mode) + agent_server_api_key: API key for the agent-server Returns: VerificationResult with the verification outcome """ api_url = api_url.rstrip("/") extra = _log_extra(run_id=run_id, sandbox_id=sandbox_id) + is_agent_server_mode = agent_server_url is not None async with httpx.AsyncClient(timeout=60.0) as client: - # Get sandbox agent URL - result = await get_sandbox_agent_url(client, api_url, api_key, sandbox_id) - if result is None: - logger.info("Sandbox not available for verification", extra=extra) - return VerificationResult( - verified=False, - error="Sandbox not available", - ) - - agent_url, session_key = result - logger.info("Connected to sandbox for verification", extra=extra) - - # Get last bash command result + # --- Resolve agent-server connection --- + if is_agent_server_mode: + agent_url = agent_server_url.rstrip("/") + session_key = agent_server_api_key or "" + logger.info("Using configured agent-server for verification", extra=extra) + else: + result = await get_sandbox_agent_url(client, api_url, api_key, sandbox_id) + if result is None: + logger.info("Sandbox not available for verification", extra=extra) + return VerificationResult( + verified=False, + error="Sandbox not available", + ) + agent_url, session_key = result + logger.info("Connected to sandbox for verification", extra=extra) + + # --- Query bash history (shared) --- bash_result = await get_last_bash_command_result(client, agent_url, session_key) if not bash_result.found: @@ -239,8 +252,7 @@ async def verify_run_status( bash_result.error, extra=extra, ) - # Still try to clean up if needed - if not keep_alive: + if not is_agent_server_mode and not keep_alive: await delete_sandbox(client, api_url, api_key, sandbox_id) return VerificationResult( verified=False, @@ -262,8 +274,8 @@ async def verify_run_status( extra=extra, ) - # Clean up sandbox if not keeping alive - if not keep_alive: + # Only clean up in Cloud sandbox mode + if not is_agent_server_mode and not keep_alive: logger.info("Deleting sandbox", extra=extra) await delete_sandbox(client, api_url, api_key, sandbox_id) diff --git a/automation/watchdog.py b/automation/watchdog.py index a45596e..6d3e100 100644 --- a/automation/watchdog.py +++ b/automation/watchdog.py @@ -96,13 +96,15 @@ async def _verify_and_mark_run( result = await session.execute(stmt) # type: ignore[assignment] return result.rowcount > 0 - # Try to verify via sandbox + # Try to verify via sandbox / agent-server verification = await verify_run_status( api_url=settings.openhands_api_base_url, api_key=api_key, sandbox_id=sandbox_id, keep_alive=run.keep_alive, run_id=run_id, + agent_server_url=settings.agent_server_url, + agent_server_api_key=settings.agent_server_api_key, ) if verification.verified: @@ -190,8 +192,8 @@ async def _verify_and_mark_run( extra=extra, ) - # Clean up sandbox if not keep_alive (best effort, may already be gone) - if not run.keep_alive and sandbox_id: + # Clean up sandbox if not keep_alive (Cloud mode only; no-op in agent-server mode) + if not settings.is_agent_server_mode and not run.keep_alive and sandbox_id: await cleanup_sandbox( api_url=settings.openhands_api_base_url, api_key=api_key,