Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions code-interpreter/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from app.api.routes import router as api_router
from app.app_configs import EXECUTOR_BACKEND, HOST, PORT, PYTHON_EXECUTOR_DOCKER_IMAGE
from app.models.schemas import HealthResponse
from app.services.executor_factory import get_executor

# Configure logging
logging.basicConfig(
Expand Down Expand Up @@ -101,8 +103,10 @@ def create_app() -> FastAPI:
)

@app.get("/health")
def health() -> dict[str, str]: # sync + strictly typed
return {"status": "ok"}
def health() -> HealthResponse:
"""Health check that verifies the executor backend is operational."""
result = get_executor().check_health()
return HealthResponse(status=result.status, message=result.message)

app.include_router(api_router, prefix="/v1")
return app
Expand Down
5 changes: 5 additions & 0 deletions code-interpreter/app/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,8 @@ class ListFilesResponse(BaseModel):
default_factory=list,
description="List of all stored files with their metadata.",
)


class HealthResponse(BaseModel):
status: Literal["ok", "error"]
message: StrictStr | None = None
15 changes: 15 additions & 0 deletions code-interpreter/app/services/executor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ class StreamResult:
StreamEvent = StreamChunk | StreamResult


@dataclass(frozen=True, slots=True)
class HealthCheck:
"""Result of an executor health check."""

status: Literal["ok", "error"]
message: str | None = None


