Skip to content
22 changes: 21 additions & 1 deletion openhands-agent-server/openhands/agent_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import tempfile
import traceback
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
Expand All @@ -17,6 +17,7 @@

from openhands.agent_server.auth_router import auth_router
from openhands.agent_server.bash_router import bash_router
from openhands.agent_server.bash_service import get_default_bash_event_service
from openhands.agent_server.cloud_proxy_router import cloud_proxy_router
from openhands.agent_server.config import (
Config,
Expand Down Expand Up @@ -188,9 +189,28 @@ async def start_tool_preload_service():
async with service:
# Store the initialized service in app state for dependency injection
api.state.conversation_service = service

config = api.state.config
retention_task: asyncio.Task | None = None
if config.bash_events_retention_seconds is not None:
retention_task = asyncio.create_task(
get_default_bash_event_service().run_retention_cleanup_loop(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: This starts cleanup on the module-level default BashEventService, which is created from get_default_config() during router import rather than from api.state.config. With create_app(Config(bash_events_dir=custom, bash_events_retention_seconds=...)), the lifespan uses the app's retention value but deletes from the singleton's default workspace/bash_events directory instead of the configured directory. Please wire the bash event service from the app config for both routes/lifespan (or otherwise initialize the singleton from this config) and add a lifespan/config test so custom bash_events_dir is honored.

config.bash_events_retention_seconds
)
)
logger.info(
"Bash events retention cleanup started (retention: %ds)",
config.bash_events_retention_seconds,
)

try:
yield
finally:
if retention_task is not None:
retention_task.cancel()
with suppress(asyncio.CancelledError):
await retention_task

# Define async functions for stopping each service
async def stop_vscode_service():
if vscode_service is not None:
Expand Down
74 changes: 72 additions & 2 deletions openhands-agent-server/openhands/agent_server/bash_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import signal
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from uuid import UUID

Expand All @@ -18,7 +18,7 @@
)
from openhands.agent_server.pub_sub import PubSub, Subscriber
from openhands.sdk.logger import get_logger
from openhands.sdk.utils import sanitized_env
from openhands.sdk.utils import sanitized_env, utc_now


logger = get_logger(__name__)
Expand Down Expand Up @@ -344,6 +344,76 @@ async def read_stream(stream, is_stderr=False):
self._save_event_to_file(error_output)
await self._pub_sub(error_output)

def delete_events_older_than(self, cutoff: datetime) -> int:
"""Delete bash event files with a recorded timestamp older than ``cutoff``.

This is a synchronous method — all operations are blocking filesystem
I/O. Callers on the asyncio event loop should use
``await asyncio.to_thread(service.delete_events_older_than, cutoff)``
to avoid stalling the loop.

File names are prefixed with ``YYYYMMDDHHMMSS`` in ascending sort order,
so scanning stops as soon as a file at or after the cutoff is reached.

Returns:
int: The number of event files deleted.
"""
cutoff_str = self._timestamp_to_str(cutoff)
files = self._get_event_files_by_pattern("*") # ascending chronological order
Comment thread
tofarr marked this conversation as resolved.
count = 0
for path in files:
if path.name >= cutoff_str:
break # remaining files are at or newer than cutoff
try:
path.unlink(missing_ok=True)
count += 1
except Exception as e:
logger.warning("Failed to delete bash event file %s: %s", path, e)
if count:
logger.info(
"Deleted %d bash event file(s) older than %s", count, cutoff_str
)
return count

async def run_retention_cleanup_loop(
self,
retention_seconds: int,
interval_seconds: float | None = None,
) -> None:
"""Periodically purge bash event files older than ``retention_seconds``.

Runs until cancelled (e.g. during application shutdown). Cleanup runs
immediately on entry so that files accumulated across a server restart
are purged without waiting for the first interval to elapse.

Blocking filesystem work is dispatched to a thread via
``asyncio.to_thread`` to keep the event loop free.

Args:
retention_seconds: Age threshold in seconds; older files are deleted.
interval_seconds: How often to run the cleanup. Defaults to
``max(60, retention_seconds / 2)``. Pass a smaller value in
tests to avoid long waits.
"""
interval = (
interval_seconds
if interval_seconds is not None
else max(60.0, retention_seconds / 2)
)
while True:
try:
cutoff = utc_now() - timedelta(seconds=retention_seconds)
await asyncio.to_thread(self.delete_events_older_than, cutoff)
except Exception as e:
Comment thread
tofarr marked this conversation as resolved.
logger.warning("Bash events retention cleanup error: %s", e)
# Brief back-off to prevent log flooding if the failure is persistent
# (e.g. permission error, full disk). Cap at the normal interval so
# we don't over-delay in low-retention configurations.
await asyncio.sleep(min(interval, 60.0))
# Always sleep the full interval after the error back-off, so total
# wait on error = min(interval, 60) + interval ≈ 2× normal cadence.
await asyncio.sleep(interval)
Comment thread
tofarr marked this conversation as resolved.

