Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions operator_use/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ACPAgentEntry,
ACPServerSettings,
HeartbeatConfig,
SessionConfig,
ToolsConfig,
RetryConfig,
SubagentConfig,
Expand Down Expand Up @@ -50,6 +51,7 @@
"ACPAgentEntry",
"ACPServerSettings",
"HeartbeatConfig",
"SessionConfig",
"ToolsConfig",
"RetryConfig",
"SubagentConfig",
Expand Down
7 changes: 7 additions & 0 deletions operator_use/config/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ class HeartbeatConfig(Base):
llm_config: Optional[LLMConfig] = None # Dedicated LLM for heartbeat tasks


class SessionConfig(Base):
"""Session lifecycle configuration."""

ttl_hours: float = 24.0 # Session idle timeout in hours (default: 24h)


class Config(BaseSettings):
"""Root configuration for Operator."""

Expand All @@ -298,6 +304,7 @@ class Config(BaseSettings):
search: SearchConfig = Field(default_factory=SearchConfig)
providers: ProvidersConfig = Field(default_factory=ProvidersConfig)
heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig)
session: SessionConfig = Field(default_factory=SessionConfig)
# Named registry of pre-approved remote ACP agents.
# The LLM can only call agents listed here — it never supplies raw URLs.
acp_agents: Dict[str, ACPAgentEntry] = Field(default_factory=dict)
Expand Down
147 changes: 133 additions & 14 deletions operator_use/session/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, Optional

from operator_use.messages.service import BaseMessage
from operator_use.utils.helper import ensure_directory
from operator_use.session.views import Session
from operator_use.session.views import Session, DEFAULT_SESSION_TTL


class SessionStore:
"""Store for sessions, keyed by session id. Persists to JSONL files."""
"""Store for sessions, keyed by session id. Persists to JSONL files.

def __init__(self, workspace: Path):
When *encryption_key* is provided (a URL-safe base-64 Fernet key), session
files are written as a single encrypted blob instead of plain JSONL lines.
The key can be generated with ``cryptography.fernet.Fernet.generate_key()``.
"""

def __init__(self, workspace: Path, encryption_key: Optional[str] = None):
self.workspace = Path(workspace)
self.sessions_dir = ensure_directory(self.workspace / "sessions")
self._sessions: dict[str, Session] = {}
self._fernet = None
if encryption_key:
from cryptography.fernet import Fernet
key_bytes = encryption_key.encode() if isinstance(encryption_key, str) else encryption_key
self._fernet = Fernet(key_bytes)

def _session_id_to_filename(self, session_id: str) -> str:
"""Make session_id filesystem-safe (e.g. `:` invalid on Windows)."""
Expand All @@ -26,10 +36,20 @@ def _session_id_to_filename(self, session_id: str) -> str:
def _sessions_path(self, session_id: str) -> Path:
return self.sessions_dir / f"{self._session_id_to_filename(session_id)}.jsonl"

def load(self, session_id: str) -> Session | None:
def load(self, session_id: str, ttl: float = DEFAULT_SESSION_TTL) -> Session | None:
path = self._sessions_path(session_id)
if not path.exists():
return None

if self._fernet:
return self._load_encrypted(session_id, path, ttl)

raw = path.read_bytes()
if raw.startswith(b"gAAAAA") and self._fernet is None:
raise ValueError(
f"Session file for '{session_id}' is Fernet-encrypted but no encryption_key was provided."
)