class ExecutorProtocol(Protocol):
def execute_python(
self,
Expand All @@ -114,6 +122,13 @@ def execute_python(


class BaseExecutor(ABC):
def check_health(self) -> HealthCheck:
"""Check if the executor backend is operational.
Default implementation returns ok. Override for backend-specific checks.
"""
return HealthCheck(status="ok")

@abstractmethod
def execute_python(
self,
Expand Down
46 changes: 46 additions & 0 deletions code-interpreter/app/services/executor_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
BaseExecutor,
EntryKind,
ExecutionResult,
HealthCheck,
StreamChunk,
StreamEvent,
StreamResult,
Expand All @@ -49,6 +50,51 @@ def __init__(self) -> None:
self.image = PYTHON_EXECUTOR_DOCKER_IMAGE
self.run_args = PYTHON_EXECUTOR_DOCKER_RUN_ARGS

def check_health(self) -> HealthCheck:
"""Verify Docker daemon is reachable and the executor image is available."""
# Check Docker daemon connectivity
try:
result = subprocess.run(
[self.docker_binary, "version", "--format", "{{.Server.Version}}"],
capture_output=True,
timeout=5,
check=False,
)
except FileNotFoundError:
return HealthCheck(status="error", message="Docker binary not found")
except subprocess.TimeoutExpired:
return HealthCheck(status="error", message="Docker daemon not responding")

if result.returncode != 0:
stderr = result.stderr.decode("utf-8", errors="replace").strip()
return HealthCheck(
status="error",
message=f"Docker daemon not reachable: {stderr}",
)

# Check executor image is available locally
image_with_tag = f"{self.image}:latest"
try:
img_result = subprocess.run(
[self.docker_binary, "image", "inspect", image_with_tag],
capture_output=True,
timeout=5,
check=False,
)
except subprocess.TimeoutExpired:
return HealthCheck(
status="error",
message=f"Timeout checking image {image_with_tag}",
)

if img_result.returncode != 0:
return HealthCheck(
status="error",
message=f"Executor image {image_with_tag} not available locally",
)

return HealthCheck(status="ok")

def _resolve_docker_binary(self) -> str:
candidate = PYTHON_EXECUTOR_DOCKER_BIN
docker_path = which(candidate)
Expand Down
17 changes: 17 additions & 0 deletions code-interpreter/app/services/executor_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
BaseExecutor,
EntryKind,
ExecutionResult,
HealthCheck,
WorkspaceEntry,
wrap_last_line_interactive,
)
Expand Down Expand Up @@ -76,6 +77,22 @@ def __init__(self) -> None:
self.image = KUBERNETES_EXECUTOR_IMAGE
self.service_account = KUBERNETES_EXECUTOR_SERVICE_ACCOUNT

def check_health(self) -> HealthCheck:
"""Verify Kubernetes API is reachable and the namespace is accessible."""
try:
self.v1.read_namespace(name=self.namespace)
except ApiException as e:
return HealthCheck(
status="error",
message=f"Kubernetes API error (namespace={self.namespace}): {e.reason}",
)
except Exception as e:
return HealthCheck(
status="error",
message=f"Kubernetes API not reachable: {e}",
)
return HealthCheck(status="ok")

def _create_pod_manifest(
self,
pod_name: str,
Expand Down
2 changes: 1 addition & 1 deletion code-interpreter/tests/e2e/test_basic_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_execute_endpoint_basic_flow() -> None:
pytest.fail(f"Failed to reach Code Interpreter service at {BASE_URL}: {exc!s}")

assert health_response.status_code == 200, health_response.text
assert health_response.json() == {"status": "ok"}
assert health_response.json()["status"] == "ok"

execute_payload: dict[str, Any] = {
"code": "print('hello from e2e')",
Expand Down
139 changes: 139 additions & 0 deletions code-interpreter/tests/integration_tests/test_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from __future__ import annotations

import subprocess
from collections.abc import Generator
from unittest.mock import patch

import pytest
from fastapi.testclient import TestClient

from app.main import create_app
from app.services.executor_base import HealthCheck
from app.services.executor_docker import DockerExecutor
from app.services.executor_factory import get_executor


@pytest.fixture(autouse=True)
def _clear_executor_cache() -> Generator[None, None, None]:
"""Reset the lru_cache on get_executor so patches take effect."""
get_executor.cache_clear()
yield
get_executor.cache_clear()


def test_health_returns_ok_when_backend_healthy() -> None:
client = TestClient(create_app())
response = client.get("/health")

assert response.status_code == 200
body = response.json()
assert body["status"] == "ok"
assert body["message"] is None


def test_health_returns_error_when_backend_unhealthy() -> None:
unhealthy = HealthCheck(status="error", message="daemon down")

with patch.object(DockerExecutor, "check_health", return_value=unhealthy):
client = TestClient(create_app())
response = client.get("/health")

assert response.status_code == 200
body = response.json()
assert body["status"] == "error"
assert body["message"] == "daemon down"


def _make_completed(returncode: int, stderr: bytes = b"") -> subprocess.CompletedProcess[bytes]:
return subprocess.CompletedProcess(args=[], returncode=returncode, stdout=b"", stderr=stderr)


def test_docker_health_ok() -> None:
"""Both Docker daemon and image check succeed."""
with patch("app.services.executor_docker.subprocess.run", return_value=_make_completed(0)):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "ok"
assert result.message is None


def test_docker_health_daemon_unreachable() -> None:
"""Docker daemon returns non-zero exit code."""
with patch(
"app.services.executor_docker.subprocess.run",
return_value=_make_completed(1, stderr=b"Cannot connect to the Docker daemon"),
):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "error"
assert "Docker daemon not reachable" in (result.message or "")


def test_docker_health_daemon_timeout() -> None:
"""Docker daemon command times out."""
with patch(
"app.services.executor_docker.subprocess.run",
side_effect=subprocess.TimeoutExpired(cmd="docker", timeout=5),
):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "error"
assert "not responding" in (result.message or "")


def test_docker_health_binary_not_found() -> None:
"""Docker binary does not exist."""
with patch(
"app.services.executor_docker.subprocess.run",
side_effect=FileNotFoundError,
):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "error"
assert "not found" in (result.message or "")


def test_docker_health_image_missing() -> None:
"""Docker daemon is reachable but the executor image is not available."""
daemon_ok = _make_completed(0)
image_missing = _make_completed(1)

call_count = 0

def _side_effect(*args: object, **kwargs: object) -> subprocess.CompletedProcess[bytes]:
nonlocal call_count
call_count += 1
# First call: docker version (daemon check) → ok
# Second call: docker image inspect → fail
return daemon_ok if call_count == 1 else image_missing

with patch("app.services.executor_docker.subprocess.run", side_effect=_side_effect):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "error"
assert "not available locally" in (result.message or "")


def test_docker_health_image_check_timeout() -> None:
"""Docker daemon is reachable but the image inspect times out."""
daemon_ok = _make_completed(0)
call_count = 0

def _side_effect(*args: object, **kwargs: object) -> subprocess.CompletedProcess[bytes]:
nonlocal call_count
call_count += 1
if call_count == 1:
return daemon_ok
raise subprocess.TimeoutExpired(cmd="docker", timeout=5)

with patch("app.services.executor_docker.subprocess.run", side_effect=_side_effect):
executor = DockerExecutor()
result = executor.check_health()

assert result.status == "error"
assert "Timeout checking image" in (result.message or "")