diff --git a/openhands-agent-server/openhands/agent_server/api.py b/openhands-agent-server/openhands/agent_server/api.py index 070080ed32..2040ae84c0 100644 --- a/openhands-agent-server/openhands/agent_server/api.py +++ b/openhands-agent-server/openhands/agent_server/api.py @@ -3,7 +3,7 @@ import tempfile import traceback from collections.abc import AsyncIterator, Sequence -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from pathlib import Path from typing import Any from urllib.parse import urlparse @@ -17,6 +17,7 @@ from openhands.agent_server.auth_router import auth_router from openhands.agent_server.bash_router import bash_router +from openhands.agent_server.bash_service import get_default_bash_event_service from openhands.agent_server.cloud_proxy_router import cloud_proxy_router from openhands.agent_server.config import ( Config, @@ -188,9 +189,28 @@ async def start_tool_preload_service(): async with service: # Store the initialized service in app state for dependency injection api.state.conversation_service = service + + config = api.state.config + retention_task: asyncio.Task | None = None + if config.bash_events_retention_seconds is not None: + retention_task = asyncio.create_task( + get_default_bash_event_service().run_retention_cleanup_loop( + config.bash_events_retention_seconds + ) + ) + logger.info( + "Bash events retention cleanup started (retention: %ds)", + config.bash_events_retention_seconds, + ) + try: yield finally: + if retention_task is not None: + retention_task.cancel() + with suppress(asyncio.CancelledError): + await retention_task + # Define async functions for stopping each service async def stop_vscode_service(): if vscode_service is not None: diff --git a/openhands-agent-server/openhands/agent_server/bash_service.py b/openhands-agent-server/openhands/agent_server/bash_service.py index 4df7439012..5c10f64d00 100644 --- a/openhands-agent-server/openhands/agent_server/bash_service.py +++ 
# Retention helpers of ``BashEventService``. They are written at column 0
# here; indent one level when placing them inside the class body.
def delete_events_older_than(self, cutoff: datetime) -> int:
    """Delete bash event files whose name sorts before the rendered ``cutoff``.

    This is a synchronous method: every operation is blocking filesystem
    I/O. Callers on the asyncio event loop should use
    ``await asyncio.to_thread(service.delete_events_older_than, cutoff)``
    to avoid stalling the loop.

    ``cutoff`` is rendered with ``self._timestamp_to_str`` and compared
    lexicographically against each file name; scanning stops at the first
    name that does not compare lower.
    NOTE(review): this relies on file names starting with a sortable
    ``YYYYMMDDHHMMSS`` prefix and on ``self._get_event_files_by_pattern``
    returning names in ascending order. Both helpers live outside this
    block, as does the timezone convention of recorded timestamps — confirm.

    Args:
        cutoff: Age boundary; files whose name compares lower than the
            rendered cutoff are removed.

    Returns:
        The number of event files this call actually removed. Files that
        had already disappeared, or that could not be removed, are not
        counted.
    """
    cutoff_str = self._timestamp_to_str(cutoff)
    count = 0
    for path in self._get_event_files_by_pattern("*"):
        if path.name >= cutoff_str:
            break  # remaining files are at or newer than cutoff
        try:
            path.unlink()
        except FileNotFoundError:
            # Someone else removed it between listing and unlinking; the
            # goal is met, but it is not a deletion performed by this call.
            continue
        except OSError as e:
            # Best effort: report the file and keep purging the others.
            logger.warning("Failed to delete bash event file %s: %s", path, e)
            continue
        count += 1
    if count:
        logger.info("Deleted %d bash event file(s) older than %s", count, cutoff_str)
    return count


async def run_retention_cleanup_loop(
    self,
    retention_seconds: int,
    interval_seconds: float | None = None,
) -> None:
    """Periodically purge bash event files older than ``retention_seconds``.

    Runs until cancelled (e.g. during application shutdown). The first
    cleanup runs immediately on entry, before any sleep, so files that
    accumulated across a server restart are purged without waiting for the
    first interval to elapse.

    Blocking filesystem work is dispatched to a thread via
    ``asyncio.to_thread`` to keep the event loop free.

    Args:
        retention_seconds: Age threshold in seconds; older files are deleted.
        interval_seconds: How often to run the cleanup. Defaults to
            ``max(60, retention_seconds / 2)``. Pass a smaller value in
            tests to avoid long waits.

    Raises:
        ValueError: If ``interval_seconds`` is given and is not positive.
            ``asyncio.sleep`` returns immediately for such values, which
            would turn the loop into back-to-back directory scans.
    """
    if interval_seconds is not None and interval_seconds <= 0:
        raise ValueError(
            f"interval_seconds must be positive, got {interval_seconds!r}"
        )
    interval = (
        interval_seconds
        if interval_seconds is not None
        else max(60.0, retention_seconds / 2)
    )
    while True:
        try:
            cutoff = utc_now() - timedelta(seconds=retention_seconds)
            await asyncio.to_thread(self.delete_events_older_than, cutoff)
        except Exception as e:
            # asyncio.CancelledError derives from BaseException, so
            # cancellation is not swallowed here and still ends the loop.
            logger.warning(
                "Bash events retention cleanup error: %s", e, exc_info=True
            )
            # Brief back-off to prevent log flooding if the failure is
            # persistent (e.g. permission error, full disk). Capped at the
            # normal interval so low-retention configurations are not
            # over-delayed.
            await asyncio.sleep(min(interval, 60.0))
        # The full interval is always slept, so after an error the total
        # wait is min(interval, 60) + interval.
        await asyncio.sleep(interval)
" + "A background task purges events older than this window on a " + "rolling basis. None (default) retains events indefinitely. " + "Should be set higher than the longest expected command timeout: " + "a command whose BashCommand file is purged mid-execution will " + "complete normally, but its on-disk event history will be " + "incomplete. A value >= 2x max command timeout avoids this." + ), + ) static_files_path: Path | None = Field( default=None, description=( diff --git a/tests/agent_server/test_bash_service.py b/tests/agent_server/test_bash_service.py index 2228f10a34..033de166db 100644 --- a/tests/agent_server/test_bash_service.py +++ b/tests/agent_server/test_bash_service.py @@ -1,8 +1,10 @@ """Tests for bash_service.py.""" import asyncio +import contextlib import time from collections.abc import AsyncIterator +from datetime import UTC, datetime from pathlib import Path from uuid import UUID @@ -14,6 +16,7 @@ from openhands.agent_server import bash_router as bash_router_module from openhands.agent_server.bash_service import BashEventService from openhands.agent_server.config import Config +from openhands.agent_server.models import BashCommand from openhands.agent_server.server_details_router import ( mark_initialization_complete, server_details_router, @@ -80,3 +83,90 @@ async def test_bash_timeout_runs_sigterm_trap( await asyncio.sleep(0.2) # let the trap's filesystem write land assert marker.exists(), "SIGTERM trap did not run; cleanup skipped." 
# ---------------------------------------------------------------------------
# delete_events_older_than
# ---------------------------------------------------------------------------

# Fixed instants: one event timestamp on each side of the cutoff.
_OLD = datetime(2020, 1, 1, tzinfo=UTC)
_NEW = datetime(2022, 1, 1, tzinfo=UTC)
_CUTOFF = datetime(2021, 1, 1, tzinfo=UTC)


def test_delete_events_older_than_removes_old_keeps_new(tmp_path: Path):
    """One event before the cutoff is purged; the one after it survives."""
    events_dir = tmp_path / "bash_events"
    service = BashEventService(bash_events_dir=events_dir)

    stale = BashCommand(command="echo old", timestamp=_OLD)
    fresh = BashCommand(command="echo new", timestamp=_NEW)
    for event in (stale, fresh):
        service._save_event_to_file(event)

    deleted = service.delete_events_older_than(_CUTOFF)

    assert deleted == 1
    survivors = service._get_event_files_by_pattern("*")
    assert len(survivors) == 1
    assert fresh.id.hex in survivors[0].name


def test_delete_events_older_than_empty_directory(tmp_path: Path):
    """With no event files saved, nothing is deleted and 0 is returned."""
    events_dir = tmp_path / "bash_events"
    service = BashEventService(bash_events_dir=events_dir)

    assert service.delete_events_older_than(_CUTOFF) == 0


def test_delete_events_older_than_all_newer_are_skipped(tmp_path: Path):
    """An event newer than the cutoff is left on disk."""
    events_dir = tmp_path / "bash_events"
    service = BashEventService(bash_events_dir=events_dir)
    service._save_event_to_file(BashCommand(command="echo new", timestamp=_NEW))

    deleted = service.delete_events_older_than(_CUTOFF)

    assert deleted == 0
    assert len(service._get_event_files_by_pattern("*")) == 1


def test_delete_events_older_than_returns_correct_count(tmp_path: Path):
    """The return value equals the number of old event files removed."""
    events_dir = tmp_path / "bash_events"
    service = BashEventService(bash_events_dir=events_dir)

    events = [BashCommand(command=f"echo {i}", timestamp=_OLD) for i in range(3)]
    events.append(BashCommand(command="echo new", timestamp=_NEW))
    for event in events:
        service._save_event_to_file(event)

    deleted = service.delete_events_older_than(_CUTOFF)

    assert deleted == 3
    assert len(service._get_event_files_by_pattern("*")) == 1
# ---------------------------------------------------------------------------
# run_retention_cleanup_loop
# ---------------------------------------------------------------------------


@pytest.mark.timeout(5)
async def test_run_retention_cleanup_loop_purges_old_events(tmp_path: Path):
    """The retention loop removes an event whose timestamp is far in the past.

    The purge runs on a worker thread, so the test polls for the result
    instead of sleeping a fixed amount. The poll is bounded (about 2 s) and
    stays well inside the 5 s pytest timeout.
    """
    service = BashEventService(bash_events_dir=tmp_path / "bash_events")

    # Write an event whose recorded timestamp is well in the past.
    service._save_event_to_file(BashCommand(command="echo old", timestamp=_OLD))
    assert len(service._get_event_files_by_pattern("*")) == 1

    # Run the loop with a 1-second retention window and a 50 ms tick so
    # the test doesn't have to wait for the default 60-second interval.
    task = asyncio.create_task(
        service.run_retention_cleanup_loop(retention_seconds=1, interval_seconds=0.05)
    )
    try:
        # Wait until the first cleanup pass has landed rather than assuming
        # it finishes within a fixed delay; a loaded machine can be slow.
        for _ in range(200):
            if not service._get_event_files_by_pattern("*"):
                break
            await asyncio.sleep(0.01)
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

    assert len(service._get_event_files_by_pattern("*")) == 0, (
        "Old event file should have been purged by the retention loop"
    )