diff --git a/DEVOPS_RUNBOOK.md b/DEVOPS_RUNBOOK.md new file mode 100644 index 000000000..f8d0cc7c5 --- /dev/null +++ b/DEVOPS_RUNBOOK.md @@ -0,0 +1,311 @@ +# SuperSandbox — DevOps Deployment Runbook + +## Overview + +SuperSandbox is a sandbox platform that runs isolated Linux containers on Kubernetes with gVisor for syscall-level security. It provides: +- Sandbox lifecycle (create / pause / resume / delete) +- Persistent `/workspace` volume per sandbox (survives pause/resume) +- Pod logs API +- Interactive WebSocket terminal + +## Architecture + +``` +Client (API / WebSocket) + ↓ +SuperSandbox Server (FastAPI, port 8080) + ↓ +Kubernetes API + ↓ +agent-sandbox-controller (reconciles Sandbox CRs → Pods) + ↓ +Pods (gVisor runtime) + PVCs (workspace storage) +``` + +--- + +## Prerequisites + +| Component | Version | Purpose | +|-----------|---------|---------| +| Kubernetes | 1.21+ | Cluster | +| containerd | 2.x | Container runtime (NOT Docker runtime) | +| gVisor (runsc) | latest | Sandbox isolation | +| agent-sandbox-controller | v0.2.1 | Sandbox CRD → Pod reconciliation | +| Python | 3.10+ | SuperSandbox server | +| uv | latest | Python package manager | + +--- + +## Step 1: Kubernetes Cluster Requirements + +The cluster MUST use **containerd** as the container runtime (not dockershim). + +Verify: +```bash +kubectl get nodes -o jsonpath='{.items[*].status.nodeInfo.containerRuntimeVersion}' +# Should show: containerd://2.x.x +``` + +--- + +## Step 2: Install gVisor + +gVisor provides syscall-level isolation. Each sandbox pod runs inside a gVisor sandbox. 
+ +### On each worker node: + +```bash +# Install runsc binary (gVisor release URLs use the `uname -m` value directly: x86_64 or aarch64) +ARCH=$(uname -m) + +curl -fsSL -o /usr/local/bin/runsc \ + https://storage.googleapis.com/gvisor/releases/release/latest/${ARCH}/runsc +chmod +x /usr/local/bin/runsc + +curl -fsSL -o /usr/local/bin/containerd-shim-runsc-v1 \ + https://storage.googleapis.com/gvisor/releases/release/latest/${ARCH}/containerd-shim-runsc-v1 +chmod +x /usr/local/bin/containerd-shim-runsc-v1 +``` + +### Configure containerd to use runsc: + +Add to `/etc/containerd/config.toml`: +```toml +[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runsc] + runtime_type = "io.containerd.runsc.v1" +``` + +Then restart containerd: +```bash +systemctl restart containerd +``` + +### Create the RuntimeClass: + +```bash +kubectl apply -f - < **Note**: For in-cluster deployment, leave `kubeconfig_path = ""`. The server will use the pod's service account. Ensure the service account has permissions to manage Sandbox CRDs, PVCs, and Pods in the `opensandbox` namespace. 
+ +### RBAC for in-cluster deployment: + +```yaml +apiVersion: v1 +kind: ServiceAccount +metadata: + name: supersandbox-server + namespace: opensandbox +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: supersandbox-server +rules: + - apiGroups: ["agents.x-k8s.io"] + resources: ["sandboxes"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] + - apiGroups: [""] + resources: ["pods", "pods/log", "pods/exec"] + verbs: ["get", "list", "watch", "create", "delete"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "create", "delete"] + - apiGroups: ["node.k8s.io"] + resources: ["runtimeclasses"] + verbs: ["get", "list"] + - apiGroups: [""] + resources: ["services", "secrets"] + verbs: ["get", "list", "create", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: supersandbox-server +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: supersandbox-server +subjects: + - kind: ServiceAccount + name: supersandbox-server + namespace: opensandbox +``` + +### Start the server: +```bash +opensandbox-server --config ~/.sandbox.toml +``` + +Or via Docker/K8s deployment (containerize the server directory). + +--- + +## Step 7: Verify + +```bash +# 1. Health check +curl http://:8080/health + +# 2. Create a sandbox +curl -X POST http://:8080/sandboxes \ + -H "Content-Type: application/json" \ + -d '{ + "image": {"uri": "python:3.11-slim"}, + "timeout": 3600, + "resourceLimits": {"cpu": "500m", "memory": "512Mi"}, + "entrypoint": ["python", "-c", "import time; time.sleep(3600)"] + }' + +# 3. Check pod is running with gVisor +kubectl get pods -n opensandbox -o jsonpath='{.items[*].spec.runtimeClassName}' +# Should show: gvisor + +# 4. Check PVC created +kubectl get pvc -n opensandbox + +# 5. Test logs +curl "http://:8080/sandboxes//logs?tail=10" + +# 6. 
Test pause/resume +curl -X POST http://:8080/sandboxes//pause +kubectl get pods -n opensandbox # Should be empty +kubectl get pvc -n opensandbox # PVC still exists + +curl -X POST http://:8080/sandboxes//resume +kubectl get pods -n opensandbox # Pod recreated + +# 7. Delete +curl -X DELETE http://:8080/sandboxes/ +``` + +--- + +## API Quick Reference + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/sandboxes` | POST | Create sandbox | +| `/sandboxes` | GET | List sandboxes | +| `/sandboxes/{id}` | GET | Get sandbox status | +| `/sandboxes/{id}` | DELETE | Delete sandbox + PVC | +| `/sandboxes/{id}/pause` | POST | Pause (scale to 0, keep PVC) | +| `/sandboxes/{id}/resume` | POST | Resume (scale to 1, remount PVC) | +| `/sandboxes/{id}/logs?tail=100&follow=false` | GET | Pod logs | +| `/sandboxes/{id}/terminal` | WebSocket | Interactive bash terminal | +| `/sandboxes/{id}/endpoints/{port}` | GET | Get pod IP:port | + +--- + +## Production Considerations + +- **Storage class**: Default uses `standard`. For production, configure a proper StorageClass (e.g., `gp3` on AWS, `pd-ssd` on GCP) +- **API key**: Set `api_key` in `[server]` config section +- **Resource quotas**: Set ResourceQuota on the `opensandbox` namespace to limit total sandbox resources +- **Network policies**: Consider adding NetworkPolicies to isolate sandbox pods +- **Monitoring**: The server exposes `/health` for liveness probes +- **Volume size**: Adjust `default_workspace_volume_size` in config based on your use case diff --git a/server/src/api/lifecycle.py b/server/src/api/lifecycle.py index d507cc9ac..1a1671e32 100644 --- a/server/src/api/lifecycle.py +++ b/server/src/api/lifecycle.py @@ -19,10 +19,12 @@ All business logic is delegated to the service layer that backs each operation. 
""" +import asyncio +import logging from typing import List, Optional import httpx -from fastapi import APIRouter, Header, Query, Request, status +from fastapi import APIRouter, Header, Query, Request, WebSocket, WebSocketDisconnect, status from fastapi.exceptions import HTTPException from fastapi.responses import Response, StreamingResponse @@ -497,3 +499,124 @@ async def proxy_sandbox_endpoint_request(request: Request, sandbox_id: str, port raise HTTPException( status_code=500, detail=f"An internal error occurred in the proxy: {e}" ) + + +# ============================================================================ +# Sandbox Logs & Terminal +# ============================================================================ + +_lifecycle_logger = logging.getLogger(__name__) + + +@router.get( + "/sandboxes/{sandbox_id}/logs", + responses={ + 200: {"description": "Pod logs returned successfully"}, + 404: {"description": "Sandbox or pod not found"}, + 409: {"description": "Sandbox is paused"}, + 500: {"description": "An unexpected server error occurred"}, + }, +) +async def get_sandbox_logs( + sandbox_id: str, + tail: int = Query(100, ge=1, le=10000, description="Number of lines from the end"), + follow: bool = Query(False, description="Stream logs in real time"), + x_request_id: Optional[str] = Header(None, alias="X-Request-ID"), +) -> Response: + """ + Get logs from a sandbox pod. + + Returns the stdout/stderr output of the sandbox container. + Use follow=true for real-time streaming via Server-Sent Events. 
+ """ + if follow: + resp = sandbox_service.get_sandbox_logs( + sandbox_id, tail_lines=tail, follow=True + ) + + async def stream_logs(): + try: + for line in resp: + if isinstance(line, bytes): + yield line + else: + yield line.encode("utf-8") + except Exception: + pass + finally: + resp.close() + + return StreamingResponse( + stream_logs(), + media_type="text/plain; charset=utf-8", + ) + else: + logs = sandbox_service.get_sandbox_logs( + sandbox_id, tail_lines=tail, follow=False + ) + return Response(content=logs, media_type="text/plain; charset=utf-8") + + +@router.websocket("/sandboxes/{sandbox_id}/terminal") +async def sandbox_terminal(websocket: WebSocket, sandbox_id: str): + """ + Interactive WebSocket terminal to a sandbox pod. + + Opens an interactive shell (bash) in the sandbox container. + Clients send keystrokes as text, receive terminal output as text. + """ + await websocket.accept() + + try: + ws_client = sandbox_service.exec_sandbox_terminal(sandbox_id) + except HTTPException as e: + detail = e.detail + if isinstance(detail, dict): + reason = detail.get("message", str(detail)) + else: + reason = str(detail) + await websocket.close(code=1008, reason=reason[:123]) + return + except Exception as e: + await websocket.close(code=1011, reason=str(e)[:123]) + return + + async def read_from_k8s(): + """Read from K8s exec stream → send to WebSocket.""" + try: + while ws_client.is_open(): + data = await asyncio.to_thread(ws_client.read_stdout, timeout=1) + if data: + await websocket.send_text(data) + except WebSocketDisconnect: + pass + except Exception as e: + _lifecycle_logger.debug("K8s read loop ended: %s", e) + + async def write_to_k8s(): + """Receive from WebSocket → write to K8s exec stream.""" + try: + while True: + data = await websocket.receive_text() + ws_client.write_stdin(data) + except WebSocketDisconnect: + pass + except Exception as e: + _lifecycle_logger.debug("WebSocket write loop ended: %s", e) + + read_task = 
asyncio.create_task(read_from_k8s()) + write_task = asyncio.create_task(write_to_k8s()) + + try: + done, pending = await asyncio.wait( + [read_task, write_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + finally: + ws_client.close() + try: + await websocket.close() + except Exception: + pass diff --git a/server/src/config.py b/server/src/config.py index cf2843acc..0245095df 100644 --- a/server/src/config.py +++ b/server/src/config.py @@ -340,6 +340,10 @@ class StorageConfig(BaseModel): "'ossfs_mount_root//'." ), ) + default_workspace_volume_size: str = Field( + default="1Gi", + description="Default storage size for auto-created workspace PVCs.", + ) class EgressConfig(BaseModel): diff --git a/server/src/middleware/auth.py b/server/src/middleware/auth.py index 344055b62..d23884822 100644 --- a/server/src/middleware/auth.py +++ b/server/src/middleware/auth.py @@ -65,6 +65,13 @@ def __init__(self, app, config: Optional[AppConfig] = None): # Read the API key directly from config; suitable for dev/test usage self.valid_api_keys = self._load_api_keys() + async def __call__(self, scope, receive, send): + """Pass WebSocket connections through without HTTP auth.""" + if scope["type"] == "websocket": + await self.app(scope, receive, send) + return + await super().__call__(scope, receive, send) + def _load_api_keys(self) -> set: """ Load valid API keys from configuration. diff --git a/server/src/middleware/request_id.py b/server/src/middleware/request_id.py index eed2655a1..6fa443ce4 100644 --- a/server/src/middleware/request_id.py +++ b/server/src/middleware/request_id.py @@ -50,6 +50,13 @@ class RequestIdMiddleware(BaseHTTPMiddleware): RequestIdFilter without passing request_id explicitly. 
""" + async def __call__(self, scope, receive, send): + """Pass WebSocket connections through without request ID processing.""" + if scope["type"] == "websocket": + await self.app(scope, receive, send) + return + await super().__call__(scope, receive, send) + async def dispatch(self, request: Request, call_next: Callable) -> Response: raw = request.headers.get(X_REQUEST_ID_HEADER) request_id = (raw and raw.strip()) or uuid.uuid4().hex diff --git a/server/src/services/constants.py b/server/src/services/constants.py index 08459eaa2..b506ffa45 100644 --- a/server/src/services/constants.py +++ b/server/src/services/constants.py @@ -57,6 +57,9 @@ class SandboxErrorCodes: K8S_POD_READY_TIMEOUT = "KUBERNETES::POD_READY_TIMEOUT" K8S_API_ERROR = "KUBERNETES::API_ERROR" K8S_POD_IP_NOT_AVAILABLE = "KUBERNETES::POD_IP_NOT_AVAILABLE" + K8S_INVALID_STATE = "KUBERNETES::INVALID_STATE" + K8S_PVC_CREATE_FAILED = "KUBERNETES::PVC_CREATE_FAILED" + K8S_PVC_DELETE_FAILED = "KUBERNETES::PVC_DELETE_FAILED" # Common error codes UNKNOWN_ERROR = "SANDBOX::UNKNOWN_ERROR" diff --git a/server/src/services/docker.py b/server/src/services/docker.py index c7cf785e2..7d1d5cee6 100644 --- a/server/src/services/docker.py +++ b/server/src/services/docker.py @@ -1785,6 +1785,14 @@ def _resolve_host_mapped_endpoint( self._attach_egress_auth_headers(endpoint, labels) return endpoint + def get_sandbox_logs(self, sandbox_id: str, tail_lines: int = 100, follow: bool = False): + """Get logs from a Docker sandbox container.""" + raise NotImplementedError("Logs are not yet implemented for Docker runtime") + + def exec_sandbox_terminal(self, sandbox_id: str, command: str = "/bin/bash"): + """Open terminal to a Docker sandbox container.""" + raise NotImplementedError("Terminal is not yet implemented for Docker runtime") + def _attach_egress_auth_headers( self, endpoint: Endpoint, diff --git a/server/src/services/k8s/agent_sandbox_provider.py b/server/src/services/k8s/agent_sandbox_provider.py index 
b1e25a316..7c02e2728 100644 --- a/server/src/services/k8s/agent_sandbox_provider.py +++ b/server/src/services/k8s/agent_sandbox_provider.py @@ -33,6 +33,7 @@ from src.services.helpers import format_ingress_endpoint from src.api.schema import Endpoint, ImageSpec, NetworkPolicy, Volume from src.services.k8s.agent_sandbox_template import AgentSandboxTemplateManager +from src.services.constants import SANDBOX_ID_LABEL from src.services.k8s.client import K8sClient from src.services.k8s.egress_helper import ( apply_egress_to_spec, @@ -99,6 +100,12 @@ def __init__( self.ingress_config = app_config.ingress if app_config else None self.execd_init_resources = k8s_config.execd_init_resources if k8s_config else None + # Workspace volume configuration + self._workspace_volume_size = ( + app_config.storage.default_workspace_volume_size + if app_config and app_config.storage else "1Gi" + ) + # Initialize secure runtime resolver self.resolver = SecureRuntimeResolver(app_config) if app_config else None self.runtime_class = ( @@ -108,6 +115,9 @@ def __init__( def _resource_name(self, sandbox_id: str) -> str: return _to_dns1035_label(sandbox_id, prefix="sandbox") + def _pvc_name(self, sandbox_id: str) -> str: + return f"workspace-{self._resource_name(sandbox_id)}" + def _resource_name_candidates(self, sandbox_id: str) -> List[str]: candidates = [] primary = self._resource_name(sandbox_id) @@ -160,6 +170,24 @@ def create_workload( if volumes: apply_volumes_to_pod_spec(pod_spec, volumes) + # Auto-create and attach workspace PVC + pvc_name = self._pvc_name(sandbox_id) + self.k8s_client.create_pvc( + namespace=namespace, + name=pvc_name, + storage_size=self._workspace_volume_size, + labels={SANDBOX_ID_LABEL: sandbox_id}, + ) + pod_spec["volumes"].append({ + "name": "workspace", + "persistentVolumeClaim": {"claimName": pvc_name}, + }) + pod_spec["containers"][0].setdefault("volumeMounts", []).append({ + "name": "workspace", + "mountPath": "/workspace", + "readOnly": False, + }) + if 
self.service_account: pod_spec["serviceAccountName"] = self.service_account @@ -194,13 +222,21 @@ def create_workload( else: sandbox["spec"]["shutdownTime"] = expires_at.isoformat() - created = self.k8s_client.create_custom_object( - group=self.group, - version=self.version, - namespace=namespace, - plural=self.plural, - body=sandbox, - ) + try: + created = self.k8s_client.create_custom_object( + group=self.group, + version=self.version, + namespace=namespace, + plural=self.plural, + body=sandbox, + ) + except Exception: + # Clean up orphaned PVC if Sandbox CR creation fails + try: + self.k8s_client.delete_pvc(namespace=namespace, name=pvc_name) + except Exception as cleanup_err: + logger.warning("Failed to clean up PVC %s: %s", pvc_name, cleanup_err) + raise return { "name": created["metadata"]["name"], @@ -383,7 +419,7 @@ def get_workload(self, sandbox_id: str, namespace: str) -> Optional[Dict[str, An return None def delete_workload(self, sandbox_id: str, namespace: str) -> None: - """Delete the Sandbox CRD for the given sandbox ID.""" + """Delete the Sandbox CRD and associated workspace PVC.""" sandbox = self.get_workload(sandbox_id, namespace) if not sandbox: raise Exception(f"Sandbox for sandbox {sandbox_id} not found") @@ -397,6 +433,47 @@ def delete_workload(self, sandbox_id: str, namespace: str) -> None: grace_period_seconds=0, ) + # Clean up the workspace PVC + pvc_name = self._pvc_name(sandbox_id) + try: + self.k8s_client.delete_pvc(namespace=namespace, name=pvc_name) + except Exception as e: + logger.warning("Failed to delete PVC %s: %s", pvc_name, e) + + def pause_workload(self, sandbox_id: str, namespace: str) -> None: + """Pause a sandbox by scaling replicas to 0.""" + sandbox = self.get_workload(sandbox_id, namespace) + if not sandbox: + raise Exception(f"Sandbox {sandbox_id} not found") + current_replicas = sandbox.get("spec", {}).get("replicas", 1) + if current_replicas == 0: + raise Exception(f"Sandbox {sandbox_id} is already paused") + 
self.k8s_client.patch_custom_object( + group=self.group, + version=self.version, + namespace=namespace, + plural=self.plural, + name=sandbox["metadata"]["name"], + body={"spec": {"replicas": 0}}, + ) + + def resume_workload(self, sandbox_id: str, namespace: str) -> None: + """Resume a paused sandbox by scaling replicas to 1.""" + sandbox = self.get_workload(sandbox_id, namespace) + if not sandbox: + raise Exception(f"Sandbox {sandbox_id} not found") + current_replicas = sandbox.get("spec", {}).get("replicas", 1) + if current_replicas != 0: + raise Exception(f"Sandbox {sandbox_id} is not paused") + self.k8s_client.patch_custom_object( + group=self.group, + version=self.version, + namespace=namespace, + plural=self.plural, + name=sandbox["metadata"]["name"], + body={"spec": {"replicas": 1}}, + ) + def list_workloads(self, namespace: str, label_selector: str) -> List[Dict[str, Any]]: """List Sandbox CRDs matching the given label selector.""" return self.k8s_client.list_custom_objects( @@ -444,6 +521,16 @@ def get_expiration(self, workload: Dict[str, Any]) -> Optional[datetime]: def get_status(self, workload: Dict[str, Any]) -> Dict[str, Any]: """Derive sandbox state from the Sandbox CRD status conditions.""" + # Check if sandbox is paused (replicas == 0) + spec = workload.get("spec", {}) + if spec.get("replicas", 1) == 0: + return { + "state": "Paused", + "reason": "SANDBOX_PAUSED", + "message": "Sandbox is paused (scaled to zero)", + "last_transition_at": workload.get("metadata", {}).get("creationTimestamp"), + } + status = workload.get("status", {}) conditions = status.get("conditions", []) diff --git a/server/src/services/k8s/batchsandbox_provider.py b/server/src/services/k8s/batchsandbox_provider.py index a4a2eab4c..c190f4b76 100644 --- a/server/src/services/k8s/batchsandbox_provider.py +++ b/server/src/services/k8s/batchsandbox_provider.py @@ -682,6 +682,14 @@ def delete_workload(self, sandbox_id: str, namespace: str) -> None: grace_period_seconds=0, ) + def 
pause_workload(self, sandbox_id: str, namespace: str) -> None: + """Pause is not supported for BatchSandbox provider.""" + raise NotImplementedError("Pause is not supported for BatchSandbox provider") + + def resume_workload(self, sandbox_id: str, namespace: str) -> None: + """Resume is not supported for BatchSandbox provider.""" + raise NotImplementedError("Resume is not supported for BatchSandbox provider") + def list_workloads(self, namespace: str, label_selector: str) -> List[Dict[str, Any]]: """List BatchSandboxes matching label selector.""" return self.k8s_client.list_custom_objects( diff --git a/server/src/services/k8s/client.py b/server/src/services/k8s/client.py index ca341a953..8954971c2 100644 --- a/server/src/services/k8s/client.py +++ b/server/src/services/k8s/client.py @@ -24,6 +24,7 @@ from kubernetes import client, config from kubernetes.client import ApiException, CoreV1Api, CustomObjectsApi, NodeV1Api +from kubernetes.stream import stream as k8s_stream from src.config import KubernetesRuntimeConfig from src.services.k8s.informer import WorkloadInformer @@ -295,3 +296,153 @@ def read_runtime_class(self, name: str) -> Any: if self._read_limiter: self._read_limiter.acquire() return self.get_node_v1_api().read_runtime_class(name) + + # ------------------------------------------------------------------ + # PVC operations + # ------------------------------------------------------------------ + + def create_pvc( + self, + namespace: str, + name: str, + storage_size: str, + labels: Optional[Dict[str, str]] = None, + ) -> Any: + """Create a PersistentVolumeClaim.""" + if self._write_limiter: + self._write_limiter.acquire() + pvc = client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta(name=name, labels=labels), + spec=client.V1PersistentVolumeClaimSpec( + access_modes=["ReadWriteOnce"], + resources=client.V1VolumeResourceRequirements( + requests={"storage": storage_size}, + ), + ), + ) + return 
self.get_core_v1_api().create_namespaced_persistent_volume_claim( + namespace=namespace, + body=pvc, + ) + + def delete_pvc(self, namespace: str, name: str) -> None: + """Delete a PersistentVolumeClaim. No-op if already deleted.""" + if self._write_limiter: + self._write_limiter.acquire() + try: + self.get_core_v1_api().delete_namespaced_persistent_volume_claim( + name=name, + namespace=namespace, + ) + except ApiException as e: + if e.status == 404: + logger.debug("PVC %s already deleted", name) + return + raise + + def get_pvc(self, namespace: str, name: str) -> Optional[Any]: + """Get a PersistentVolumeClaim. Returns None if not found.""" + if self._read_limiter: + self._read_limiter.acquire() + try: + return self.get_core_v1_api().read_namespaced_persistent_volume_claim( + name=name, + namespace=namespace, + ) + except ApiException as e: + if e.status == 404: + return None + raise + + # ------------------------------------------------------------------ + # Pod log operations + # ------------------------------------------------------------------ + + def read_pod_log( + self, + namespace: str, + pod_name: str, + container: str = "sandbox", + tail_lines: Optional[int] = None, + follow: bool = False, + ) -> Any: + """Read logs from a pod container. 
+ + Args: + namespace: Kubernetes namespace + pod_name: Pod name + container: Container name (default: "sandbox") + tail_lines: Number of lines from the end to return + follow: If True, returns a streaming response object + + Returns: + str when follow=False, urllib3.HTTPResponse when follow=True + """ + if self._read_limiter: + self._read_limiter.acquire() + kwargs: Dict[str, Any] = { + "name": pod_name, + "namespace": namespace, + "container": container, + } + if tail_lines is not None: + kwargs["tail_lines"] = tail_lines + if follow: + kwargs["follow"] = True + kwargs["_preload_content"] = False + return self.get_core_v1_api().read_namespaced_pod_log(**kwargs) + + # ------------------------------------------------------------------ + # Pod exec operations + # ------------------------------------------------------------------ + + def exec_interactive( + self, + namespace: str, + pod_name: str, + container: str = "sandbox", + command: Optional[List[str]] = None, + ) -> Any: + """Open an interactive exec stream to a pod (with PTY). + + Returns a WSClient object with .write_stdin(), .read_stdout(), + .read_stderr(), .is_open(), .close() methods. + """ + if command is None: + command = ["/bin/bash"] + if self._read_limiter: + self._read_limiter.acquire() + return k8s_stream( + self.get_core_v1_api().connect_get_namespaced_pod_exec, + name=pod_name, + namespace=namespace, + container=container, + command=command, + stdin=True, + stdout=True, + stderr=True, + tty=True, + _preload_content=False, + ) + + # ------------------------------------------------------------------ + # Pod name resolution + # ------------------------------------------------------------------ + + def get_pod_name_for_sandbox( + self, + namespace: str, + sandbox_id: str, + ) -> Optional[str]: + """Find the running pod name for a sandbox by label selector. + + Returns the first running pod name, or None. 
+ """ + pods = self.list_pods( + namespace=namespace, + label_selector=f"opensandbox.io/id={sandbox_id}", + ) + for pod in pods: + if pod.status and pod.status.phase == "Running": + return pod.metadata.name + return None diff --git a/server/src/services/k8s/kubernetes_service.py b/server/src/services/k8s/kubernetes_service.py index 288340fcd..6e246353f 100644 --- a/server/src/services/k8s/kubernetes_service.py +++ b/server/src/services/k8s/kubernetes_service.py @@ -544,39 +544,109 @@ def delete_sandbox(self, sandbox_id: str) -> None: def pause_sandbox(self, sandbox_id: str) -> None: """ - Pause sandbox (not supported in Kubernetes). - + Pause a running sandbox by scaling its pod to zero. + The workspace PVC is preserved so data persists. + Args: sandbox_id: Unique sandbox identifier - + Raises: - HTTPException: Always raises 501 Not Implemented + HTTPException: If sandbox not found, not running, or pause fails """ - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, - detail={ - "code": SandboxErrorCodes.API_NOT_SUPPORTED, - "message": "Pause operation is not supported in Kubernetes runtime", - }, - ) - + try: + workload = self.workload_provider.get_workload( + sandbox_id=sandbox_id, + namespace=self.namespace, + ) + if not workload: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": SandboxErrorCodes.K8S_SANDBOX_NOT_FOUND, + "message": f"Sandbox '{sandbox_id}' not found", + }, + ) + + status_info = self.workload_provider.get_status(workload) + if status_info["state"] != "Running": + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={ + "code": SandboxErrorCodes.K8S_INVALID_STATE, + "message": ( + f"Cannot pause sandbox in state '{status_info['state']}'. " + "Must be Running." 
+ ), + }, + ) + + self.workload_provider.pause_workload(sandbox_id, self.namespace) + logger.info(f"Paused sandbox: {sandbox_id}") + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error pausing sandbox {sandbox_id}: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "code": SandboxErrorCodes.K8S_API_ERROR, + "message": f"Failed to pause sandbox: {str(e)}", + }, + ) from e + def resume_sandbox(self, sandbox_id: str) -> None: """ - Resume sandbox (not supported in Kubernetes). - + Resume a paused sandbox by scaling its pod back to one. + The workspace PVC is remounted automatically. + Args: sandbox_id: Unique sandbox identifier - + Raises: - HTTPException: Always raises 501 Not Implemented + HTTPException: If sandbox not found, not paused, or resume fails """ - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, - detail={ - "code": SandboxErrorCodes.API_NOT_SUPPORTED, - "message": "Resume operation is not supported in Kubernetes runtime", - }, - ) + try: + workload = self.workload_provider.get_workload( + sandbox_id=sandbox_id, + namespace=self.namespace, + ) + if not workload: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": SandboxErrorCodes.K8S_SANDBOX_NOT_FOUND, + "message": f"Sandbox '{sandbox_id}' not found", + }, + ) + + status_info = self.workload_provider.get_status(workload) + if status_info["state"] != "Paused": + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={ + "code": SandboxErrorCodes.K8S_INVALID_STATE, + "message": ( + f"Cannot resume sandbox in state '{status_info['state']}'. " + "Must be Paused." 
+ ), + }, + ) + + self.workload_provider.resume_workload(sandbox_id, self.namespace) + logger.info(f"Resumed sandbox: {sandbox_id}") + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error resuming sandbox {sandbox_id}: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "code": SandboxErrorCodes.K8S_API_ERROR, + "message": f"Failed to resume sandbox: {str(e)}", + }, + ) from e def renew_expiration( self, @@ -715,6 +785,85 @@ def get_endpoint( }, ) from e + def get_sandbox_pod_name(self, sandbox_id: str) -> str: + """ + Resolve a sandbox ID to its running pod name. + + Raises: + HTTPException: 404 if sandbox or pod not found, 409 if paused + """ + workload = self.workload_provider.get_workload( + sandbox_id=sandbox_id, + namespace=self.namespace, + ) + if not workload: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": SandboxErrorCodes.K8S_SANDBOX_NOT_FOUND, + "message": f"Sandbox '{sandbox_id}' not found", + }, + ) + + status_info = self.workload_provider.get_status(workload) + if status_info["state"] == "Paused": + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={ + "code": SandboxErrorCodes.K8S_INVALID_STATE, + "message": "Sandbox is paused. Resume it first.", + }, + ) + + pod_name = self.k8s_client.get_pod_name_for_sandbox( + namespace=self.namespace, + sandbox_id=sandbox_id, + ) + if not pod_name: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": SandboxErrorCodes.K8S_POD_IP_NOT_AVAILABLE, + "message": "No running pod found for this sandbox.", + }, + ) + return pod_name + + def get_sandbox_logs( + self, + sandbox_id: str, + tail_lines: int = 100, + follow: bool = False, + ) -> Any: + """ + Get logs from a sandbox pod. 
+ + Returns: + str when follow=False, streaming response when follow=True + """ + pod_name = self.get_sandbox_pod_name(sandbox_id) + return self.k8s_client.read_pod_log( + namespace=self.namespace, + pod_name=pod_name, + container="sandbox", + tail_lines=tail_lines, + follow=follow, + ) + + def exec_sandbox_terminal(self, sandbox_id: str, command: str = "/bin/bash") -> Any: + """ + Open an interactive exec stream to a sandbox pod. + + Returns a WSClient for bidirectional communication. + """ + pod_name = self.get_sandbox_pod_name(sandbox_id) + return self.k8s_client.exec_interactive( + namespace=self.namespace, + pod_name=pod_name, + container="sandbox", + command=[command], + ) + def _attach_egress_auth_headers(self, endpoint: Endpoint, workload: Any) -> None: token = self._get_egress_auth_token(workload) if not token: diff --git a/server/src/services/k8s/workload_provider.py b/server/src/services/k8s/workload_provider.py index a73b326a6..bebcb4d29 100644 --- a/server/src/services/k8s/workload_provider.py +++ b/server/src/services/k8s/workload_provider.py @@ -176,6 +176,28 @@ def get_endpoint_info(self, workload: Any, port: int, sandbox_id: str) -> Option """ pass + @abstractmethod + def pause_workload(self, sandbox_id: str, namespace: str) -> None: + """ + Pause (scale down) a workload, preserving persistent volumes. + + Args: + sandbox_id: Unique sandbox identifier + namespace: Kubernetes namespace + """ + pass + + @abstractmethod + def resume_workload(self, sandbox_id: str, namespace: str) -> None: + """ + Resume (scale up) a previously paused workload. + + Args: + sandbox_id: Unique sandbox identifier + namespace: Kubernetes namespace + """ + pass + def supports_image_auth(self) -> bool: """ Whether this provider supports per-request image pull authentication. 
diff --git a/server/src/services/sandbox_service.py b/server/src/services/sandbox_service.py index 6831d939a..2f041dd39 100644 --- a/server/src/services/sandbox_service.py +++ b/server/src/services/sandbox_service.py @@ -21,6 +21,7 @@ from abc import ABC, abstractmethod import socket +from typing import Any, Optional from uuid import uuid4 from src.api.schema import ( @@ -222,3 +223,37 @@ def get_endpoint(self, sandbox_id: str, port: int, resolve_internal: bool = Fals HTTPException: If sandbox not found or endpoint not available """ pass + + @abstractmethod + def get_sandbox_logs( + self, + sandbox_id: str, + tail_lines: int = 100, + follow: bool = False, + ) -> Any: + """ + Get logs from a sandbox. + + Args: + sandbox_id: Unique sandbox identifier + tail_lines: Number of lines from the end + follow: Stream logs in real time + + Returns: + str when follow=False, streaming response when follow=True + """ + pass + + @abstractmethod + def exec_sandbox_terminal(self, sandbox_id: str, command: str = "/bin/bash") -> Any: + """ + Open an interactive terminal session to a sandbox. + + Args: + sandbox_id: Unique sandbox identifier + command: Shell command to execute + + Returns: + A bidirectional stream object + """ + pass