diff --git a/automation/config.py b/automation/config.py index 88b3e3e..5fbcc32 100644 --- a/automation/config.py +++ b/automation/config.py @@ -35,6 +35,11 @@ class Settings(BaseSettings): # Watchdog (scans for stale RUNNING runs past their timeout) watchdog_interval_seconds: int = 60 + # Sandbox cleanup delay after automation runs complete (in minutes). + # Sandboxes are kept available for inspection within this window. + # Set to 0 for immediate cleanup (legacy behavior). + sandbox_cleanup_delay_mins: int = 60 + # Service key for authenticating with the SaaS API to fetch per-user # API keys (called by the dispatcher before each automation run). service_key: str = "" diff --git a/automation/models.py b/automation/models.py index 0d1694d..af50b4c 100644 --- a/automation/models.py +++ b/automation/models.py @@ -157,6 +157,13 @@ class AutomationRun(Base): DateTime(timezone=True), nullable=True ) + # Scheduled time for sandbox cleanup. Set when run transitions to terminal + # state (COMPLETED/FAILED). NULL means immediate cleanup or already cleaned. + # Used by the cleanup scanner to delay sandbox deletion for debugging. + cleanup_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True, index=True + ) + # Relationship back to automation automation: Mapped["Automation"] = relationship("Automation", back_populates="runs") diff --git a/automation/router.py b/automation/router.py index 983a1b1..8c33aec 100644 --- a/automation/router.py +++ b/automation/router.py @@ -3,6 +3,7 @@ import asyncio import logging import uuid +from datetime import timedelta from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import func, select, update @@ -229,8 +230,12 @@ async def complete_run( ``authenticate_request``) and the resulting user must own the run's parent automation. - If keep_alive is False, deletes the sandbox after updating the run status. 
+ Sandbox cleanup behavior depends on the ``sandbox_cleanup_delay_mins`` setting: + - If <= 0: immediate cleanup (0 is the legacy behavior) + - If > 0: sets ``cleanup_at`` timestamp for delayed cleanup by watchdog """ + from automation.config import get_settings + result = await session.execute( select(AutomationRun) .where(AutomationRun.id == run_id) @@ -262,6 +267,13 @@ async def complete_run( if body.status == "FAILED" and body.error: values["error_detail"] = body.error + # Schedule sandbox cleanup if not keeping alive + settings = get_settings() + if not run.keep_alive and run.sandbox_id: + delay_mins = settings.sandbox_cleanup_delay_mins + if delay_mins > 0: + values["cleanup_at"] = now + timedelta(minutes=delay_mins) + stmt = ( update(AutomationRun) .where( @@ -281,12 +293,15 @@ async def complete_run( await session.refresh(run) logger.info("Run %s → %s", run_id, new_status.value) - # Clean up sandbox if not keeping alive - if not run.keep_alive and run.sandbox_id: - # Fire-and-forget sandbox deletion in background - from automation.config import get_settings - - settings = get_settings() + # Delay <= 0 means immediate cleanup, as in watchdog._compute_cleanup_at + should_cleanup_now = ( + not run.keep_alive + and run.sandbox_id + and settings.sandbox_cleanup_delay_mins <= 0 + ) + if should_cleanup_now: + # sandbox_id is guaranteed non-None by the condition above + assert run.sandbox_id is not None asyncio.create_task( cleanup_sandbox( api_url=settings.openhands_api_base_url, diff --git a/automation/schemas.py b/automation/schemas.py index 4933cca..fa9f8be 100644 --- a/automation/schemas.py +++ b/automation/schemas.py @@ -232,6 +232,7 @@ class AutomationRunResponse(BaseModel): created_at: datetime started_at: datetime | None completed_at: datetime | None + cleanup_at: datetime | None # When sandbox cleanup is scheduled model_config = {"from_attributes": True} diff --git a/automation/watchdog.py b/automation/watchdog.py index a45596e..7999742 100644 --- a/automation/watchdog.py +++ 
b/automation/watchdog.py @@ -6,10 +6,16 @@ The ``timeout_at`` column is set to ``started_at + max_duration`` when the dispatcher transitions a run to RUNNING (see ``mark_run_status``). + +Also handles delayed sandbox cleanup. When runs complete (via callback or +verification), a ``cleanup_at`` timestamp is set based on the configured +``sandbox_cleanup_delay_mins``. The cleanup scanner processes runs past +their cleanup deadline. """ import asyncio import logging +from datetime import timedelta from typing import Any from sqlalchemy import select, update @@ -40,6 +46,20 @@ def _run_extra( return extra +def _compute_cleanup_at(settings: Settings, now=None): + """Compute cleanup_at timestamp based on settings. + + Returns None if delay is <= 0 (immediate cleanup), otherwise returns + now + delay, where now defaults to utcnow(). + """ + if now is None: + now = utcnow() + delay_mins = settings.sandbox_cleanup_delay_mins + if delay_mins <= 0: + return None + return now + timedelta(minutes=delay_mins) + + async def _verify_and_mark_run( session: AsyncSession, run: AutomationRun, @@ -51,6 +71,10 @@ async def _verify_and_mark_run( If verification succeeds, marks the run based on the actual result. If verification fails (sandbox unavailable), marks as FAILED with timeout error. + Sandbox cleanup is handled via the cleanup_at timestamp. If + sandbox_cleanup_delay_mins > 0, cleanup is delayed. If <= 0, immediate + cleanup is performed. + Returns True if the run was marked with a terminal status. 
""" run_id = str(run.id) @@ -58,7 +82,7 @@ async def _verify_and_mark_run( extra = _run_extra(run_id=run_id, sandbox_id=sandbox_id) now = utcnow() - # If no sandbox_id, we can't verify - mark as failed + # If no sandbox_id, we can't verify - mark as failed (no cleanup needed) if not sandbox_id: logger.warning("No sandbox_id for stale run, marking FAILED", extra=extra) stmt = ( @@ -81,33 +105,53 @@ async def _verify_and_mark_run( api_key = await get_api_key_for_automation_run(run) except Exception as e: logger.warning("Failed to get API key for verification: %s", e, extra=extra) + # Deleting the sandbox needs an API key, so no deletion is attempted here; + # just mark the run FAILED. cleanup_at is still recorded when a delay is + # configured: cleanup_pending_sandboxes looks the key up again at that + # time. With delay <= 0 nothing is scheduled (NOTE(review): sandbox stays). + cleanup_at = _compute_cleanup_at(settings, now) + values: dict = { + "status": AutomationRunStatus.FAILED, + "completed_at": now, + "error_detail": f"Timed out: could not get API key for verification: {e}", + } + if cleanup_at and not run.keep_alive: + values["cleanup_at"] = cleanup_at stmt = ( update(AutomationRun) .where( AutomationRun.id == run.id, AutomationRun.status == AutomationRunStatus.RUNNING, ) - .values( - status=AutomationRunStatus.FAILED, - completed_at=now, - error_detail=f"Timed out: could not get API key for verification: {e}", - ) + .values(**values) ) result = await session.execute(stmt) # type: ignore[assignment] + # Note: no sandbox deletion on this path (no API key available) return result.rowcount > 0 - # Try to verify via sandbox + # Try to verify via sandbox - pass keep_alive=True to prevent deletion + # (cleanup is done below when delay <= 0, otherwise by the cleanup scanner) verification = await verify_run_status( api_url=settings.openhands_api_base_url, api_key=api_key, sandbox_id=sandbox_id, - keep_alive=run.keep_alive, + keep_alive=True, # Always prevent immediate deletion, use cleanup_at instead run_id=run_id, ) + # Compute cleanup_at for terminal states + 
cleanup_at = _compute_cleanup_at(settings, now) + if verification.verified: exit_code = verification.exit_code + # Build base values for update + base_values: dict = { + "completed_at": now, + } + if cleanup_at and not run.keep_alive: + base_values["cleanup_at"] = cleanup_at + # exit_code == 0: Command completed successfully, we just missed the callback if exit_code == 0: logger.info( @@ -124,7 +168,7 @@ async def _verify_and_mark_run( ) .values( status=AutomationRunStatus.COMPLETED, - completed_at=now, + **base_values, ) ) @@ -147,8 +191,8 @@ async def _verify_and_mark_run( ) .values( status=AutomationRunStatus.FAILED, - completed_at=now, error_detail=f"Timed out: {error_msg}", + **base_values, ) ) @@ -174,12 +218,22 @@ async def _verify_and_mark_run( ) .values( status=AutomationRunStatus.FAILED, - completed_at=now, error_detail=error_detail, + **base_values, ) ) result = await session.execute(stmt) # type: ignore[assignment] + + # Immediate cleanup if delay is 0 + if not cleanup_at and not run.keep_alive: + await cleanup_sandbox( + api_url=settings.openhands_api_base_url, + api_key=api_key, + sandbox_id=sandbox_id, + run_id=run_id, + ) + return result.rowcount > 0 # Verification failed - sandbox not available or command still running @@ -190,15 +244,6 @@ async def _verify_and_mark_run( extra=extra, ) - # Clean up sandbox if not keep_alive (best effort, may already be gone) - if not run.keep_alive and sandbox_id: - await cleanup_sandbox( - api_url=settings.openhands_api_base_url, - api_key=api_key, - sandbox_id=sandbox_id, - run_id=run_id, - ) - error_msg = verification.error or "no completion callback received" logger.warning( @@ -210,19 +255,34 @@ async def _verify_and_mark_run( extra=extra, ) + # Build values for failed state + values = { + "status": AutomationRunStatus.FAILED, + "completed_at": now, + "error_detail": f"Timed out: {error_msg}", + } + if cleanup_at and not run.keep_alive: + values["cleanup_at"] = cleanup_at + stmt = ( update(AutomationRun) 
.where( AutomationRun.id == run.id, AutomationRun.status == AutomationRunStatus.RUNNING, ) - .values( - status=AutomationRunStatus.FAILED, - completed_at=now, - error_detail=f"Timed out: {error_msg}", - ) + .values(**values) ) result = await session.execute(stmt) # type: ignore[assignment] + + # Immediate cleanup if delay is <= 0 (best effort, sandbox may already be gone) + if not cleanup_at and not run.keep_alive: + await cleanup_sandbox( + api_url=settings.openhands_api_base_url, + api_key=api_key, + sandbox_id=sandbox_id, + run_id=run_id, + ) + return result.rowcount > 0 @@ -279,12 +339,101 @@ async def mark_stale_runs( return marked +async def cleanup_pending_sandboxes( + session_factory: async_sessionmaker[AsyncSession], + settings: Settings, +) -> int: + """Clean up sandboxes for completed runs that are past their cleanup_at deadline. + + Finds runs where: + - cleanup_at < now (cleanup deadline has passed) + - sandbox_id is not NULL (sandbox hasn't been cleaned up yet) + - keep_alive is False (not marked for preservation) + + After the deletion attempt, clears sandbox_id and cleanup_at (even if the + attempt returned False) so the run is not picked up again. + + Returns the number of runs cleared this way (a raised error leaves a run queued). 
+ """ + now = utcnow() + cleaned = 0 + + async with session_factory() as session: + # Fetch runs ready for cleanup + result = await session.execute( + select(AutomationRun) + .options(selectinload(AutomationRun.automation)) + .where( + AutomationRun.cleanup_at.isnot(None), + AutomationRun.cleanup_at < now, + AutomationRun.sandbox_id.isnot(None), + AutomationRun.keep_alive == False, # noqa: E712 + ) + ) + runs_to_cleanup = result.scalars().all() + + for run in runs_to_cleanup: + run_id = str(run.id) + sandbox_id = run.sandbox_id + # sandbox_id is guaranteed non-None by the query filter above + assert sandbox_id is not None + extra = _run_extra(run_id=run_id, sandbox_id=sandbox_id) + + logger.info( + "Cleaning up sandbox (cleanup_at=%s, now=%s)", + run.cleanup_at, + now, + extra=extra, + ) + + try: + # Get API key for sandbox deletion + api_key = await get_api_key_for_automation_run(run) + + # Delete the sandbox. NOTE(review): a False result is never retried. + deleted = await cleanup_sandbox( + api_url=settings.openhands_api_base_url, + api_key=api_key, + sandbox_id=sandbox_id, + run_id=run_id, + ) + + # Clear sandbox_id and cleanup_at to prevent duplicate cleanup + stmt = ( + update(AutomationRun) + .where(AutomationRun.id == run.id) + .values(sandbox_id=None, cleanup_at=None) + ) + await session.execute(stmt) + cleaned += 1 + + if deleted: + logger.info("Sandbox cleaned up successfully", extra=extra) + else: + logger.warning( + "Sandbox cleanup returned False (may already be gone)", + extra=extra, + ) + + except Exception: + logger.exception("Error cleaning up sandbox", extra=extra) + + if cleaned: + await session.commit() + + return cleaned + + async def watchdog_loop( session_factory: async_sessionmaker[AsyncSession], settings: Settings, shutdown_event: asyncio.Event | None = None, ) -> None: - """Main watchdog loop — scans for stale runs periodically. + """Main watchdog loop — scans for stale runs and cleanup periodically. + + Performs two tasks each interval: + 1. 
Marks stale RUNNING runs as FAILED (after verification) + 2. Cleans up sandboxes past their cleanup_at deadline Args: session_factory: Async session maker for database access. @@ -294,8 +443,9 @@ async def watchdog_loop( interval = settings.watchdog_interval_seconds logger.info( - "Watchdog started, scanning every %ds", + "Watchdog started, scanning every %ds (cleanup_delay=%d mins)", interval, + settings.sandbox_cleanup_delay_mins, ) while True: @@ -304,9 +454,15 @@ async def watchdog_loop( break try: + # Mark stale runs marked = await mark_stale_runs(session_factory, settings) if marked: logger.info("Processed %d stale run(s)", marked) + + # Clean up sandboxes past their cleanup deadline + cleaned = await cleanup_pending_sandboxes(session_factory, settings) + if cleaned: + logger.info("Cleaned up %d sandbox(es)", cleaned) except Exception: logger.exception("Error in watchdog scan") diff --git a/migrations/versions/003_add_cleanup_at.py b/migrations/versions/003_add_cleanup_at.py new file mode 100644 index 0000000..424dae7 --- /dev/null +++ b/migrations/versions/003_add_cleanup_at.py @@ -0,0 +1,34 @@ +"""Add cleanup_at column for delayed sandbox cleanup. + +This column stores the scheduled time for sandbox cleanup after automation +runs complete. The watchdog cleanup scanner uses this to delay sandbox +deletion, allowing operators to inspect sandboxes for debugging. 
+ +Revision ID: 003 +Revises: 002 +Create Date: 2026-04-01 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + + +revision: str = "003" +down_revision: str = "002" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "automation_runs", + sa.Column("cleanup_at", sa.DateTime(timezone=True), nullable=True), + ) + op.create_index("ix_automation_runs_cleanup_at", "automation_runs", ["cleanup_at"]) + + +def downgrade() -> None: + op.drop_index("ix_automation_runs_cleanup_at", table_name="automation_runs") + op.drop_column("automation_runs", "cleanup_at") diff --git a/tests/conftest.py b/tests/conftest.py index 71654e9..091d90a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -147,9 +147,28 @@ def sync_client(async_engine, async_session_factory): @pytest.fixture def mock_settings(): - """Return a mock Settings instance for dispatcher tests.""" + """Return a mock Settings instance for dispatcher tests. + + Uses sandbox_cleanup_delay_mins=0 for immediate cleanup behavior + (legacy behavior, compatible with existing tests). + """ + return Settings( + openhands_api_base_url="https://test.example.com", + service_key="test-service-key", + base_url="http://localhost:8000", + sandbox_cleanup_delay_mins=0, # Immediate cleanup for existing tests + ) + + +@pytest.fixture +def mock_settings_delayed_cleanup(): + """Return a mock Settings instance with delayed sandbox cleanup. + + Uses sandbox_cleanup_delay_mins=60 (1 hour delay). 
+ """ return Settings( openhands_api_base_url="https://test.example.com", service_key="test-service-key", base_url="http://localhost:8000", + sandbox_cleanup_delay_mins=60, # 1 hour delay ) diff --git a/tests/test_config.py b/tests/test_config.py index 071a521..8023db4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -35,3 +35,22 @@ def test_root_path_default_base_url(self): """Default settings (no base_url) should return empty root_path.""" settings = Settings() assert settings.root_path == "" + + +class TestSandboxCleanupDelay: + """Tests for sandbox_cleanup_delay_mins configuration.""" + + def test_default_cleanup_delay(self): + """Default cleanup delay should be 60 minutes.""" + settings = Settings() + assert settings.sandbox_cleanup_delay_mins == 60 + + def test_custom_cleanup_delay(self): + """Should accept custom cleanup delay.""" + settings = Settings(sandbox_cleanup_delay_mins=120) + assert settings.sandbox_cleanup_delay_mins == 120 + + def test_zero_cleanup_delay_for_immediate_cleanup(self): + """Zero delay means immediate cleanup (legacy behavior).""" + settings = Settings(sandbox_cleanup_delay_mins=0) + assert settings.sandbox_cleanup_delay_mins == 0 diff --git a/tests/test_router.py b/tests/test_router.py index 70bf438..3e033e8 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -1045,3 +1045,192 @@ async def test_list_runs_default_limit_is_50(self, async_client, async_session): data = response.json() assert data["total"] == 60 assert len(data["runs"]) == 50 # Default limit + + +class TestCompleteRun: + """Tests for POST /v1/runs/{run_id}/complete endpoint.""" + + async def test_complete_run_sets_cleanup_at_when_delay_configured( + self, async_client, async_session + ): + """When sandbox_cleanup_delay_mins > 0, should set cleanup_at.""" + from datetime import timedelta + from unittest.mock import patch + + from automation.models import AutomationRun, AutomationRunStatus + + automation = Automation( + user_id=TEST_USER_ID, + 
org_id=TEST_ORG_ID, + name="Test Automation", + trigger={"type": "cron", "schedule": "0 9 * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run script.py", + ) + async_session.add(automation) + await async_session.commit() + + # Create a RUNNING run with a sandbox_id + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + sandbox_id="test-sandbox-123", + started_at=utcnow(), + timeout_at=utcnow() + timedelta(minutes=10), + keep_alive=False, + ) + async_session.add(run) + await async_session.commit() + + # Mock settings to have delay > 0 + from automation.config import Settings + + mock_settings = Settings( + openhands_api_base_url="https://test.example.com", + service_key="test-key", + base_url="http://localhost:8000", + sandbox_cleanup_delay_mins=60, # 1 hour delay + ) + + with patch("automation.config.get_settings", return_value=mock_settings): + response = await async_client.post( + f"/v1/runs/{run.id}/complete", + json={"status": "COMPLETED", "conversation_id": "conv-123"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "COMPLETED" + assert data["conversation_id"] == "conv-123" + + # Verify cleanup_at was set + await async_session.refresh(run) + assert run.cleanup_at is not None + assert run.completed_at is not None + # cleanup_at should be ~60 minutes after completed_at + delta = run.cleanup_at - run.completed_at + assert timedelta(minutes=59) < delta < timedelta(minutes=61) + + async def test_complete_run_immediate_cleanup_when_delay_zero( + self, async_client, async_session + ): + """When sandbox_cleanup_delay_mins = 0, should not set cleanup_at.""" + from datetime import timedelta + from unittest.mock import AsyncMock, patch + + from automation.models import AutomationRun, AutomationRunStatus + + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Test Automation", + trigger={"type": "cron", "schedule": "0 9 * 
* *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run script.py", + ) + async_session.add(automation) + await async_session.commit() + + # Create a RUNNING run with a sandbox_id + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + sandbox_id="test-sandbox-456", + started_at=utcnow(), + timeout_at=utcnow() + timedelta(minutes=10), + keep_alive=False, + ) + async_session.add(run) + await async_session.commit() + + # Mock settings to have delay = 0 + from automation.config import Settings + + mock_settings = Settings( + openhands_api_base_url="https://test.example.com", + service_key="test-key", + base_url="http://localhost:8000", + sandbox_cleanup_delay_mins=0, # Immediate cleanup + ) + + with ( + patch("automation.config.get_settings", return_value=mock_settings), + patch( + "automation.router.cleanup_sandbox", new_callable=AsyncMock + ) as mock_cleanup, + ): + response = await async_client.post( + f"/v1/runs/{run.id}/complete", + json={"status": "COMPLETED", "conversation_id": "conv-456"}, + ) + + assert response.status_code == 200 + + # Verify cleanup was called immediately + mock_cleanup.assert_called_once() + + # Verify cleanup_at was NOT set + await async_session.refresh(run) + assert run.cleanup_at is None + + async def test_complete_run_no_cleanup_when_keep_alive_true( + self, async_client, async_session + ): + """When keep_alive is True, should not set cleanup_at or cleanup.""" + from datetime import timedelta + from unittest.mock import AsyncMock, patch + + from automation.models import AutomationRun, AutomationRunStatus + + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Test Automation", + trigger={"type": "cron", "schedule": "0 9 * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run script.py", + ) + async_session.add(automation) + await async_session.commit() + + # Create a RUNNING run with keep_alive=True + run 
= AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + sandbox_id="test-sandbox-keep-alive", + started_at=utcnow(), + timeout_at=utcnow() + timedelta(minutes=10), + keep_alive=True, # Should prevent cleanup + ) + async_session.add(run) + await async_session.commit() + + # Mock settings with delay > 0 + from automation.config import Settings + + mock_settings = Settings( + openhands_api_base_url="https://test.example.com", + service_key="test-key", + base_url="http://localhost:8000", + sandbox_cleanup_delay_mins=60, + ) + + with ( + patch("automation.config.get_settings", return_value=mock_settings), + patch( + "automation.router.cleanup_sandbox", new_callable=AsyncMock + ) as mock_cleanup, + ): + response = await async_client.post( + f"/v1/runs/{run.id}/complete", + json={"status": "COMPLETED"}, + ) + + assert response.status_code == 200 + + # Verify cleanup was NOT called + mock_cleanup.assert_not_called() + + # Verify cleanup_at was NOT set + await async_session.refresh(run) + assert run.cleanup_at is None diff --git a/tests/test_watchdog.py b/tests/test_watchdog.py index cfed531..587c23b 100644 --- a/tests/test_watchdog.py +++ b/tests/test_watchdog.py @@ -2,6 +2,8 @@ The watchdog processes stale runs (RUNNING but past timeout_at) and marks them with appropriate status based on sandbox verification results. + +Also tests delayed sandbox cleanup functionality. 
""" import uuid @@ -13,7 +15,11 @@ from automation.models import Automation, AutomationRun, AutomationRunStatus from automation.utils import utcnow from automation.utils.sandbox import VerificationResult -from automation.watchdog import _verify_and_mark_run +from automation.watchdog import ( + _compute_cleanup_at, + _verify_and_mark_run, + cleanup_pending_sandboxes, +) # Test UUIDs @@ -371,3 +377,331 @@ async def test_verification_failed_no_cleanup_if_keep_alive( assert result is True # Cleanup should NOT be called when keep_alive is True mock_cleanup.assert_not_called() + + +class TestComputeCleanupAt: + """Tests for _compute_cleanup_at helper function.""" + + def test_returns_none_when_delay_is_zero(self, mock_settings): + """When delay is 0, should return None (immediate cleanup).""" + assert mock_settings.sandbox_cleanup_delay_mins == 0 + result = _compute_cleanup_at(mock_settings) + assert result is None + + def test_returns_future_timestamp_when_delay_positive( + self, mock_settings_delayed_cleanup + ): + """When delay > 0, should return now + delay.""" + assert mock_settings_delayed_cleanup.sandbox_cleanup_delay_mins == 60 + now = utcnow() + result = _compute_cleanup_at(mock_settings_delayed_cleanup, now=now) + expected = now + timedelta(minutes=60) + assert result == expected + + def test_uses_current_time_when_now_not_provided( + self, mock_settings_delayed_cleanup + ): + """Should use current time when now parameter is not provided.""" + before = utcnow() + result = _compute_cleanup_at(mock_settings_delayed_cleanup) + after = utcnow() + + # Result should be set (delay > 0) and roughly 60 minutes in the future + assert result is not None + expected_min = before + timedelta(minutes=60) + expected_max = after + timedelta(minutes=60) + assert expected_min <= result <= expected_max + + +class TestDelayedCleanup: + """Tests for delayed sandbox cleanup behavior.""" + + @pytest.mark.asyncio + async def test_delayed_cleanup_sets_cleanup_at( + self, 
async_session_factory, mock_settings_delayed_cleanup + ): + """When delay > 0, should set cleanup_at instead of immediate cleanup.""" + async with async_session_factory() as session: + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Delayed Cleanup Automation", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + now = utcnow() + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.RUNNING, + sandbox_id="test-sandbox-delayed", + started_at=now - timedelta(minutes=5), + timeout_at=now - timedelta(minutes=1), + keep_alive=False, + ) + session.add(run) + await session.commit() + run_id = run.id + + verification = VerificationResult( + verified=True, + success=True, + exit_code=0, + stdout="Success", + stderr="", + ) + + with ( + patch( + "automation.watchdog.verify_run_status", + new_callable=AsyncMock, + return_value=verification, + ), + patch( + "automation.watchdog.get_api_key_for_automation_run", + new_callable=AsyncMock, + return_value="test-api-key", + ), + patch( + "automation.watchdog.cleanup_sandbox", + new_callable=AsyncMock, + ) as mock_cleanup, + ): + async with async_session_factory() as session: + run = await session.get(AutomationRun, run_id) + result = await _verify_and_mark_run( + session, run, mock_settings_delayed_cleanup + ) + await session.commit() + + assert result is True + # Cleanup should NOT be called immediately when delay > 0 + mock_cleanup.assert_not_called() + + # Verify cleanup_at was set + async with async_session_factory() as session: + run = await session.get(AutomationRun, run_id) + assert run.status == AutomationRunStatus.COMPLETED + assert run.cleanup_at is not None + # cleanup_at should be ~60 minutes after completed_at + delta = run.cleanup_at - run.completed_at + assert timedelta(minutes=59) < delta < 
timedelta(minutes=61) + + +class TestCleanupPendingSandboxes: + """Tests for the cleanup_pending_sandboxes function.""" + + @pytest.mark.asyncio + async def test_cleans_up_runs_past_cleanup_at( + self, async_session_factory, mock_settings + ): + """Should clean up runs where cleanup_at has passed.""" + async with async_session_factory() as session: + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Cleanup Test Automation", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + now = utcnow() + # Run with cleanup_at in the past + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.COMPLETED, + sandbox_id="test-sandbox-to-cleanup", + started_at=now - timedelta(hours=2), + completed_at=now - timedelta(hours=1), + cleanup_at=now - timedelta(minutes=1), # Past deadline + keep_alive=False, + ) + session.add(run) + await session.commit() + run_id = run.id + + with ( + patch( + "automation.watchdog.get_api_key_for_automation_run", + new_callable=AsyncMock, + return_value="test-api-key", + ), + patch( + "automation.watchdog.cleanup_sandbox", + new_callable=AsyncMock, + return_value=True, + ) as mock_cleanup, + ): + cleaned = await cleanup_pending_sandboxes( + async_session_factory, mock_settings + ) + + assert cleaned == 1 + mock_cleanup.assert_called_once() + + # Verify sandbox_id and cleanup_at were cleared + async with async_session_factory() as session: + run = await session.get(AutomationRun, run_id) + assert run.sandbox_id is None + assert run.cleanup_at is None + + @pytest.mark.asyncio + async def test_skips_runs_not_past_cleanup_at( + self, async_session_factory, mock_settings + ): + """Should not clean up runs where cleanup_at is in the future.""" + async with async_session_factory() as session: + automation = Automation( + 
user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Future Cleanup Automation", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + now = utcnow() + # Run with cleanup_at in the future + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.COMPLETED, + sandbox_id="test-sandbox-not-yet", + started_at=now - timedelta(hours=1), + completed_at=now - timedelta(minutes=30), + cleanup_at=now + timedelta(minutes=30), # Future deadline + keep_alive=False, + ) + session.add(run) + await session.commit() + run_id = run.id + + with ( + patch( + "automation.watchdog.get_api_key_for_automation_run", + new_callable=AsyncMock, + return_value="test-api-key", + ), + patch( + "automation.watchdog.cleanup_sandbox", + new_callable=AsyncMock, + ) as mock_cleanup, + ): + cleaned = await cleanup_pending_sandboxes( + async_session_factory, mock_settings + ) + + assert cleaned == 0 + mock_cleanup.assert_not_called() + + # sandbox_id should still be set + async with async_session_factory() as session: + run = await session.get(AutomationRun, run_id) + assert run.sandbox_id == "test-sandbox-not-yet" + assert run.cleanup_at is not None + + @pytest.mark.asyncio + async def test_skips_runs_with_keep_alive( + self, async_session_factory, mock_settings + ): + """Should not clean up runs where keep_alive is True.""" + async with async_session_factory() as session: + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="Keep Alive Cleanup Automation", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + now = utcnow() + # Run with cleanup_at in the past but keep_alive=True + run = AutomationRun( + 
automation_id=automation.id, + status=AutomationRunStatus.COMPLETED, + sandbox_id="test-sandbox-keep-alive", + started_at=now - timedelta(hours=2), + completed_at=now - timedelta(hours=1), + cleanup_at=now - timedelta(minutes=1), # Past deadline + keep_alive=True, # Should prevent cleanup + ) + session.add(run) + await session.commit() + run_id = run.id + + with ( + patch( + "automation.watchdog.cleanup_sandbox", + new_callable=AsyncMock, + ) as mock_cleanup, + ): + cleaned = await cleanup_pending_sandboxes( + async_session_factory, mock_settings + ) + + assert cleaned == 0 + mock_cleanup.assert_not_called() + + # sandbox_id should still be set + async with async_session_factory() as session: + run = await session.get(AutomationRun, run_id) + assert run.sandbox_id == "test-sandbox-keep-alive" + + @pytest.mark.asyncio + async def test_skips_runs_without_sandbox_id( + self, async_session_factory, mock_settings + ): + """Should not process runs where sandbox_id is already cleared.""" + async with async_session_factory() as session: + automation = Automation( + user_id=TEST_USER_ID, + org_id=TEST_ORG_ID, + name="No Sandbox Automation", + trigger={"type": "cron", "schedule": "* * * * *", "timezone": "UTC"}, + tarball_path="s3://bucket/code.tar.gz", + entrypoint="uv run main.py", + enabled=True, + ) + session.add(automation) + await session.commit() + + now = utcnow() + # Run with cleanup_at in the past but no sandbox_id + run = AutomationRun( + automation_id=automation.id, + status=AutomationRunStatus.COMPLETED, + sandbox_id=None, # Already cleaned up + started_at=now - timedelta(hours=2), + completed_at=now - timedelta(hours=1), + cleanup_at=now - timedelta(minutes=1), + keep_alive=False, + ) + session.add(run) + await session.commit() + + with ( + patch( + "automation.watchdog.cleanup_sandbox", + new_callable=AsyncMock, + ) as mock_cleanup, + ): + cleaned = await cleanup_pending_sandboxes( + async_session_factory, mock_settings + ) + + assert cleaned == 0 + 
mock_cleanup.assert_not_called()