messages: list[BaseMessage] = []
created_at = datetime.now()
updated_at = datetime.now()
Expand All @@ -49,16 +69,53 @@ def load(self, session_id: str) -> Session | None:
continue
if "role" in obj:
messages.append(BaseMessage.from_dict(obj))
return Session(

return Session._from_persisted(
id=session_id,
messages=messages,
created_at=created_at,
updated_at=updated_at,
metadata=metadata,
ttl=ttl,
)

def _load_encrypted(self, session_id: str, path: Path, ttl: float) -> Session | None:
"""Load and decrypt a session file written by _save_encrypted()."""
from cryptography.fernet import InvalidToken

if self._fernet is None:
raise ValueError(
f"Session {session_id!r} appears to be encrypted but no encryption_key was provided."
)
raw = path.read_bytes()
try:
decrypted = self._fernet.decrypt(raw)
except InvalidToken as exc:
raise ValueError(
f"Failed to decrypt session '{session_id}': wrong key or corrupted data."
) from exc

payload = json.loads(decrypted.decode())
created_at = datetime.fromisoformat(payload.get("created_at", datetime.now().isoformat()))
updated_at = datetime.fromisoformat(payload.get("updated_at", datetime.now().isoformat()))
metadata = payload.get("metadata", {})
messages = [BaseMessage.from_dict(m) for m in payload.get("messages", [])]
return Session._from_persisted(
id=session_id,
messages=messages,
created_at=created_at,
updated_at=updated_at,
metadata=metadata,
ttl=ttl,
)

def save(self, session: Session) -> None:
path = self._sessions_path(session.id)

if self._fernet:
self._save_encrypted(session, path)
return

with open(path, "w", encoding="utf-8") as f:
meta = {
"type": "metadata",
Expand All @@ -71,15 +128,45 @@ def save(self, session: Session) -> None:
for msg in session.messages:
f.write(json.dumps(msg.to_dict()) + "\n")

def get_or_create(self, session_id: str | None = None) -> Session:
"""Get a session by id, or create and store a new one. Loads from JSONL if exists."""
def _save_encrypted(self, session: Session, path: Path) -> None:
"""Serialize the session to JSON and write as a Fernet-encrypted blob."""
payload = {
"id": session.id,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"metadata": session.metadata,
"messages": [msg.to_dict() for msg in session.messages],
}
token = self._fernet.encrypt(json.dumps(payload).encode())
path.write_bytes(token)

def get_or_create(
self,
session_id: Optional[str] = None,
ttl: float = DEFAULT_SESSION_TTL,
) -> Session:
"""Get a session by id, or create and store a new one.

Loads from JSONL if exists. If the loaded session is expired (based on
real idle time derived from *updated_at*), it is deleted and a fresh
session is returned instead.
"""
id = session_id or str(uuid.uuid4())
if session := self._sessions.get(id):
return session
if session := self.load(id):
self._sessions[id] = session
return session
session = Session(id=id)

if cached := self._sessions.get(id):
if not cached.is_expired():
return cached
# In-memory session has expired — evict and fall through to create
del self._sessions[id]

if session := self.load(id, ttl=ttl):
if session.is_expired():
self.delete(id)
else:
self._sessions[id] = session
return session

session = Session(id=id, ttl=ttl)
self._sessions[id] = session
return session

Expand Down Expand Up @@ -108,6 +195,38 @@ def archive(self, session_id: str) -> bool:
return True
return False

def cleanup(self, ttl: float = DEFAULT_SESSION_TTL) -> list[str]:
"""Delete all sessions whose idle time (since *updated_at*) exceeds *ttl*.

Returns the list of session IDs that were removed.
Archived session files are skipped.
"""
# Build a reverse map: filesystem stem -> original session_id (in-memory key).
# Sessions with `:` in their IDs are stored under the original ID in
# self._sessions but their filename stem uses `_` as a replacement.
stem_to_original: dict[str, str] = {
self._session_id_to_filename(sid): sid for sid in self._sessions
}

removed: list[str] = []
for path in self.sessions_dir.glob("*.jsonl"):
# Skip archived sessions
if "_archived_" in path.stem:
continue
session_id_fs = path.stem
session = self.load(session_id_fs, ttl=ttl)
if session is None:
continue
if session.is_expired():
path.unlink()
# Evict from in-memory cache using the original session ID if known,
# otherwise fall back to the filesystem-safe stem.
original_id = stem_to_original.get(session_id_fs, session_id_fs)
if original_id in self._sessions:
del self._sessions[original_id]
removed.append(original_id)
return removed

def list_sessions(self) -> list[dict[str, Any]]:
"""Load sessions from the sessions directory. Returns list of dicts with id, created_at, updated_at, path."""
result: list[dict[str, Any]] = []
Expand Down
58 changes: 56 additions & 2 deletions operator_use/session/views.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Session views."""

import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from typing import TYPE_CHECKING, Any

from operator_use.messages.service import BaseMessage

if TYPE_CHECKING:
from operator_use.config.service import Config

DEFAULT_SESSION_TTL = 86400.0 # 24 hours (config-driven default)


@dataclass
class Session:
Expand All @@ -16,17 +22,65 @@ class Session:
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
metadata: dict[str, Any] = field(default_factory=dict)
ttl: float = DEFAULT_SESSION_TTL
# _last_activity is set in __post_init__ so that tests can monkeypatch
# time.monotonic before instantiation and get a consistent starting value.
_last_activity: float = field(init=False, default=0.0)

def __post_init__(self) -> None:
self._last_activity = time.monotonic()

def add_message(self, message: BaseMessage) -> None:
"""Add a message and update updated_at."""
self.messages.append(message)
self.updated_at = datetime.now()
self.touch()

def get_history(self) -> list[BaseMessage]:
"""Return the message history."""
Comment on lines +25 to 40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. Loaded sessions never expire 📎 Requirement gap ⛨ Security

_last_activity is initialized with time.monotonic() on object creation and is not restored from
persisted timestamps, so sessions loaded from disk will appear “fresh” and won’t expire based on
real age/idle time.
Agent Prompt
## Issue description
Sessions loaded from disk will not expire correctly because `_last_activity` is not persisted/restored and is initialized at load time.

## Issue Context
To expire sessions on next access, the expiry calculation must use a persisted timestamp (e.g., `updated_at` or a dedicated `last_activity` field stored in the JSONL metadata) and accessors like `get_or_create()` should invalidate/delete expired sessions.

## Fix Focus Areas
- operator_use/session/views.py[22-46]
- operator_use/session/service.py[29-58]
- operator_use/session/service.py[74-84]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

return list(self.messages)

def clear(self) -> None:
"""Clear all messages."""
"""Clear all messages and refresh the TTL window."""
self.messages.clear()
self.updated_at = datetime.now()
self.touch()

def touch(self) -> None:
"""Refresh last_activity timestamp, extending the session TTL window."""
self._last_activity = time.monotonic()

def is_expired(self) -> bool:
"""Return True if idle time since last activity exceeds the TTL."""
return (time.monotonic() - self._last_activity) > self.ttl

@classmethod
def from_config(cls, id: str, config: "Config") -> "Session":
"""Construct a Session using TTL from config.session.ttl_hours."""
ttl = config.session.ttl_hours * 3600
return cls(id=id, ttl=ttl)

@classmethod
def _from_persisted(
cls,
id: str,
messages: list[BaseMessage],
created_at: datetime,
updated_at: datetime,
metadata: dict[str, Any],
ttl: float = DEFAULT_SESSION_TTL,
) -> "Session":
"""Reconstruct a Session from disk, anchoring _last_activity to the
real idle time derived from updated_at so that loaded sessions expire
correctly rather than resetting to 'now'."""
session = cls(
id=id,
messages=messages,
created_at=created_at,
updated_at=updated_at,
metadata=metadata,
ttl=ttl,
)
idle_seconds = max(0.0, (datetime.now() - updated_at).total_seconds())
session._last_activity = time.monotonic() - idle_seconds
return session
3 changes: 2 additions & 1 deletion operator_use/tools/control_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
import logging
import os
import subprocess
import sys
from typing import Optional

Expand Down Expand Up @@ -125,7 +126,7 @@ async def _do_restart(graceful_fn=None) -> None:
``os._exit(75)`` which skips cleanup but guarantees the process terminates.
"""
global _requested_exit_code
os.system("cls" if os.name == "nt" else "clear")
subprocess.run(["cls"] if os.name == "nt" else ["clear"], check=False)
frames = ["↑", "↗", "→", "↘", "↓", "↙", "←", "↖"]
for i in range(20):
sys.stdout.write(f"\r {frames[i % len(frames)]} Restarting Operator...")
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"platformdirs>=4.0.0",
"psutil>=7.0.0",
"pynacl>=1.6.2",
"cryptography>=41.0",
"comtypes>=1.4.15; sys_platform == 'win32'",
"pywin32>=311; sys_platform == 'win32'",
"pyobjc-framework-Cocoa>=10.0; sys_platform == 'darwin'",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def test_agent_run_with_tool_call_then_text(tmp_path):

# Register a simple echo tool
from pydantic import BaseModel
from operator_use.tools.service import Tool
from operator_use.agent.tools.service import Tool

class EchoParams(BaseModel):
message: str
Expand Down
Loading
Loading