async def subscribe_to_events(self, subscriber: Subscriber[BashEventBase]) -> UUID:
"""Subscribe to bash events.

Expand Down
13 changes: 13 additions & 0 deletions openhands-agent-server/openhands/agent_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ class Config(BaseModel):
"Defaults to 'workspace/bash_events'."
),
)
bash_events_retention_seconds: int | None = Field(
default=None,
gt=0,
description=(
"How long bash event files are retained on disk, in seconds. "
"A background task purges events older than this window on a "
"rolling basis. None (default) retains events indefinitely. "
"Should be set higher than the longest expected command timeout: "
"a command whose BashCommand file is purged mid-execution will "
"complete normally, but its on-disk event history will be "
"incomplete. A value >= 2x max command timeout avoids this."
),
)
static_files_path: Path | None = Field(
default=None,
description=(
Expand Down
90 changes: 90 additions & 0 deletions tests/agent_server/test_bash_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Tests for bash_service.py."""

import asyncio
import contextlib
import time
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from pathlib import Path
from uuid import UUID

Expand All @@ -14,6 +16,7 @@
from openhands.agent_server import bash_router as bash_router_module
from openhands.agent_server.bash_service import BashEventService
from openhands.agent_server.config import Config
from openhands.agent_server.models import BashCommand
from openhands.agent_server.server_details_router import (
mark_initialization_complete,
server_details_router,
Expand Down Expand Up @@ -80,3 +83,90 @@ async def test_bash_timeout_runs_sigterm_trap(

await asyncio.sleep(0.2) # let the trap's filesystem write land
assert marker.exists(), "SIGTERM trap did not run; cleanup skipped."


# ---------------------------------------------------------------------------
# delete_events_older_than
# ---------------------------------------------------------------------------

_OLD = datetime(2020, 1, 1, tzinfo=UTC)
_NEW = datetime(2022, 1, 1, tzinfo=UTC)
_CUTOFF = datetime(2021, 1, 1, tzinfo=UTC)


def test_delete_events_older_than_removes_old_keeps_new(tmp_path: Path):
service = BashEventService(bash_events_dir=tmp_path / "bash_events")

old_cmd = BashCommand(command="echo old", timestamp=_OLD)
new_cmd = BashCommand(command="echo new", timestamp=_NEW)
service._save_event_to_file(old_cmd)
service._save_event_to_file(new_cmd)

count = service.delete_events_older_than(_CUTOFF)

assert count == 1
remaining = service._get_event_files_by_pattern("*")
assert len(remaining) == 1
assert new_cmd.id.hex in remaining[0].name


def test_delete_events_older_than_empty_directory(tmp_path: Path):
service = BashEventService(bash_events_dir=tmp_path / "bash_events")
count = service.delete_events_older_than(_CUTOFF)
assert count == 0


def test_delete_events_older_than_all_newer_are_skipped(tmp_path: Path):
service = BashEventService(bash_events_dir=tmp_path / "bash_events")

new_cmd = BashCommand(command="echo new", timestamp=_NEW)
service._save_event_to_file(new_cmd)

count = service.delete_events_older_than(_CUTOFF)

assert count == 0
assert len(service._get_event_files_by_pattern("*")) == 1


def test_delete_events_older_than_returns_correct_count(tmp_path: Path):
service = BashEventService(bash_events_dir=tmp_path / "bash_events")

for i in range(3):
service._save_event_to_file(BashCommand(command=f"echo {i}", timestamp=_OLD))
service._save_event_to_file(BashCommand(command="echo new", timestamp=_NEW))

count = service.delete_events_older_than(_CUTOFF)

assert count == 3
assert len(service._get_event_files_by_pattern("*")) == 1


# ---------------------------------------------------------------------------
# run_retention_cleanup_loop
# ---------------------------------------------------------------------------


@pytest.mark.timeout(5)
async def test_run_retention_cleanup_loop_purges_old_events(tmp_path: Path):
service = BashEventService(bash_events_dir=tmp_path / "bash_events")

# Write an event whose recorded timestamp is well in the past.
service._save_event_to_file(BashCommand(command="echo old", timestamp=_OLD))
assert len(service._get_event_files_by_pattern("*")) == 1

# Run the loop with a 1-second retention window and a 50 ms tick so
# the test doesn't have to wait for the default 60-second interval.
task = asyncio.create_task(
service.run_retention_cleanup_loop(retention_seconds=1, interval_seconds=0.05)
)
try:
# Give the loop time to fire at least once.
await asyncio.sleep(0.15)
finally:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task

assert len(service._get_event_files_by_pattern("*")) == 0, (
"Old event file should have been purged by the retention loop"
)
Loading